1import unittest 2from unittest import mock 3from test import support 4from test.support import import_helper 5from test.support import os_helper 6from test.support import warnings_helper 7import subprocess 8import sys 9import signal 10import io 11import itertools 12import os 13import errno 14import tempfile 15import time 16import traceback 17import types 18import selectors 19import sysconfig 20import select 21import shutil 22import threading 23import gc 24import textwrap 25import json 26import pathlib 27from test.support.os_helper import FakePath 28 29try: 30 import _testcapi 31except ImportError: 32 _testcapi = None 33 34try: 35 import pwd 36except ImportError: 37 pwd = None 38try: 39 import grp 40except ImportError: 41 grp = None 42 43try: 44 import fcntl 45except: 46 fcntl = None 47 48if support.PGO: 49 raise unittest.SkipTest("test is not helpful for PGO") 50 51mswindows = (sys.platform == "win32") 52 53# 54# Depends on the following external programs: Python 55# 56 57if mswindows: 58 SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), ' 59 'os.O_BINARY);') 60else: 61 SETBINARY = '' 62 63NONEXISTING_CMD = ('nonexisting_i_hope',) 64# Ignore errors that indicate the command was not found 65NONEXISTING_ERRORS = (FileNotFoundError, NotADirectoryError, PermissionError) 66 67ZERO_RETURN_CMD = (sys.executable, '-c', 'pass') 68 69 70def setUpModule(): 71 shell_true = shutil.which('true') 72 if shell_true is None: 73 return 74 if (os.access(shell_true, os.X_OK) and 75 subprocess.run([shell_true]).returncode == 0): 76 global ZERO_RETURN_CMD 77 ZERO_RETURN_CMD = (shell_true,) # Faster than Python startup. 78 79 80class BaseTestCase(unittest.TestCase): 81 def setUp(self): 82 # Try to minimize the number of children we have so this test 83 # doesn't crash on some buildbots (Alphas in particular). 84 support.reap_children() 85 86 def tearDown(self): 87 if not mswindows: 88 # subprocess._active is not used on Windows and is set to None. 
89 for inst in subprocess._active: 90 inst.wait() 91 subprocess._cleanup() 92 self.assertFalse( 93 subprocess._active, "subprocess._active not empty" 94 ) 95 self.doCleanups() 96 support.reap_children() 97 98 99class PopenTestException(Exception): 100 pass 101 102 103class PopenExecuteChildRaises(subprocess.Popen): 104 """Popen subclass for testing cleanup of subprocess.PIPE filehandles when 105 _execute_child fails. 106 """ 107 def _execute_child(self, *args, **kwargs): 108 raise PopenTestException("Forced Exception for Test") 109 110 111class ProcessTestCase(BaseTestCase): 112 113 def test_io_buffered_by_default(self): 114 p = subprocess.Popen(ZERO_RETURN_CMD, 115 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 116 stderr=subprocess.PIPE) 117 try: 118 self.assertIsInstance(p.stdin, io.BufferedIOBase) 119 self.assertIsInstance(p.stdout, io.BufferedIOBase) 120 self.assertIsInstance(p.stderr, io.BufferedIOBase) 121 finally: 122 p.stdin.close() 123 p.stdout.close() 124 p.stderr.close() 125 p.wait() 126 127 def test_io_unbuffered_works(self): 128 p = subprocess.Popen(ZERO_RETURN_CMD, 129 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 130 stderr=subprocess.PIPE, bufsize=0) 131 try: 132 self.assertIsInstance(p.stdin, io.RawIOBase) 133 self.assertIsInstance(p.stdout, io.RawIOBase) 134 self.assertIsInstance(p.stderr, io.RawIOBase) 135 finally: 136 p.stdin.close() 137 p.stdout.close() 138 p.stderr.close() 139 p.wait() 140 141 def test_call_seq(self): 142 # call() function with sequence argument 143 rc = subprocess.call([sys.executable, "-c", 144 "import sys; sys.exit(47)"]) 145 self.assertEqual(rc, 47) 146 147 def test_call_timeout(self): 148 # call() function with timeout argument; we want to test that the child 149 # process gets killed when the timeout expires. If the child isn't 150 # killed, this call will deadlock since subprocess.call waits for the 151 # child. 
152 self.assertRaises(subprocess.TimeoutExpired, subprocess.call, 153 [sys.executable, "-c", "while True: pass"], 154 timeout=0.1) 155 156 def test_check_call_zero(self): 157 # check_call() function with zero return code 158 rc = subprocess.check_call(ZERO_RETURN_CMD) 159 self.assertEqual(rc, 0) 160 161 def test_check_call_nonzero(self): 162 # check_call() function with non-zero return code 163 with self.assertRaises(subprocess.CalledProcessError) as c: 164 subprocess.check_call([sys.executable, "-c", 165 "import sys; sys.exit(47)"]) 166 self.assertEqual(c.exception.returncode, 47) 167 168 def test_check_output(self): 169 # check_output() function with zero return code 170 output = subprocess.check_output( 171 [sys.executable, "-c", "print('BDFL')"]) 172 self.assertIn(b'BDFL', output) 173 174 with self.assertRaisesRegex(ValueError, 175 "stdout argument not allowed, it will be overridden"): 176 subprocess.check_output([], stdout=None) 177 178 with self.assertRaisesRegex(ValueError, 179 "check argument not allowed, it will be overridden"): 180 subprocess.check_output([], check=False) 181 182 def test_check_output_nonzero(self): 183 # check_call() function with non-zero return code 184 with self.assertRaises(subprocess.CalledProcessError) as c: 185 subprocess.check_output( 186 [sys.executable, "-c", "import sys; sys.exit(5)"]) 187 self.assertEqual(c.exception.returncode, 5) 188 189 def test_check_output_stderr(self): 190 # check_output() function stderr redirected to stdout 191 output = subprocess.check_output( 192 [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"], 193 stderr=subprocess.STDOUT) 194 self.assertIn(b'BDFL', output) 195 196 def test_check_output_stdin_arg(self): 197 # check_output() can be called with stdin set to a file 198 tf = tempfile.TemporaryFile() 199 self.addCleanup(tf.close) 200 tf.write(b'pear') 201 tf.seek(0) 202 output = subprocess.check_output( 203 [sys.executable, "-c", 204 "import sys; 
sys.stdout.write(sys.stdin.read().upper())"], 205 stdin=tf) 206 self.assertIn(b'PEAR', output) 207 208 def test_check_output_input_arg(self): 209 # check_output() can be called with input set to a string 210 output = subprocess.check_output( 211 [sys.executable, "-c", 212 "import sys; sys.stdout.write(sys.stdin.read().upper())"], 213 input=b'pear') 214 self.assertIn(b'PEAR', output) 215 216 def test_check_output_input_none(self): 217 """input=None has a legacy meaning of input='' on check_output.""" 218 output = subprocess.check_output( 219 [sys.executable, "-c", 220 "import sys; print('XX' if sys.stdin.read() else '')"], 221 input=None) 222 self.assertNotIn(b'XX', output) 223 224 def test_check_output_input_none_text(self): 225 output = subprocess.check_output( 226 [sys.executable, "-c", 227 "import sys; print('XX' if sys.stdin.read() else '')"], 228 input=None, text=True) 229 self.assertNotIn('XX', output) 230 231 def test_check_output_input_none_universal_newlines(self): 232 output = subprocess.check_output( 233 [sys.executable, "-c", 234 "import sys; print('XX' if sys.stdin.read() else '')"], 235 input=None, universal_newlines=True) 236 self.assertNotIn('XX', output) 237 238 def test_check_output_stdout_arg(self): 239 # check_output() refuses to accept 'stdout' argument 240 with self.assertRaises(ValueError) as c: 241 output = subprocess.check_output( 242 [sys.executable, "-c", "print('will not be run')"], 243 stdout=sys.stdout) 244 self.fail("Expected ValueError when stdout arg supplied.") 245 self.assertIn('stdout', c.exception.args[0]) 246 247 def test_check_output_stdin_with_input_arg(self): 248 # check_output() refuses to accept 'stdin' with 'input' 249 tf = tempfile.TemporaryFile() 250 self.addCleanup(tf.close) 251 tf.write(b'pear') 252 tf.seek(0) 253 with self.assertRaises(ValueError) as c: 254 output = subprocess.check_output( 255 [sys.executable, "-c", "print('will not be run')"], 256 stdin=tf, input=b'hare') 257 self.fail("Expected ValueError when 
stdin and input args supplied.") 258 self.assertIn('stdin', c.exception.args[0]) 259 self.assertIn('input', c.exception.args[0]) 260 261 def test_check_output_timeout(self): 262 # check_output() function with timeout arg 263 with self.assertRaises(subprocess.TimeoutExpired) as c: 264 output = subprocess.check_output( 265 [sys.executable, "-c", 266 "import sys, time\n" 267 "sys.stdout.write('BDFL')\n" 268 "sys.stdout.flush()\n" 269 "time.sleep(3600)"], 270 # Some heavily loaded buildbots (sparc Debian 3.x) require 271 # this much time to start and print. 272 timeout=3) 273 self.fail("Expected TimeoutExpired.") 274 self.assertEqual(c.exception.output, b'BDFL') 275 276 def test_call_kwargs(self): 277 # call() function with keyword args 278 newenv = os.environ.copy() 279 newenv["FRUIT"] = "banana" 280 rc = subprocess.call([sys.executable, "-c", 281 'import sys, os;' 282 'sys.exit(os.getenv("FRUIT")=="banana")'], 283 env=newenv) 284 self.assertEqual(rc, 1) 285 286 def test_invalid_args(self): 287 # Popen() called with invalid arguments should raise TypeError 288 # but Popen.__del__ should not complain (issue #12085) 289 with support.captured_stderr() as s: 290 self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1) 291 argcount = subprocess.Popen.__init__.__code__.co_argcount 292 too_many_args = [0] * (argcount + 1) 293 self.assertRaises(TypeError, subprocess.Popen, *too_many_args) 294 self.assertEqual(s.getvalue(), '') 295 296 def test_stdin_none(self): 297 # .stdin is None when not redirected 298 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 299 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 300 self.addCleanup(p.stdout.close) 301 self.addCleanup(p.stderr.close) 302 p.wait() 303 self.assertEqual(p.stdin, None) 304 305 def test_stdout_none(self): 306 # .stdout is None when not redirected, and the child's stdout will 307 # be inherited from the parent. 
In order to test this we run a 308 # subprocess in a subprocess: 309 # this_test 310 # \-- subprocess created by this test (parent) 311 # \-- subprocess created by the parent subprocess (child) 312 # The parent doesn't specify stdout, so the child will use the 313 # parent's stdout. This test checks that the message printed by the 314 # child goes to the parent stdout. The parent also checks that the 315 # child's stdout is None. See #11963. 316 code = ('import sys; from subprocess import Popen, PIPE;' 317 'p = Popen([sys.executable, "-c", "print(\'test_stdout_none\')"],' 318 ' stdin=PIPE, stderr=PIPE);' 319 'p.wait(); assert p.stdout is None;') 320 p = subprocess.Popen([sys.executable, "-c", code], 321 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 322 self.addCleanup(p.stdout.close) 323 self.addCleanup(p.stderr.close) 324 out, err = p.communicate() 325 self.assertEqual(p.returncode, 0, err) 326 self.assertEqual(out.rstrip(), b'test_stdout_none') 327 328 def test_stderr_none(self): 329 # .stderr is None when not redirected 330 p = subprocess.Popen([sys.executable, "-c", 'print("banana")'], 331 stdin=subprocess.PIPE, stdout=subprocess.PIPE) 332 self.addCleanup(p.stdout.close) 333 self.addCleanup(p.stdin.close) 334 p.wait() 335 self.assertEqual(p.stderr, None) 336 337 def _assert_python(self, pre_args, **kwargs): 338 # We include sys.exit() to prevent the test runner from hanging 339 # whenever python is found. 340 args = pre_args + ["import sys; sys.exit(47)"] 341 p = subprocess.Popen(args, **kwargs) 342 p.wait() 343 self.assertEqual(47, p.returncode) 344 345 def test_executable(self): 346 # Check that the executable argument works. 347 # 348 # On Unix (non-Mac and non-Windows), Python looks at args[0] to 349 # determine where its standard library is, so we need the directory 350 # of args[0] to be valid for the Popen() call to Python to succeed. 351 # See also issue #16170 and issue #7774. 
352 doesnotexist = os.path.join(os.path.dirname(sys.executable), 353 "doesnotexist") 354 self._assert_python([doesnotexist, "-c"], executable=sys.executable) 355 356 def test_bytes_executable(self): 357 doesnotexist = os.path.join(os.path.dirname(sys.executable), 358 "doesnotexist") 359 self._assert_python([doesnotexist, "-c"], 360 executable=os.fsencode(sys.executable)) 361 362 def test_pathlike_executable(self): 363 doesnotexist = os.path.join(os.path.dirname(sys.executable), 364 "doesnotexist") 365 self._assert_python([doesnotexist, "-c"], 366 executable=FakePath(sys.executable)) 367 368 def test_executable_takes_precedence(self): 369 # Check that the executable argument takes precedence over args[0]. 370 # 371 # Verify first that the call succeeds without the executable arg. 372 pre_args = [sys.executable, "-c"] 373 self._assert_python(pre_args) 374 self.assertRaises(NONEXISTING_ERRORS, 375 self._assert_python, pre_args, 376 executable=NONEXISTING_CMD[0]) 377 378 @unittest.skipIf(mswindows, "executable argument replaces shell") 379 def test_executable_replaces_shell(self): 380 # Check that the executable argument replaces the default shell 381 # when shell=True. 382 self._assert_python([], executable=sys.executable, shell=True) 383 384 @unittest.skipIf(mswindows, "executable argument replaces shell") 385 def test_bytes_executable_replaces_shell(self): 386 self._assert_python([], executable=os.fsencode(sys.executable), 387 shell=True) 388 389 @unittest.skipIf(mswindows, "executable argument replaces shell") 390 def test_pathlike_executable_replaces_shell(self): 391 self._assert_python([], executable=FakePath(sys.executable), 392 shell=True) 393 394 # For use in the test_cwd* tests below. 395 def _normalize_cwd(self, cwd): 396 # Normalize an expected cwd (for Tru64 support). 397 # We can't use os.path.realpath since it doesn't expand Tru64 {memb} 398 # strings. See bug #1063571. 
399 with os_helper.change_cwd(cwd): 400 return os.getcwd() 401 402 # For use in the test_cwd* tests below. 403 def _split_python_path(self): 404 # Return normalized (python_dir, python_base). 405 python_path = os.path.realpath(sys.executable) 406 return os.path.split(python_path) 407 408 # For use in the test_cwd* tests below. 409 def _assert_cwd(self, expected_cwd, python_arg, **kwargs): 410 # Invoke Python via Popen, and assert that (1) the call succeeds, 411 # and that (2) the current working directory of the child process 412 # matches *expected_cwd*. 413 p = subprocess.Popen([python_arg, "-c", 414 "import os, sys; " 415 "buf = sys.stdout.buffer; " 416 "buf.write(os.getcwd().encode()); " 417 "buf.flush(); " 418 "sys.exit(47)"], 419 stdout=subprocess.PIPE, 420 **kwargs) 421 self.addCleanup(p.stdout.close) 422 p.wait() 423 self.assertEqual(47, p.returncode) 424 normcase = os.path.normcase 425 self.assertEqual(normcase(expected_cwd), 426 normcase(p.stdout.read().decode())) 427 428 def test_cwd(self): 429 # Check that cwd changes the cwd for the child process. 430 temp_dir = tempfile.gettempdir() 431 temp_dir = self._normalize_cwd(temp_dir) 432 self._assert_cwd(temp_dir, sys.executable, cwd=temp_dir) 433 434 def test_cwd_with_bytes(self): 435 temp_dir = tempfile.gettempdir() 436 temp_dir = self._normalize_cwd(temp_dir) 437 self._assert_cwd(temp_dir, sys.executable, cwd=os.fsencode(temp_dir)) 438 439 def test_cwd_with_pathlike(self): 440 temp_dir = tempfile.gettempdir() 441 temp_dir = self._normalize_cwd(temp_dir) 442 self._assert_cwd(temp_dir, sys.executable, cwd=FakePath(temp_dir)) 443 444 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 445 def test_cwd_with_relative_arg(self): 446 # Check that Popen looks for args[0] relative to cwd if args[0] 447 # is relative. 
448 python_dir, python_base = self._split_python_path() 449 rel_python = os.path.join(os.curdir, python_base) 450 with os_helper.temp_cwd() as wrong_dir: 451 # Before calling with the correct cwd, confirm that the call fails 452 # without cwd and with the wrong cwd. 453 self.assertRaises(FileNotFoundError, subprocess.Popen, 454 [rel_python]) 455 self.assertRaises(FileNotFoundError, subprocess.Popen, 456 [rel_python], cwd=wrong_dir) 457 python_dir = self._normalize_cwd(python_dir) 458 self._assert_cwd(python_dir, rel_python, cwd=python_dir) 459 460 @unittest.skipIf(mswindows, "pending resolution of issue #15533") 461 def test_cwd_with_relative_executable(self): 462 # Check that Popen looks for executable relative to cwd if executable 463 # is relative (and that executable takes precedence over args[0]). 464 python_dir, python_base = self._split_python_path() 465 rel_python = os.path.join(os.curdir, python_base) 466 doesntexist = "somethingyoudonthave" 467 with os_helper.temp_cwd() as wrong_dir: 468 # Before calling with the correct cwd, confirm that the call fails 469 # without cwd and with the wrong cwd. 470 self.assertRaises(FileNotFoundError, subprocess.Popen, 471 [doesntexist], executable=rel_python) 472 self.assertRaises(FileNotFoundError, subprocess.Popen, 473 [doesntexist], executable=rel_python, 474 cwd=wrong_dir) 475 python_dir = self._normalize_cwd(python_dir) 476 self._assert_cwd(python_dir, doesntexist, executable=rel_python, 477 cwd=python_dir) 478 479 def test_cwd_with_absolute_arg(self): 480 # Check that Popen can find the executable when the cwd is wrong 481 # if args[0] is an absolute path. 482 python_dir, python_base = self._split_python_path() 483 abs_python = os.path.join(python_dir, python_base) 484 rel_python = os.path.join(os.curdir, python_base) 485 with os_helper.temp_dir() as wrong_dir: 486 # Before calling with an absolute path, confirm that using a 487 # relative path fails. 
488 self.assertRaises(FileNotFoundError, subprocess.Popen, 489 [rel_python], cwd=wrong_dir) 490 wrong_dir = self._normalize_cwd(wrong_dir) 491 self._assert_cwd(wrong_dir, abs_python, cwd=wrong_dir) 492 493 @unittest.skipIf(sys.base_prefix != sys.prefix, 494 'Test is not venv-compatible') 495 def test_executable_with_cwd(self): 496 python_dir, python_base = self._split_python_path() 497 python_dir = self._normalize_cwd(python_dir) 498 self._assert_cwd(python_dir, "somethingyoudonthave", 499 executable=sys.executable, cwd=python_dir) 500 501 @unittest.skipIf(sys.base_prefix != sys.prefix, 502 'Test is not venv-compatible') 503 @unittest.skipIf(sysconfig.is_python_build(), 504 "need an installed Python. See #7774") 505 def test_executable_without_cwd(self): 506 # For a normal installation, it should work without 'cwd' 507 # argument. For test runs in the build directory, see #7774. 508 self._assert_cwd(os.getcwd(), "somethingyoudonthave", 509 executable=sys.executable) 510 511 def test_stdin_pipe(self): 512 # stdin redirection 513 p = subprocess.Popen([sys.executable, "-c", 514 'import sys; sys.exit(sys.stdin.read() == "pear")'], 515 stdin=subprocess.PIPE) 516 p.stdin.write(b"pear") 517 p.stdin.close() 518 p.wait() 519 self.assertEqual(p.returncode, 1) 520 521 def test_stdin_filedes(self): 522 # stdin is set to open file descriptor 523 tf = tempfile.TemporaryFile() 524 self.addCleanup(tf.close) 525 d = tf.fileno() 526 os.write(d, b"pear") 527 os.lseek(d, 0, 0) 528 p = subprocess.Popen([sys.executable, "-c", 529 'import sys; sys.exit(sys.stdin.read() == "pear")'], 530 stdin=d) 531 p.wait() 532 self.assertEqual(p.returncode, 1) 533 534 def test_stdin_fileobj(self): 535 # stdin is set to open file object 536 tf = tempfile.TemporaryFile() 537 self.addCleanup(tf.close) 538 tf.write(b"pear") 539 tf.seek(0) 540 p = subprocess.Popen([sys.executable, "-c", 541 'import sys; sys.exit(sys.stdin.read() == "pear")'], 542 stdin=tf) 543 p.wait() 544 self.assertEqual(p.returncode, 1) 
545 546 def test_stdout_pipe(self): 547 # stdout redirection 548 p = subprocess.Popen([sys.executable, "-c", 549 'import sys; sys.stdout.write("orange")'], 550 stdout=subprocess.PIPE) 551 with p: 552 self.assertEqual(p.stdout.read(), b"orange") 553 554 def test_stdout_filedes(self): 555 # stdout is set to open file descriptor 556 tf = tempfile.TemporaryFile() 557 self.addCleanup(tf.close) 558 d = tf.fileno() 559 p = subprocess.Popen([sys.executable, "-c", 560 'import sys; sys.stdout.write("orange")'], 561 stdout=d) 562 p.wait() 563 os.lseek(d, 0, 0) 564 self.assertEqual(os.read(d, 1024), b"orange") 565 566 def test_stdout_fileobj(self): 567 # stdout is set to open file object 568 tf = tempfile.TemporaryFile() 569 self.addCleanup(tf.close) 570 p = subprocess.Popen([sys.executable, "-c", 571 'import sys; sys.stdout.write("orange")'], 572 stdout=tf) 573 p.wait() 574 tf.seek(0) 575 self.assertEqual(tf.read(), b"orange") 576 577 def test_stderr_pipe(self): 578 # stderr redirection 579 p = subprocess.Popen([sys.executable, "-c", 580 'import sys; sys.stderr.write("strawberry")'], 581 stderr=subprocess.PIPE) 582 with p: 583 self.assertEqual(p.stderr.read(), b"strawberry") 584 585 def test_stderr_filedes(self): 586 # stderr is set to open file descriptor 587 tf = tempfile.TemporaryFile() 588 self.addCleanup(tf.close) 589 d = tf.fileno() 590 p = subprocess.Popen([sys.executable, "-c", 591 'import sys; sys.stderr.write("strawberry")'], 592 stderr=d) 593 p.wait() 594 os.lseek(d, 0, 0) 595 self.assertEqual(os.read(d, 1024), b"strawberry") 596 597 def test_stderr_fileobj(self): 598 # stderr is set to open file object 599 tf = tempfile.TemporaryFile() 600 self.addCleanup(tf.close) 601 p = subprocess.Popen([sys.executable, "-c", 602 'import sys; sys.stderr.write("strawberry")'], 603 stderr=tf) 604 p.wait() 605 tf.seek(0) 606 self.assertEqual(tf.read(), b"strawberry") 607 608 def test_stderr_redirect_with_no_stdout_redirect(self): 609 # test stderr=STDOUT while stdout=None (not 
set) 610 611 # - grandchild prints to stderr 612 # - child redirects grandchild's stderr to its stdout 613 # - the parent should get grandchild's stderr in child's stdout 614 p = subprocess.Popen([sys.executable, "-c", 615 'import sys, subprocess;' 616 'rc = subprocess.call([sys.executable, "-c",' 617 ' "import sys;"' 618 ' "sys.stderr.write(\'42\')"],' 619 ' stderr=subprocess.STDOUT);' 620 'sys.exit(rc)'], 621 stdout=subprocess.PIPE, 622 stderr=subprocess.PIPE) 623 stdout, stderr = p.communicate() 624 #NOTE: stdout should get stderr from grandchild 625 self.assertEqual(stdout, b'42') 626 self.assertEqual(stderr, b'') # should be empty 627 self.assertEqual(p.returncode, 0) 628 629 def test_stdout_stderr_pipe(self): 630 # capture stdout and stderr to the same pipe 631 p = subprocess.Popen([sys.executable, "-c", 632 'import sys;' 633 'sys.stdout.write("apple");' 634 'sys.stdout.flush();' 635 'sys.stderr.write("orange")'], 636 stdout=subprocess.PIPE, 637 stderr=subprocess.STDOUT) 638 with p: 639 self.assertEqual(p.stdout.read(), b"appleorange") 640 641 def test_stdout_stderr_file(self): 642 # capture stdout and stderr to the same open file 643 tf = tempfile.TemporaryFile() 644 self.addCleanup(tf.close) 645 p = subprocess.Popen([sys.executable, "-c", 646 'import sys;' 647 'sys.stdout.write("apple");' 648 'sys.stdout.flush();' 649 'sys.stderr.write("orange")'], 650 stdout=tf, 651 stderr=tf) 652 p.wait() 653 tf.seek(0) 654 self.assertEqual(tf.read(), b"appleorange") 655 656 def test_stdout_filedes_of_stdout(self): 657 # stdout is set to 1 (#1531862). 658 # To avoid printing the text on stdout, we do something similar to 659 # test_stdout_none (see above). The parent subprocess calls the child 660 # subprocess passing stdout=1, and this test uses stdout=PIPE in 661 # order to capture and check the output of the parent. See #11963. 
662 code = ('import sys, subprocess; ' 663 'rc = subprocess.call([sys.executable, "-c", ' 664 ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), ' 665 'b\'test with stdout=1\'))"], stdout=1); ' 666 'assert rc == 18') 667 p = subprocess.Popen([sys.executable, "-c", code], 668 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 669 self.addCleanup(p.stdout.close) 670 self.addCleanup(p.stderr.close) 671 out, err = p.communicate() 672 self.assertEqual(p.returncode, 0, err) 673 self.assertEqual(out.rstrip(), b'test with stdout=1') 674 675 def test_stdout_devnull(self): 676 p = subprocess.Popen([sys.executable, "-c", 677 'for i in range(10240):' 678 'print("x" * 1024)'], 679 stdout=subprocess.DEVNULL) 680 p.wait() 681 self.assertEqual(p.stdout, None) 682 683 def test_stderr_devnull(self): 684 p = subprocess.Popen([sys.executable, "-c", 685 'import sys\n' 686 'for i in range(10240):' 687 'sys.stderr.write("x" * 1024)'], 688 stderr=subprocess.DEVNULL) 689 p.wait() 690 self.assertEqual(p.stderr, None) 691 692 def test_stdin_devnull(self): 693 p = subprocess.Popen([sys.executable, "-c", 694 'import sys;' 695 'sys.stdin.read(1)'], 696 stdin=subprocess.DEVNULL) 697 p.wait() 698 self.assertEqual(p.stdin, None) 699 700 @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'), 701 'fcntl.F_GETPIPE_SZ required for test.') 702 def test_pipesizes(self): 703 test_pipe_r, test_pipe_w = os.pipe() 704 try: 705 # Get the default pipesize with F_GETPIPE_SZ 706 pipesize_default = fcntl.fcntl(test_pipe_w, fcntl.F_GETPIPE_SZ) 707 finally: 708 os.close(test_pipe_r) 709 os.close(test_pipe_w) 710 pipesize = pipesize_default // 2 711 if pipesize < 512: # the POSIX minimum 712 raise unittest.SkitTest( 713 'default pipesize too small to perform test.') 714 p = subprocess.Popen( 715 [sys.executable, "-c", 716 'import sys; sys.stdin.read(); sys.stdout.write("out"); ' 717 'sys.stderr.write("error!")'], 718 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 719 stderr=subprocess.PIPE, 
pipesize=pipesize) 720 try: 721 for fifo in [p.stdin, p.stdout, p.stderr]: 722 self.assertEqual( 723 fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ), 724 pipesize) 725 # Windows pipe size can be acquired via GetNamedPipeInfoFunction 726 # https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-getnamedpipeinfo 727 # However, this function is not yet in _winapi. 728 p.stdin.write(b"pear") 729 p.stdin.close() 730 p.stdout.close() 731 p.stderr.close() 732 finally: 733 p.kill() 734 p.wait() 735 736 @unittest.skipUnless(fcntl and hasattr(fcntl, 'F_GETPIPE_SZ'), 737 'fcntl.F_GETPIPE_SZ required for test.') 738 def test_pipesize_default(self): 739 p = subprocess.Popen( 740 [sys.executable, "-c", 741 'import sys; sys.stdin.read(); sys.stdout.write("out"); ' 742 'sys.stderr.write("error!")'], 743 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 744 stderr=subprocess.PIPE, pipesize=-1) 745 try: 746 fp_r, fp_w = os.pipe() 747 try: 748 default_pipesize = fcntl.fcntl(fp_w, fcntl.F_GETPIPE_SZ) 749 for fifo in [p.stdin, p.stdout, p.stderr]: 750 self.assertEqual( 751 fcntl.fcntl(fifo.fileno(), fcntl.F_GETPIPE_SZ), 752 default_pipesize) 753 finally: 754 os.close(fp_r) 755 os.close(fp_w) 756 # On other platforms we cannot test the pipe size (yet). But above 757 # code using pipesize=-1 should not crash. 
758 p.stdin.close() 759 p.stdout.close() 760 p.stderr.close() 761 finally: 762 p.kill() 763 p.wait() 764 765 def test_env(self): 766 newenv = os.environ.copy() 767 newenv["FRUIT"] = "orange" 768 with subprocess.Popen([sys.executable, "-c", 769 'import sys,os;' 770 'sys.stdout.write(os.getenv("FRUIT"))'], 771 stdout=subprocess.PIPE, 772 env=newenv) as p: 773 stdout, stderr = p.communicate() 774 self.assertEqual(stdout, b"orange") 775 776 # Windows requires at least the SYSTEMROOT environment variable to start 777 # Python 778 @unittest.skipIf(sys.platform == 'win32', 779 'cannot test an empty env on Windows') 780 @unittest.skipIf(sysconfig.get_config_var('Py_ENABLE_SHARED') == 1, 781 'The Python shared library cannot be loaded ' 782 'with an empty environment.') 783 def test_empty_env(self): 784 """Verify that env={} is as empty as possible.""" 785 786 def is_env_var_to_ignore(n): 787 """Determine if an environment variable is under our control.""" 788 # This excludes some __CF_* and VERSIONER_* keys MacOS insists 789 # on adding even when the environment in exec is empty. 790 # Gentoo sandboxes also force LD_PRELOAD and SANDBOX_* to exist. 
791 return ('VERSIONER' in n or '__CF' in n or # MacOS 792 n == 'LD_PRELOAD' or n.startswith('SANDBOX') or # Gentoo 793 n == 'LC_CTYPE') # Locale coercion triggered 794 795 with subprocess.Popen([sys.executable, "-c", 796 'import os; print(list(os.environ.keys()))'], 797 stdout=subprocess.PIPE, env={}) as p: 798 stdout, stderr = p.communicate() 799 child_env_names = eval(stdout.strip()) 800 self.assertIsInstance(child_env_names, list) 801 child_env_names = [k for k in child_env_names 802 if not is_env_var_to_ignore(k)] 803 self.assertEqual(child_env_names, []) 804 805 def test_invalid_cmd(self): 806 # null character in the command name 807 cmd = sys.executable + '\0' 808 with self.assertRaises(ValueError): 809 subprocess.Popen([cmd, "-c", "pass"]) 810 811 # null character in the command argument 812 with self.assertRaises(ValueError): 813 subprocess.Popen([sys.executable, "-c", "pass#\0"]) 814 815 def test_invalid_env(self): 816 # null character in the environment variable name 817 newenv = os.environ.copy() 818 newenv["FRUIT\0VEGETABLE"] = "cabbage" 819 with self.assertRaises(ValueError): 820 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 821 822 # null character in the environment variable value 823 newenv = os.environ.copy() 824 newenv["FRUIT"] = "orange\0VEGETABLE=cabbage" 825 with self.assertRaises(ValueError): 826 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 827 828 # equal character in the environment variable name 829 newenv = os.environ.copy() 830 newenv["FRUIT=ORANGE"] = "lemon" 831 with self.assertRaises(ValueError): 832 subprocess.Popen(ZERO_RETURN_CMD, env=newenv) 833 834 # equal character in the environment variable value 835 newenv = os.environ.copy() 836 newenv["FRUIT"] = "orange=lemon" 837 with subprocess.Popen([sys.executable, "-c", 838 'import sys, os;' 839 'sys.stdout.write(os.getenv("FRUIT"))'], 840 stdout=subprocess.PIPE, 841 env=newenv) as p: 842 stdout, stderr = p.communicate() 843 self.assertEqual(stdout, b"orange=lemon") 844 845 def 
test_communicate_stdin(self): 846 p = subprocess.Popen([sys.executable, "-c", 847 'import sys;' 848 'sys.exit(sys.stdin.read() == "pear")'], 849 stdin=subprocess.PIPE) 850 p.communicate(b"pear") 851 self.assertEqual(p.returncode, 1) 852 853 def test_communicate_stdout(self): 854 p = subprocess.Popen([sys.executable, "-c", 855 'import sys; sys.stdout.write("pineapple")'], 856 stdout=subprocess.PIPE) 857 (stdout, stderr) = p.communicate() 858 self.assertEqual(stdout, b"pineapple") 859 self.assertEqual(stderr, None) 860 861 def test_communicate_stderr(self): 862 p = subprocess.Popen([sys.executable, "-c", 863 'import sys; sys.stderr.write("pineapple")'], 864 stderr=subprocess.PIPE) 865 (stdout, stderr) = p.communicate() 866 self.assertEqual(stdout, None) 867 self.assertEqual(stderr, b"pineapple") 868 869 def test_communicate(self): 870 p = subprocess.Popen([sys.executable, "-c", 871 'import sys,os;' 872 'sys.stderr.write("pineapple");' 873 'sys.stdout.write(sys.stdin.read())'], 874 stdin=subprocess.PIPE, 875 stdout=subprocess.PIPE, 876 stderr=subprocess.PIPE) 877 self.addCleanup(p.stdout.close) 878 self.addCleanup(p.stderr.close) 879 self.addCleanup(p.stdin.close) 880 (stdout, stderr) = p.communicate(b"banana") 881 self.assertEqual(stdout, b"banana") 882 self.assertEqual(stderr, b"pineapple") 883 884 def test_communicate_timeout(self): 885 p = subprocess.Popen([sys.executable, "-c", 886 'import sys,os,time;' 887 'sys.stderr.write("pineapple\\n");' 888 'time.sleep(1);' 889 'sys.stderr.write("pear\\n");' 890 'sys.stdout.write(sys.stdin.read())'], 891 universal_newlines=True, 892 stdin=subprocess.PIPE, 893 stdout=subprocess.PIPE, 894 stderr=subprocess.PIPE) 895 self.assertRaises(subprocess.TimeoutExpired, p.communicate, "banana", 896 timeout=0.3) 897 # Make sure we can keep waiting for it, and that we get the whole output 898 # after it completes. 
899 (stdout, stderr) = p.communicate() 900 self.assertEqual(stdout, "banana") 901 self.assertEqual(stderr.encode(), b"pineapple\npear\n") 902 903 def test_communicate_timeout_large_output(self): 904 # Test an expiring timeout while the child is outputting lots of data. 905 p = subprocess.Popen([sys.executable, "-c", 906 'import sys,os,time;' 907 'sys.stdout.write("a" * (64 * 1024));' 908 'time.sleep(0.2);' 909 'sys.stdout.write("a" * (64 * 1024));' 910 'time.sleep(0.2);' 911 'sys.stdout.write("a" * (64 * 1024));' 912 'time.sleep(0.2);' 913 'sys.stdout.write("a" * (64 * 1024));'], 914 stdout=subprocess.PIPE) 915 self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) 916 (stdout, _) = p.communicate() 917 self.assertEqual(len(stdout), 4 * 64 * 1024) 918 919 # Test for the fd leak reported in http://bugs.python.org/issue2791. 920 def test_communicate_pipe_fd_leak(self): 921 for stdin_pipe in (False, True): 922 for stdout_pipe in (False, True): 923 for stderr_pipe in (False, True): 924 options = {} 925 if stdin_pipe: 926 options['stdin'] = subprocess.PIPE 927 if stdout_pipe: 928 options['stdout'] = subprocess.PIPE 929 if stderr_pipe: 930 options['stderr'] = subprocess.PIPE 931 if not options: 932 continue 933 p = subprocess.Popen(ZERO_RETURN_CMD, **options) 934 p.communicate() 935 if p.stdin is not None: 936 self.assertTrue(p.stdin.closed) 937 if p.stdout is not None: 938 self.assertTrue(p.stdout.closed) 939 if p.stderr is not None: 940 self.assertTrue(p.stderr.closed) 941 942 def test_communicate_returns(self): 943 # communicate() should return None if no redirection is active 944 p = subprocess.Popen([sys.executable, "-c", 945 "import sys; sys.exit(47)"]) 946 (stdout, stderr) = p.communicate() 947 self.assertEqual(stdout, None) 948 self.assertEqual(stderr, None) 949 950 def test_communicate_pipe_buf(self): 951 # communicate() with writes larger than pipe_buf 952 # This test will probably deadlock rather than fail, if 953 # communicate() does not work 
properly. 954 x, y = os.pipe() 955 os.close(x) 956 os.close(y) 957 p = subprocess.Popen([sys.executable, "-c", 958 'import sys,os;' 959 'sys.stdout.write(sys.stdin.read(47));' 960 'sys.stderr.write("x" * %d);' 961 'sys.stdout.write(sys.stdin.read())' % 962 support.PIPE_MAX_SIZE], 963 stdin=subprocess.PIPE, 964 stdout=subprocess.PIPE, 965 stderr=subprocess.PIPE) 966 self.addCleanup(p.stdout.close) 967 self.addCleanup(p.stderr.close) 968 self.addCleanup(p.stdin.close) 969 string_to_write = b"a" * support.PIPE_MAX_SIZE 970 (stdout, stderr) = p.communicate(string_to_write) 971 self.assertEqual(stdout, string_to_write) 972 973 def test_writes_before_communicate(self): 974 # stdin.write before communicate() 975 p = subprocess.Popen([sys.executable, "-c", 976 'import sys,os;' 977 'sys.stdout.write(sys.stdin.read())'], 978 stdin=subprocess.PIPE, 979 stdout=subprocess.PIPE, 980 stderr=subprocess.PIPE) 981 self.addCleanup(p.stdout.close) 982 self.addCleanup(p.stderr.close) 983 self.addCleanup(p.stdin.close) 984 p.stdin.write(b"banana") 985 (stdout, stderr) = p.communicate(b"split") 986 self.assertEqual(stdout, b"bananasplit") 987 self.assertEqual(stderr, b"") 988 989 def test_universal_newlines_and_text(self): 990 args = [ 991 sys.executable, "-c", 992 'import sys,os;' + SETBINARY + 993 'buf = sys.stdout.buffer;' 994 'buf.write(sys.stdin.readline().encode());' 995 'buf.flush();' 996 'buf.write(b"line2\\n");' 997 'buf.flush();' 998 'buf.write(sys.stdin.read().encode());' 999 'buf.flush();' 1000 'buf.write(b"line4\\n");' 1001 'buf.flush();' 1002 'buf.write(b"line5\\r\\n");' 1003 'buf.flush();' 1004 'buf.write(b"line6\\r");' 1005 'buf.flush();' 1006 'buf.write(b"\\nline7");' 1007 'buf.flush();' 1008 'buf.write(b"\\nline8");'] 1009 1010 for extra_kwarg in ('universal_newlines', 'text'): 1011 p = subprocess.Popen(args, **{'stdin': subprocess.PIPE, 1012 'stdout': subprocess.PIPE, 1013 extra_kwarg: True}) 1014 with p: 1015 p.stdin.write("line1\n") 1016 p.stdin.flush() 1017 
def test_universal_newlines_communicate(self):
    # communicate() in universal-newlines mode: the child emits "\n",
    # "\r\n" and a "\r" / "\n" pair split across two writes; all of them
    # must come back as plain "\n".
    child_statements = (
        'buf = sys.stdout.buffer;',
        'buf.write(b"line2\\n");',
        'buf.flush();',
        'buf.write(b"line4\\n");',
        'buf.flush();',
        'buf.write(b"line5\\r\\n");',
        'buf.flush();',
        'buf.write(b"line6\\r");',
        'buf.flush();',
        'buf.write(b"\\nline7");',
        'buf.flush();',
        'buf.write(b"\\nline8");',
    )
    script = 'import sys,os;' + SETBINARY + ''.join(child_statements)
    child = subprocess.Popen([sys.executable, "-c", script],
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             universal_newlines=1)
    self.addCleanup(child.stdout.close)
    self.addCleanup(child.stderr.close)
    out, _ = child.communicate()
    self.assertEqual(out, "line2\nline4\nline5\nline6\nline7\nline8")
def test_universal_newlines_communicate_stdin_stdout_stderr(self):
    # Universal newlines through communicate() with all three standard
    # streams piped at once.
    child_body = textwrap.dedent('''
        s = sys.stdin.buffer.readline()
        sys.stdout.buffer.write(s)
        sys.stdout.buffer.write(b"line2\\r")
        sys.stderr.buffer.write(b"eline2\\n")
        s = sys.stdin.buffer.read()
        sys.stdout.buffer.write(s)
        sys.stdout.buffer.write(b"line4\\n")
        sys.stdout.buffer.write(b"line5\\r\\n")
        sys.stderr.buffer.write(b"eline6\\r")
        sys.stderr.buffer.write(b"eline7\\r\\nz")
    ''')
    script = 'import sys,os;' + SETBINARY + child_body
    child = subprocess.Popen([sys.executable, "-c", script],
                             stdin=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             universal_newlines=True)
    self.addCleanup(child.stdout.close)
    self.addCleanup(child.stderr.close)
    out, err = child.communicate("line1\nline3\n")
    self.assertEqual(child.returncode, 0)
    self.assertEqual("line1\nline2\nline3\nline4\nline5\n", out)
    # Only the beginning of stderr is checked: a Python debug build
    # appends something like "[42442 refs]\n" when the child exits.
    self.assertTrue(err.startswith("eline2\neline6\neline7\n"))
1114 for encoding in ['utf-16', 'utf-32-be']: 1115 code = ("import sys; " 1116 r"sys.stdout.buffer.write('1\r\n2\r3\n4'.encode('%s'))" % 1117 encoding) 1118 args = [sys.executable, '-c', code] 1119 # We set stdin to be non-None because, as of this writing, 1120 # a different code path is used when the number of pipes is 1121 # zero or one. 1122 popen = subprocess.Popen(args, 1123 stdin=subprocess.PIPE, 1124 stdout=subprocess.PIPE, 1125 encoding=encoding) 1126 stdout, stderr = popen.communicate(input='') 1127 self.assertEqual(stdout, '1\n2\n3\n4') 1128 1129 def test_communicate_errors(self): 1130 for errors, expected in [ 1131 ('ignore', ''), 1132 ('replace', '\ufffd\ufffd'), 1133 ('surrogateescape', '\udc80\udc80'), 1134 ('backslashreplace', '\\x80\\x80'), 1135 ]: 1136 code = ("import sys; " 1137 r"sys.stdout.buffer.write(b'[\x80\x80]')") 1138 args = [sys.executable, '-c', code] 1139 # We set stdin to be non-None because, as of this writing, 1140 # a different code path is used when the number of pipes is 1141 # zero or one. 
1142 popen = subprocess.Popen(args, 1143 stdin=subprocess.PIPE, 1144 stdout=subprocess.PIPE, 1145 encoding='utf-8', 1146 errors=errors) 1147 stdout, stderr = popen.communicate(input='') 1148 self.assertEqual(stdout, '[{}]'.format(expected)) 1149 1150 def test_no_leaking(self): 1151 # Make sure we leak no resources 1152 if not mswindows: 1153 max_handles = 1026 # too much for most UNIX systems 1154 else: 1155 max_handles = 2050 # too much for (at least some) Windows setups 1156 handles = [] 1157 tmpdir = tempfile.mkdtemp() 1158 try: 1159 for i in range(max_handles): 1160 try: 1161 tmpfile = os.path.join(tmpdir, os_helper.TESTFN) 1162 handles.append(os.open(tmpfile, os.O_WRONLY|os.O_CREAT)) 1163 except OSError as e: 1164 if e.errno != errno.EMFILE: 1165 raise 1166 break 1167 else: 1168 self.skipTest("failed to reach the file descriptor limit " 1169 "(tried %d)" % max_handles) 1170 # Close a couple of them (should be enough for a subprocess) 1171 for i in range(10): 1172 os.close(handles.pop()) 1173 # Loop creating some subprocesses. If one of them leaks some fds, 1174 # the next loop iteration will fail by reaching the max fd limit. 
1175 for i in range(15): 1176 p = subprocess.Popen([sys.executable, "-c", 1177 "import sys;" 1178 "sys.stdout.write(sys.stdin.read())"], 1179 stdin=subprocess.PIPE, 1180 stdout=subprocess.PIPE, 1181 stderr=subprocess.PIPE) 1182 data = p.communicate(b"lime")[0] 1183 self.assertEqual(data, b"lime") 1184 finally: 1185 for h in handles: 1186 os.close(h) 1187 shutil.rmtree(tmpdir) 1188 1189 def test_list2cmdline(self): 1190 self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']), 1191 '"a b c" d e') 1192 self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']), 1193 'ab\\"c \\ d') 1194 self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']), 1195 'ab\\"c " \\\\" d') 1196 self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']), 1197 'a\\\\\\b "de fg" h') 1198 self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']), 1199 'a\\\\\\"b c d') 1200 self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']), 1201 '"a\\\\b c" d e') 1202 self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']), 1203 '"a\\\\b\\ c" d e') 1204 self.assertEqual(subprocess.list2cmdline(['ab', '']), 1205 'ab ""') 1206 1207 def test_poll(self): 1208 p = subprocess.Popen([sys.executable, "-c", 1209 "import os; os.read(0, 1)"], 1210 stdin=subprocess.PIPE) 1211 self.addCleanup(p.stdin.close) 1212 self.assertIsNone(p.poll()) 1213 os.write(p.stdin.fileno(), b'A') 1214 p.wait() 1215 # Subsequent invocations should just return the returncode 1216 self.assertEqual(p.poll(), 0) 1217 1218 def test_wait(self): 1219 p = subprocess.Popen(ZERO_RETURN_CMD) 1220 self.assertEqual(p.wait(), 0) 1221 # Subsequent invocations should just return the returncode 1222 self.assertEqual(p.wait(), 0) 1223 1224 def test_wait_timeout(self): 1225 p = subprocess.Popen([sys.executable, 1226 "-c", "import time; time.sleep(0.3)"]) 1227 with self.assertRaises(subprocess.TimeoutExpired) as c: 1228 p.wait(timeout=0.0001) 1229 self.assertIn("0.0001", str(c.exception)) # For coverage of 
def _test_bufsize_equal_one(self, line, expected, universal_newlines):
    """Write *line* to a bufsize=1 child and check what it echoes back.

    The child reads one line from stdin and writes it to stdout.
    *line* is written to the child's stdin, *expected* is what
    readline() on the child's stdout must return, and
    *universal_newlines* is passed straight through to Popen.
    """
    # subprocess may deadlock with bufsize=1, see issue #21332
    with subprocess.Popen([sys.executable, "-c", "import sys;"
                           "sys.stdout.write(sys.stdin.readline());"
                           "sys.stdout.flush()"],
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.DEVNULL,
                          bufsize=1,
                          universal_newlines=universal_newlines) as p:
        p.stdin.write(line) # expect that it flushes the line in text mode
        # Close the raw descriptor behind the file object's back so that
        # nothing still sitting in the Python-level buffer can be flushed.
        os.close(p.stdin.fileno()) # close it without flushing the buffer
        read_line = p.stdout.readline()
        with support.SuppressCrashReport():
            try:
                # The descriptor is already closed, so closing the file
                # object may fail with OSError; that is deliberately ignored.
                p.stdin.close()
            except OSError:
                pass
        # Presumably cleared so that leaving the ``with`` block does not
        # try to close the already-closed stream again.
        p.stdin = None
    # Checked after the ``with`` block has exited.
    self.assertEqual(p.returncode, 0)
    self.assertEqual(read_line, expected)
def test_leaking_fds_on_error(self):
    # see bug #5179: Popen leaks file descriptors to PIPEs if
    # the child fails to execute; this will eventually exhaust
    # the maximum number of open fds. 1024 seems a very common
    # value for that limit, but Windows has 2048, so we loop
    # 1024 times (each call leaked two fds).
    attempts = 1024
    pipe_kwargs = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    for _ in range(attempts):
        self.assertRaises(NONEXISTING_ERRORS,
                          subprocess.Popen, NONEXISTING_CMD, **pipe_kwargs)
def test_double_close_on_error(self):
    # Issue #18851
    opened_fds = []

    def churn_pipes():
        # Keep allocating descriptors in the background while Popen fails.
        for _ in range(20):
            opened_fds.extend(os.pipe())
            time.sleep(0.001)

    worker = threading.Thread(target=churn_pipes)
    worker.start()
    try:
        self.assertRaises(OSError,
                          subprocess.Popen, NONEXISTING_CMD,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)
    finally:
        worker.join()
        close_errors = []
        for fd in opened_fds:
            # If a double close occurred, some of those fds will
            # already have been closed by mistake, and os.close()
            # here will raise.
            try:
                os.close(fd)
            except OSError as err:
                close_errors.append(err)
        if close_errors:
            # Every descriptor has been attempted; report the last failure.
            raise close_errors[-1]
1407 # When a race condition occurs in wait(), the returncode could 1408 # be set by the wrong thread that doesn't actually have it 1409 # leading to an incorrect value. 1410 self.assertEqual([('thread-start-poll-result', None), 1411 ('thread-after-kill-and-wait', expected_errorcode), 1412 ('thread-after-second-wait', expected_errorcode)], 1413 results) 1414 1415 def test_issue8780(self): 1416 # Ensure that stdout is inherited from the parent 1417 # if stdout=PIPE is not used 1418 code = ';'.join(( 1419 'import subprocess, sys', 1420 'retcode = subprocess.call(' 1421 "[sys.executable, '-c', 'print(\"Hello World!\")'])", 1422 'assert retcode == 0')) 1423 output = subprocess.check_output([sys.executable, '-c', code]) 1424 self.assertTrue(output.startswith(b'Hello World!'), ascii(output)) 1425 1426 def test_handles_closed_on_exception(self): 1427 # If CreateProcess exits with an error, ensure the 1428 # duplicate output handles are released 1429 ifhandle, ifname = tempfile.mkstemp() 1430 ofhandle, ofname = tempfile.mkstemp() 1431 efhandle, efname = tempfile.mkstemp() 1432 try: 1433 subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle, 1434 stderr=efhandle) 1435 except OSError: 1436 os.close(ifhandle) 1437 os.remove(ifname) 1438 os.close(ofhandle) 1439 os.remove(ofname) 1440 os.close(efhandle) 1441 os.remove(efname) 1442 self.assertFalse(os.path.exists(ifname)) 1443 self.assertFalse(os.path.exists(ofname)) 1444 self.assertFalse(os.path.exists(efname)) 1445 1446 def test_communicate_epipe(self): 1447 # Issue 10963: communicate() should hide EPIPE 1448 p = subprocess.Popen(ZERO_RETURN_CMD, 1449 stdin=subprocess.PIPE, 1450 stdout=subprocess.PIPE, 1451 stderr=subprocess.PIPE) 1452 self.addCleanup(p.stdout.close) 1453 self.addCleanup(p.stderr.close) 1454 self.addCleanup(p.stdin.close) 1455 p.communicate(b"x" * 2**20) 1456 1457 def test_repr(self): 1458 path_cmd = pathlib.Path("my-tool.py") 1459 pathlib_cls = path_cmd.__class__.__name__ 1460 1461 cases = [ 1462 ("ls", 
True, 123, "<Popen: returncode: 123 args: 'ls'>"), 1463 ('a' * 100, True, 0, 1464 "<Popen: returncode: 0 args: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...>"), 1465 (["ls"], False, None, "<Popen: returncode: None args: ['ls']>"), 1466 (["ls", '--my-opts', 'a' * 100], False, None, 1467 "<Popen: returncode: None args: ['ls', '--my-opts', 'aaaaaaaaaaaaaaaaaaaaaaaa...>"), 1468 (path_cmd, False, 7, f"<Popen: returncode: 7 args: {pathlib_cls}('my-tool.py')>") 1469 ] 1470 with unittest.mock.patch.object(subprocess.Popen, '_execute_child'): 1471 for cmd, shell, code, sx in cases: 1472 p = subprocess.Popen(cmd, shell=shell) 1473 p.returncode = code 1474 self.assertEqual(repr(p), sx) 1475 1476 def test_communicate_epipe_only_stdin(self): 1477 # Issue 10963: communicate() should hide EPIPE 1478 p = subprocess.Popen(ZERO_RETURN_CMD, 1479 stdin=subprocess.PIPE) 1480 self.addCleanup(p.stdin.close) 1481 p.wait() 1482 p.communicate(b"x" * 2**20) 1483 1484 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 1485 "Requires signal.SIGUSR1") 1486 @unittest.skipUnless(hasattr(os, 'kill'), 1487 "Requires os.kill") 1488 @unittest.skipUnless(hasattr(os, 'getppid'), 1489 "Requires os.getppid") 1490 def test_communicate_eintr(self): 1491 # Issue #12493: communicate() should handle EINTR 1492 def handler(signum, frame): 1493 pass 1494 old_handler = signal.signal(signal.SIGUSR1, handler) 1495 self.addCleanup(signal.signal, signal.SIGUSR1, old_handler) 1496 1497 args = [sys.executable, "-c", 1498 'import os, signal;' 1499 'os.kill(os.getppid(), signal.SIGUSR1)'] 1500 for stream in ('stdout', 'stderr'): 1501 kw = {stream: subprocess.PIPE} 1502 with subprocess.Popen(args, **kw) as process: 1503 # communicate() will be interrupted by SIGUSR1 1504 process.communicate() 1505 1506 1507 # This test is Linux-ish specific for simplicity to at least have 1508 # some coverage. It is not a platform specific bug. 
1509 @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), 1510 "Linux specific") 1511 def test_failed_child_execute_fd_leak(self): 1512 """Test for the fork() failure fd leak reported in issue16327.""" 1513 fd_directory = '/proc/%d/fd' % os.getpid() 1514 fds_before_popen = os.listdir(fd_directory) 1515 with self.assertRaises(PopenTestException): 1516 PopenExecuteChildRaises( 1517 ZERO_RETURN_CMD, stdin=subprocess.PIPE, 1518 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1519 1520 # NOTE: This test doesn't verify that the real _execute_child 1521 # does not close the file descriptors itself on the way out 1522 # during an exception. Code inspection has confirmed that. 1523 1524 fds_after_exception = os.listdir(fd_directory) 1525 self.assertEqual(fds_before_popen, fds_after_exception) 1526 1527 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1528 def test_file_not_found_includes_filename(self): 1529 with self.assertRaises(FileNotFoundError) as c: 1530 subprocess.call(['/opt/nonexistent_binary', 'with', 'some', 'args']) 1531 self.assertEqual(c.exception.filename, '/opt/nonexistent_binary') 1532 1533 @unittest.skipIf(mswindows, "behavior currently not supported on Windows") 1534 def test_file_not_found_with_bad_cwd(self): 1535 with self.assertRaises(FileNotFoundError) as c: 1536 subprocess.Popen(['exit', '0'], cwd='/some/nonexistent/directory') 1537 self.assertEqual(c.exception.filename, '/some/nonexistent/directory') 1538 1539 def test_class_getitems(self): 1540 self.assertIsInstance(subprocess.Popen[bytes], types.GenericAlias) 1541 self.assertIsInstance(subprocess.CompletedProcess[str], types.GenericAlias) 1542 1543class RunFuncTestCase(BaseTestCase): 1544 def run_python(self, code, **kwargs): 1545 """Run Python code in a subprocess using subprocess.run""" 1546 argv = [sys.executable, "-c", code] 1547 return subprocess.run(argv, **kwargs) 1548 1549 def test_returncode(self): 1550 # call() function with sequence argument 1551 cp 
= self.run_python("import sys; sys.exit(47)") 1552 self.assertEqual(cp.returncode, 47) 1553 with self.assertRaises(subprocess.CalledProcessError): 1554 cp.check_returncode() 1555 1556 def test_check(self): 1557 with self.assertRaises(subprocess.CalledProcessError) as c: 1558 self.run_python("import sys; sys.exit(47)", check=True) 1559 self.assertEqual(c.exception.returncode, 47) 1560 1561 def test_check_zero(self): 1562 # check_returncode shouldn't raise when returncode is zero 1563 cp = subprocess.run(ZERO_RETURN_CMD, check=True) 1564 self.assertEqual(cp.returncode, 0) 1565 1566 def test_timeout(self): 1567 # run() function with timeout argument; we want to test that the child 1568 # process gets killed when the timeout expires. If the child isn't 1569 # killed, this call will deadlock since subprocess.run waits for the 1570 # child. 1571 with self.assertRaises(subprocess.TimeoutExpired): 1572 self.run_python("while True: pass", timeout=0.0001) 1573 1574 def test_capture_stdout(self): 1575 # capture stdout with zero return code 1576 cp = self.run_python("print('BDFL')", stdout=subprocess.PIPE) 1577 self.assertIn(b'BDFL', cp.stdout) 1578 1579 def test_capture_stderr(self): 1580 cp = self.run_python("import sys; sys.stderr.write('BDFL')", 1581 stderr=subprocess.PIPE) 1582 self.assertIn(b'BDFL', cp.stderr) 1583 1584 def test_check_output_stdin_arg(self): 1585 # run() can be called with stdin set to a file 1586 tf = tempfile.TemporaryFile() 1587 self.addCleanup(tf.close) 1588 tf.write(b'pear') 1589 tf.seek(0) 1590 cp = self.run_python( 1591 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1592 stdin=tf, stdout=subprocess.PIPE) 1593 self.assertIn(b'PEAR', cp.stdout) 1594 1595 def test_check_output_input_arg(self): 1596 # check_output() can be called with input set to a string 1597 cp = self.run_python( 1598 "import sys; sys.stdout.write(sys.stdin.read().upper())", 1599 input=b'pear', stdout=subprocess.PIPE) 1600 self.assertIn(b'PEAR', cp.stdout) 1601 1602 def 
test_check_output_stdin_with_input_arg(self): 1603 # run() refuses to accept 'stdin' with 'input' 1604 tf = tempfile.TemporaryFile() 1605 self.addCleanup(tf.close) 1606 tf.write(b'pear') 1607 tf.seek(0) 1608 with self.assertRaises(ValueError, 1609 msg="Expected ValueError when stdin and input args supplied.") as c: 1610 output = self.run_python("print('will not be run')", 1611 stdin=tf, input=b'hare') 1612 self.assertIn('stdin', c.exception.args[0]) 1613 self.assertIn('input', c.exception.args[0]) 1614 1615 def test_check_output_timeout(self): 1616 with self.assertRaises(subprocess.TimeoutExpired) as c: 1617 cp = self.run_python(( 1618 "import sys, time\n" 1619 "sys.stdout.write('BDFL')\n" 1620 "sys.stdout.flush()\n" 1621 "time.sleep(3600)"), 1622 # Some heavily loaded buildbots (sparc Debian 3.x) require 1623 # this much time to start and print. 1624 timeout=3, stdout=subprocess.PIPE) 1625 self.assertEqual(c.exception.output, b'BDFL') 1626 # output is aliased to stdout 1627 self.assertEqual(c.exception.stdout, b'BDFL') 1628 1629 def test_run_kwargs(self): 1630 newenv = os.environ.copy() 1631 newenv["FRUIT"] = "banana" 1632 cp = self.run_python(('import sys, os;' 1633 'sys.exit(33 if os.getenv("FRUIT")=="banana" else 31)'), 1634 env=newenv) 1635 self.assertEqual(cp.returncode, 33) 1636 1637 def test_run_with_pathlike_path(self): 1638 # bpo-31961: test run(pathlike_object) 1639 # the name of a command that can be run without 1640 # any arguments that exit fast 1641 prog = 'tree.com' if mswindows else 'ls' 1642 path = shutil.which(prog) 1643 if path is None: 1644 self.skipTest(f'{prog} required for this test') 1645 path = FakePath(path) 1646 res = subprocess.run(path, stdout=subprocess.DEVNULL) 1647 self.assertEqual(res.returncode, 0) 1648 with self.assertRaises(TypeError): 1649 subprocess.run(path, stdout=subprocess.DEVNULL, shell=True) 1650 1651 def test_run_with_bytes_path_and_arguments(self): 1652 # bpo-31961: test run([bytes_object, b'additional arguments']) 
1653 path = os.fsencode(sys.executable) 1654 args = [path, '-c', b'import sys; sys.exit(57)'] 1655 res = subprocess.run(args) 1656 self.assertEqual(res.returncode, 57) 1657 1658 def test_run_with_pathlike_path_and_arguments(self): 1659 # bpo-31961: test run([pathlike_object, 'additional arguments']) 1660 path = FakePath(sys.executable) 1661 args = [path, '-c', 'import sys; sys.exit(57)'] 1662 res = subprocess.run(args) 1663 self.assertEqual(res.returncode, 57) 1664 1665 def test_capture_output(self): 1666 cp = self.run_python(("import sys;" 1667 "sys.stdout.write('BDFL'); " 1668 "sys.stderr.write('FLUFL')"), 1669 capture_output=True) 1670 self.assertIn(b'BDFL', cp.stdout) 1671 self.assertIn(b'FLUFL', cp.stderr) 1672 1673 def test_stdout_with_capture_output_arg(self): 1674 # run() refuses to accept 'stdout' with 'capture_output' 1675 tf = tempfile.TemporaryFile() 1676 self.addCleanup(tf.close) 1677 with self.assertRaises(ValueError, 1678 msg=("Expected ValueError when stdout and capture_output " 1679 "args supplied.")) as c: 1680 output = self.run_python("print('will not be run')", 1681 capture_output=True, stdout=tf) 1682 self.assertIn('stdout', c.exception.args[0]) 1683 self.assertIn('capture_output', c.exception.args[0]) 1684 1685 def test_stderr_with_capture_output_arg(self): 1686 # run() refuses to accept 'stderr' with 'capture_output' 1687 tf = tempfile.TemporaryFile() 1688 self.addCleanup(tf.close) 1689 with self.assertRaises(ValueError, 1690 msg=("Expected ValueError when stderr and capture_output " 1691 "args supplied.")) as c: 1692 output = self.run_python("print('will not be run')", 1693 capture_output=True, stderr=tf) 1694 self.assertIn('stderr', c.exception.args[0]) 1695 self.assertIn('capture_output', c.exception.args[0]) 1696 1697 # This test _might_ wind up a bit fragile on loaded build+test machines 1698 # as it depends on the timing with wide enough margins for normal situations 1699 # but does assert that it happened "soon enough" to believe the 
def _get_test_grp_name():
    """Return the first candidate group name known to the grp database.

    Raises unittest.SkipTest when the grp module is unavailable or none
    of the candidate names exists.
    """
    candidates = ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody')
    if grp:
        for candidate in candidates:
            try:
                grp.getgrnam(candidate)
            except KeyError:
                # Not a group on this system; try the next name.
                continue
            return candidate
    raise unittest.SkipTest('No identified group name to use for this test on this platform.')
% 1747 self._nonexistent_dir) 1748 return desired_exception 1749 1750 def test_exception_cwd(self): 1751 """Test error in the child raised in the parent for a bad cwd.""" 1752 desired_exception = self._get_chdir_exception() 1753 try: 1754 p = subprocess.Popen([sys.executable, "-c", ""], 1755 cwd=self._nonexistent_dir) 1756 except OSError as e: 1757 # Test that the child process chdir failure actually makes 1758 # it up to the parent process as the correct exception. 1759 self.assertEqual(desired_exception.errno, e.errno) 1760 self.assertEqual(desired_exception.strerror, e.strerror) 1761 self.assertEqual(desired_exception.filename, e.filename) 1762 else: 1763 self.fail("Expected OSError: %s" % desired_exception) 1764 1765 def test_exception_bad_executable(self): 1766 """Test error in the child raised in the parent for a bad executable.""" 1767 desired_exception = self._get_chdir_exception() 1768 try: 1769 p = subprocess.Popen([sys.executable, "-c", ""], 1770 executable=self._nonexistent_dir) 1771 except OSError as e: 1772 # Test that the child process exec failure actually makes 1773 # it up to the parent process as the correct exception. 1774 self.assertEqual(desired_exception.errno, e.errno) 1775 self.assertEqual(desired_exception.strerror, e.strerror) 1776 self.assertEqual(desired_exception.filename, e.filename) 1777 else: 1778 self.fail("Expected OSError: %s" % desired_exception) 1779 1780 def test_exception_bad_args_0(self): 1781 """Test error in the child raised in the parent for a bad args[0].""" 1782 desired_exception = self._get_chdir_exception() 1783 try: 1784 p = subprocess.Popen([self._nonexistent_dir, "-c", ""]) 1785 except OSError as e: 1786 # Test that the child process exec failure actually makes 1787 # it up to the parent process as the correct exception. 
@mock.patch("subprocess._posixsubprocess.fork_exec")
def test_exception_errpipe_normal(self, fork_exec):
    """A well-formed error report on the error pipe becomes the right OSError."""
    def report_eisdir(*args):
        errpipe_write = args[13]
        # Write the hex for the error code EISDIR: 'is a directory'
        hex_errno = format(errno.EISDIR, 'x').encode()
        os.write(errpipe_write, b"OSError:%s:" % hex_errno)
        return 0

    fork_exec.side_effect = report_eisdir

    with mock.patch("subprocess.os.waitpid",
                    side_effect=ChildProcessError), \
            self.assertRaises(IsADirectoryError):
        self.PopenNoDestructor(["non_existent_command"])
fork_exec.side_effect = bad_error 1836 1837 with mock.patch("subprocess.os.waitpid", 1838 side_effect=ChildProcessError): 1839 with self.assertRaises(subprocess.SubprocessError) as e: 1840 self.PopenNoDestructor(["non_existent_command"]) 1841 1842 self.assertIn(repr(error_data), str(e.exception)) 1843 1844 @unittest.skipIf(not os.path.exists('/proc/self/status'), 1845 "need /proc/self/status") 1846 def test_restore_signals(self): 1847 # Blindly assume that cat exists on systems with /proc/self/status... 1848 default_proc_status = subprocess.check_output( 1849 ['cat', '/proc/self/status'], 1850 restore_signals=False) 1851 for line in default_proc_status.splitlines(): 1852 if line.startswith(b'SigIgn'): 1853 default_sig_ign_mask = line 1854 break 1855 else: 1856 self.skipTest("SigIgn not found in /proc/self/status.") 1857 restored_proc_status = subprocess.check_output( 1858 ['cat', '/proc/self/status'], 1859 restore_signals=True) 1860 for line in restored_proc_status.splitlines(): 1861 if line.startswith(b'SigIgn'): 1862 restored_sig_ign_mask = line 1863 break 1864 self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask, 1865 msg="restore_signals=True should've unblocked " 1866 "SIGPIPE and friends.") 1867 1868 def test_start_new_session(self): 1869 # For code coverage of calling setsid(). We don't care if we get an 1870 # EPERM error from it depending on the test execution environment, that 1871 # still indicates that it was called. 1872 try: 1873 output = subprocess.check_output( 1874 [sys.executable, "-c", "import os; print(os.getsid(0))"], 1875 start_new_session=True) 1876 except OSError as e: 1877 if e.errno != errno.EPERM: 1878 raise 1879 else: 1880 parent_sid = os.getsid(0) 1881 child_sid = int(output) 1882 self.assertNotEqual(parent_sid, child_sid) 1883 1884 @unittest.skipUnless(hasattr(os, 'setreuid'), 'no setreuid on platform') 1885 def test_user(self): 1886 # For code coverage of the user parameter. 
We don't care if we get an 1887 # EPERM error from it depending on the test execution environment, that 1888 # still indicates that it was called. 1889 1890 uid = os.geteuid() 1891 test_users = [65534 if uid != 65534 else 65533, uid] 1892 name_uid = "nobody" if sys.platform != 'darwin' else "unknown" 1893 1894 if pwd is not None: 1895 try: 1896 pwd.getpwnam(name_uid) 1897 test_users.append(name_uid) 1898 except KeyError: 1899 # unknown user name 1900 name_uid = None 1901 1902 for user in test_users: 1903 # posix_spawn() may be used with close_fds=False 1904 for close_fds in (False, True): 1905 with self.subTest(user=user, close_fds=close_fds): 1906 try: 1907 output = subprocess.check_output( 1908 [sys.executable, "-c", 1909 "import os; print(os.getuid())"], 1910 user=user, 1911 close_fds=close_fds) 1912 except PermissionError: # (EACCES, EPERM) 1913 pass 1914 except OSError as e: 1915 if e.errno not in (errno.EACCES, errno.EPERM): 1916 raise 1917 else: 1918 if isinstance(user, str): 1919 user_uid = pwd.getpwnam(user).pw_uid 1920 else: 1921 user_uid = user 1922 child_user = int(output) 1923 self.assertEqual(child_user, user_uid) 1924 1925 with self.assertRaises(ValueError): 1926 subprocess.check_call(ZERO_RETURN_CMD, user=-1) 1927 1928 with self.assertRaises(OverflowError): 1929 subprocess.check_call(ZERO_RETURN_CMD, 1930 cwd=os.curdir, env=os.environ, user=2**64) 1931 1932 if pwd is None and name_uid is not None: 1933 with self.assertRaises(ValueError): 1934 subprocess.check_call(ZERO_RETURN_CMD, user=name_uid) 1935 1936 @unittest.skipIf(hasattr(os, 'setreuid'), 'setreuid() available on platform') 1937 def test_user_error(self): 1938 with self.assertRaises(ValueError): 1939 subprocess.check_call(ZERO_RETURN_CMD, user=65535) 1940 1941 @unittest.skipUnless(hasattr(os, 'setregid'), 'no setregid() on platform') 1942 def test_group(self): 1943 gid = os.getegid() 1944 group_list = [65534 if gid != 65534 else 65533] 1945 name_group = _get_test_grp_name() 1946 1947 if grp 
is not None: 1948 group_list.append(name_group) 1949 1950 for group in group_list + [gid]: 1951 # posix_spawn() may be used with close_fds=False 1952 for close_fds in (False, True): 1953 with self.subTest(group=group, close_fds=close_fds): 1954 try: 1955 output = subprocess.check_output( 1956 [sys.executable, "-c", 1957 "import os; print(os.getgid())"], 1958 group=group, 1959 close_fds=close_fds) 1960 except PermissionError: # (EACCES, EPERM) 1961 pass 1962 else: 1963 if isinstance(group, str): 1964 group_gid = grp.getgrnam(group).gr_gid 1965 else: 1966 group_gid = group 1967 1968 child_group = int(output) 1969 self.assertEqual(child_group, group_gid) 1970 1971 # make sure we bomb on negative values 1972 with self.assertRaises(ValueError): 1973 subprocess.check_call(ZERO_RETURN_CMD, group=-1) 1974 1975 with self.assertRaises(OverflowError): 1976 subprocess.check_call(ZERO_RETURN_CMD, 1977 cwd=os.curdir, env=os.environ, group=2**64) 1978 1979 if grp is None: 1980 with self.assertRaises(ValueError): 1981 subprocess.check_call(ZERO_RETURN_CMD, group=name_group) 1982 1983 @unittest.skipIf(hasattr(os, 'setregid'), 'setregid() available on platform') 1984 def test_group_error(self): 1985 with self.assertRaises(ValueError): 1986 subprocess.check_call(ZERO_RETURN_CMD, group=65535) 1987 1988 @unittest.skipUnless(hasattr(os, 'setgroups'), 'no setgroups() on platform') 1989 def test_extra_groups(self): 1990 gid = os.getegid() 1991 group_list = [65534 if gid != 65534 else 65533] 1992 name_group = _get_test_grp_name() 1993 perm_error = False 1994 1995 if grp is not None: 1996 group_list.append(name_group) 1997 1998 try: 1999 output = subprocess.check_output( 2000 [sys.executable, "-c", 2001 "import os, sys, json; json.dump(os.getgroups(), sys.stdout)"], 2002 extra_groups=group_list) 2003 except OSError as ex: 2004 if ex.errno != errno.EPERM: 2005 raise 2006 perm_error = True 2007 2008 else: 2009 parent_groups = os.getgroups() 2010 child_groups = json.loads(output) 2011 2012 if 
grp is not None: 2013 desired_gids = [grp.getgrnam(g).gr_gid if isinstance(g, str) else g 2014 for g in group_list] 2015 else: 2016 desired_gids = group_list 2017 2018 if perm_error: 2019 self.assertEqual(set(child_groups), set(parent_groups)) 2020 else: 2021 self.assertEqual(set(desired_gids), set(child_groups)) 2022 2023 # make sure we bomb on negative values 2024 with self.assertRaises(ValueError): 2025 subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[-1]) 2026 2027 with self.assertRaises(ValueError): 2028 subprocess.check_call(ZERO_RETURN_CMD, 2029 cwd=os.curdir, env=os.environ, 2030 extra_groups=[2**64]) 2031 2032 if grp is None: 2033 with self.assertRaises(ValueError): 2034 subprocess.check_call(ZERO_RETURN_CMD, 2035 extra_groups=[name_group]) 2036 2037 @unittest.skipIf(hasattr(os, 'setgroups'), 'setgroups() available on platform') 2038 def test_extra_groups_error(self): 2039 with self.assertRaises(ValueError): 2040 subprocess.check_call(ZERO_RETURN_CMD, extra_groups=[]) 2041 2042 @unittest.skipIf(mswindows or not hasattr(os, 'umask'), 2043 'POSIX umask() is not available.') 2044 def test_umask(self): 2045 tmpdir = None 2046 try: 2047 tmpdir = tempfile.mkdtemp() 2048 name = os.path.join(tmpdir, "beans") 2049 # We set an unusual umask in the child so as a unique mode 2050 # for us to test the child's touched file for. 2051 subprocess.check_call( 2052 [sys.executable, "-c", f"open({name!r}, 'w').close()"], 2053 umask=0o053) 2054 # Ignore execute permissions entirely in our test, 2055 # filesystems could be mounted to ignore or force that. 
2056 st_mode = os.stat(name).st_mode & 0o666 2057 expected_mode = 0o624 2058 self.assertEqual(expected_mode, st_mode, 2059 msg=f'{oct(expected_mode)} != {oct(st_mode)}') 2060 finally: 2061 if tmpdir is not None: 2062 shutil.rmtree(tmpdir) 2063 2064 def test_run_abort(self): 2065 # returncode handles signal termination 2066 with support.SuppressCrashReport(): 2067 p = subprocess.Popen([sys.executable, "-c", 2068 'import os; os.abort()']) 2069 p.wait() 2070 self.assertEqual(-p.returncode, signal.SIGABRT) 2071 2072 def test_CalledProcessError_str_signal(self): 2073 err = subprocess.CalledProcessError(-int(signal.SIGABRT), "fake cmd") 2074 error_string = str(err) 2075 # We're relying on the repr() of the signal.Signals intenum to provide 2076 # the word signal, the signal name and the numeric value. 2077 self.assertIn("signal", error_string.lower()) 2078 # We're not being specific about the signal name as some signals have 2079 # multiple names and which name is revealed can vary. 2080 self.assertIn("SIG", error_string) 2081 self.assertIn(str(signal.SIGABRT), error_string) 2082 2083 def test_CalledProcessError_str_unknown_signal(self): 2084 err = subprocess.CalledProcessError(-9876543, "fake cmd") 2085 error_string = str(err) 2086 self.assertIn("unknown signal 9876543.", error_string) 2087 2088 def test_CalledProcessError_str_non_zero(self): 2089 err = subprocess.CalledProcessError(2, "fake cmd") 2090 error_string = str(err) 2091 self.assertIn("non-zero exit status 2.", error_string) 2092 2093 def test_preexec(self): 2094 # DISCLAIMER: Setting environment variables is *not* a good use 2095 # of a preexec_fn. This is merely a test. 
2096 p = subprocess.Popen([sys.executable, "-c", 2097 'import sys,os;' 2098 'sys.stdout.write(os.getenv("FRUIT"))'], 2099 stdout=subprocess.PIPE, 2100 preexec_fn=lambda: os.putenv("FRUIT", "apple")) 2101 with p: 2102 self.assertEqual(p.stdout.read(), b"apple") 2103 2104 def test_preexec_exception(self): 2105 def raise_it(): 2106 raise ValueError("What if two swallows carried a coconut?") 2107 try: 2108 p = subprocess.Popen([sys.executable, "-c", ""], 2109 preexec_fn=raise_it) 2110 except subprocess.SubprocessError as e: 2111 self.assertTrue( 2112 subprocess._posixsubprocess, 2113 "Expected a ValueError from the preexec_fn") 2114 except ValueError as e: 2115 self.assertIn("coconut", e.args[0]) 2116 else: 2117 self.fail("Exception raised by preexec_fn did not make it " 2118 "to the parent process.") 2119 2120 class _TestExecuteChildPopen(subprocess.Popen): 2121 """Used to test behavior at the end of _execute_child.""" 2122 def __init__(self, testcase, *args, **kwargs): 2123 self._testcase = testcase 2124 subprocess.Popen.__init__(self, *args, **kwargs) 2125 2126 def _execute_child(self, *args, **kwargs): 2127 try: 2128 subprocess.Popen._execute_child(self, *args, **kwargs) 2129 finally: 2130 # Open a bunch of file descriptors and verify that 2131 # none of them are the same as the ones the Popen 2132 # instance is using for stdin/stdout/stderr. 
2133 devzero_fds = [os.open("/dev/zero", os.O_RDONLY) 2134 for _ in range(8)] 2135 try: 2136 for fd in devzero_fds: 2137 self._testcase.assertNotIn( 2138 fd, (self.stdin.fileno(), self.stdout.fileno(), 2139 self.stderr.fileno()), 2140 msg="At least one fd was closed early.") 2141 finally: 2142 for fd in devzero_fds: 2143 os.close(fd) 2144 2145 @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.") 2146 def test_preexec_errpipe_does_not_double_close_pipes(self): 2147 """Issue16140: Don't double close pipes on preexec error.""" 2148 2149 def raise_it(): 2150 raise subprocess.SubprocessError( 2151 "force the _execute_child() errpipe_data path.") 2152 2153 with self.assertRaises(subprocess.SubprocessError): 2154 self._TestExecuteChildPopen( 2155 self, ZERO_RETURN_CMD, 2156 stdin=subprocess.PIPE, stdout=subprocess.PIPE, 2157 stderr=subprocess.PIPE, preexec_fn=raise_it) 2158 2159 def test_preexec_gc_module_failure(self): 2160 # This tests the code that disables garbage collection if the child 2161 # process will execute any Python. 
2162 enabled = gc.isenabled() 2163 try: 2164 gc.disable() 2165 self.assertFalse(gc.isenabled()) 2166 subprocess.call([sys.executable, '-c', ''], 2167 preexec_fn=lambda: None) 2168 self.assertFalse(gc.isenabled(), 2169 "Popen enabled gc when it shouldn't.") 2170 2171 gc.enable() 2172 self.assertTrue(gc.isenabled()) 2173 subprocess.call([sys.executable, '-c', ''], 2174 preexec_fn=lambda: None) 2175 self.assertTrue(gc.isenabled(), "Popen left gc disabled.") 2176 finally: 2177 if not enabled: 2178 gc.disable() 2179 2180 @unittest.skipIf( 2181 sys.platform == 'darwin', 'setrlimit() seems to fail on OS X') 2182 def test_preexec_fork_failure(self): 2183 # The internal code did not preserve the previous exception when 2184 # re-enabling garbage collection 2185 try: 2186 from resource import getrlimit, setrlimit, RLIMIT_NPROC 2187 except ImportError as err: 2188 self.skipTest(err) # RLIMIT_NPROC is specific to Linux and BSD 2189 limits = getrlimit(RLIMIT_NPROC) 2190 [_, hard] = limits 2191 setrlimit(RLIMIT_NPROC, (0, hard)) 2192 self.addCleanup(setrlimit, RLIMIT_NPROC, limits) 2193 try: 2194 subprocess.call([sys.executable, '-c', ''], 2195 preexec_fn=lambda: None) 2196 except BlockingIOError: 2197 # Forking should raise EAGAIN, translated to BlockingIOError 2198 pass 2199 else: 2200 self.skipTest('RLIMIT_NPROC had no effect; probably superuser') 2201 2202 def test_args_string(self): 2203 # args is a string 2204 fd, fname = tempfile.mkstemp() 2205 # reopen in text mode 2206 with open(fd, "w", errors="surrogateescape") as fobj: 2207 fobj.write("#!%s\n" % support.unix_shell) 2208 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 2209 sys.executable) 2210 os.chmod(fname, 0o700) 2211 p = subprocess.Popen(fname) 2212 p.wait() 2213 os.remove(fname) 2214 self.assertEqual(p.returncode, 47) 2215 2216 def test_invalid_args(self): 2217 # invalid arguments should raise ValueError 2218 self.assertRaises(ValueError, subprocess.call, 2219 [sys.executable, "-c", 2220 "import sys; 
sys.exit(47)"], 2221 startupinfo=47) 2222 self.assertRaises(ValueError, subprocess.call, 2223 [sys.executable, "-c", 2224 "import sys; sys.exit(47)"], 2225 creationflags=47) 2226 2227 def test_shell_sequence(self): 2228 # Run command through the shell (sequence) 2229 newenv = os.environ.copy() 2230 newenv["FRUIT"] = "apple" 2231 p = subprocess.Popen(["echo $FRUIT"], shell=1, 2232 stdout=subprocess.PIPE, 2233 env=newenv) 2234 with p: 2235 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 2236 2237 def test_shell_string(self): 2238 # Run command through the shell (string) 2239 newenv = os.environ.copy() 2240 newenv["FRUIT"] = "apple" 2241 p = subprocess.Popen("echo $FRUIT", shell=1, 2242 stdout=subprocess.PIPE, 2243 env=newenv) 2244 with p: 2245 self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple") 2246 2247 def test_call_string(self): 2248 # call() function with string argument on UNIX 2249 fd, fname = tempfile.mkstemp() 2250 # reopen in text mode 2251 with open(fd, "w", errors="surrogateescape") as fobj: 2252 fobj.write("#!%s\n" % support.unix_shell) 2253 fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" % 2254 sys.executable) 2255 os.chmod(fname, 0o700) 2256 rc = subprocess.call(fname) 2257 os.remove(fname) 2258 self.assertEqual(rc, 47) 2259 2260 def test_specific_shell(self): 2261 # Issue #9265: Incorrect name passed as arg[0]. 2262 shells = [] 2263 for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']: 2264 for name in ['bash', 'ksh']: 2265 sh = os.path.join(prefix, name) 2266 if os.path.isfile(sh): 2267 shells.append(sh) 2268 if not shells: # Will probably work for any shell but csh. 2269 self.skipTest("bash or ksh required for this test") 2270 sh = '/bin/sh' 2271 if os.path.isfile(sh) and not os.path.islink(sh): 2272 # Test will fail if /bin/sh is a symlink to csh. 
2273 shells.append(sh) 2274 for sh in shells: 2275 p = subprocess.Popen("echo $0", executable=sh, shell=True, 2276 stdout=subprocess.PIPE) 2277 with p: 2278 self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii')) 2279 2280 def _kill_process(self, method, *args): 2281 # Do not inherit file handles from the parent. 2282 # It should fix failures on some platforms. 2283 # Also set the SIGINT handler to the default to make sure it's not 2284 # being ignored (some tests rely on that.) 2285 old_handler = signal.signal(signal.SIGINT, signal.default_int_handler) 2286 try: 2287 p = subprocess.Popen([sys.executable, "-c", """if 1: 2288 import sys, time 2289 sys.stdout.write('x\\n') 2290 sys.stdout.flush() 2291 time.sleep(30) 2292 """], 2293 close_fds=True, 2294 stdin=subprocess.PIPE, 2295 stdout=subprocess.PIPE, 2296 stderr=subprocess.PIPE) 2297 finally: 2298 signal.signal(signal.SIGINT, old_handler) 2299 # Wait for the interpreter to be completely initialized before 2300 # sending any signal. 2301 p.stdout.read(1) 2302 getattr(p, method)(*args) 2303 return p 2304 2305 @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')), 2306 "Due to known OS bug (issue #16762)") 2307 def _kill_dead_process(self, method, *args): 2308 # Do not inherit file handles from the parent. 2309 # It should fix failures on some platforms. 2310 p = subprocess.Popen([sys.executable, "-c", """if 1: 2311 import sys, time 2312 sys.stdout.write('x\\n') 2313 sys.stdout.flush() 2314 """], 2315 close_fds=True, 2316 stdin=subprocess.PIPE, 2317 stdout=subprocess.PIPE, 2318 stderr=subprocess.PIPE) 2319 # Wait for the interpreter to be completely initialized before 2320 # sending any signal. 
2321 p.stdout.read(1) 2322 # The process should end after this 2323 time.sleep(1) 2324 # This shouldn't raise even though the child is now dead 2325 getattr(p, method)(*args) 2326 p.communicate() 2327 2328 def test_send_signal(self): 2329 p = self._kill_process('send_signal', signal.SIGINT) 2330 _, stderr = p.communicate() 2331 self.assertIn(b'KeyboardInterrupt', stderr) 2332 self.assertNotEqual(p.wait(), 0) 2333 2334 def test_kill(self): 2335 p = self._kill_process('kill') 2336 _, stderr = p.communicate() 2337 self.assertEqual(stderr, b'') 2338 self.assertEqual(p.wait(), -signal.SIGKILL) 2339 2340 def test_terminate(self): 2341 p = self._kill_process('terminate') 2342 _, stderr = p.communicate() 2343 self.assertEqual(stderr, b'') 2344 self.assertEqual(p.wait(), -signal.SIGTERM) 2345 2346 def test_send_signal_dead(self): 2347 # Sending a signal to a dead process 2348 self._kill_dead_process('send_signal', signal.SIGINT) 2349 2350 def test_kill_dead(self): 2351 # Killing a dead process 2352 self._kill_dead_process('kill') 2353 2354 def test_terminate_dead(self): 2355 # Terminating a dead process 2356 self._kill_dead_process('terminate') 2357 2358 def _save_fds(self, save_fds): 2359 fds = [] 2360 for fd in save_fds: 2361 inheritable = os.get_inheritable(fd) 2362 saved = os.dup(fd) 2363 fds.append((fd, saved, inheritable)) 2364 return fds 2365 2366 def _restore_fds(self, fds): 2367 for fd, saved, inheritable in fds: 2368 os.dup2(saved, fd, inheritable=inheritable) 2369 os.close(saved) 2370 2371 def check_close_std_fds(self, fds): 2372 # Issue #9905: test that subprocess pipes still work properly with 2373 # some standard fds closed 2374 stdin = 0 2375 saved_fds = self._save_fds(fds) 2376 for fd, saved, inheritable in saved_fds: 2377 if fd == 0: 2378 stdin = saved 2379 break 2380 try: 2381 for fd in fds: 2382 os.close(fd) 2383 out, err = subprocess.Popen([sys.executable, "-c", 2384 'import sys;' 2385 'sys.stdout.write("apple");' 2386 'sys.stdout.flush();' 2387 
'sys.stderr.write("orange")'], 2388 stdin=stdin, 2389 stdout=subprocess.PIPE, 2390 stderr=subprocess.PIPE).communicate() 2391 self.assertEqual(out, b'apple') 2392 self.assertEqual(err, b'orange') 2393 finally: 2394 self._restore_fds(saved_fds) 2395 2396 def test_close_fd_0(self): 2397 self.check_close_std_fds([0]) 2398 2399 def test_close_fd_1(self): 2400 self.check_close_std_fds([1]) 2401 2402 def test_close_fd_2(self): 2403 self.check_close_std_fds([2]) 2404 2405 def test_close_fds_0_1(self): 2406 self.check_close_std_fds([0, 1]) 2407 2408 def test_close_fds_0_2(self): 2409 self.check_close_std_fds([0, 2]) 2410 2411 def test_close_fds_1_2(self): 2412 self.check_close_std_fds([1, 2]) 2413 2414 def test_close_fds_0_1_2(self): 2415 # Issue #10806: test that subprocess pipes still work properly with 2416 # all standard fds closed. 2417 self.check_close_std_fds([0, 1, 2]) 2418 2419 def test_small_errpipe_write_fd(self): 2420 """Issue #15798: Popen should work when stdio fds are available.""" 2421 new_stdin = os.dup(0) 2422 new_stdout = os.dup(1) 2423 try: 2424 os.close(0) 2425 os.close(1) 2426 2427 # Side test: if errpipe_write fails to have its CLOEXEC 2428 # flag set this should cause the parent to think the exec 2429 # failed. Extremely unlikely: everyone supports CLOEXEC. 
2430 subprocess.Popen([ 2431 sys.executable, "-c", 2432 "print('AssertionError:0:CLOEXEC failure.')"]).wait() 2433 finally: 2434 # Restore original stdin and stdout 2435 os.dup2(new_stdin, 0) 2436 os.dup2(new_stdout, 1) 2437 os.close(new_stdin) 2438 os.close(new_stdout) 2439 2440 def test_remapping_std_fds(self): 2441 # open up some temporary files 2442 temps = [tempfile.mkstemp() for i in range(3)] 2443 try: 2444 temp_fds = [fd for fd, fname in temps] 2445 2446 # unlink the files -- we won't need to reopen them 2447 for fd, fname in temps: 2448 os.unlink(fname) 2449 2450 # write some data to what will become stdin, and rewind 2451 os.write(temp_fds[1], b"STDIN") 2452 os.lseek(temp_fds[1], 0, 0) 2453 2454 # move the standard file descriptors out of the way 2455 saved_fds = self._save_fds(range(3)) 2456 try: 2457 # duplicate the file objects over the standard fd's 2458 for fd, temp_fd in enumerate(temp_fds): 2459 os.dup2(temp_fd, fd) 2460 2461 # now use those files in the "wrong" order, so that subprocess 2462 # has to rearrange them in the child 2463 p = subprocess.Popen([sys.executable, "-c", 2464 'import sys; got = sys.stdin.read();' 2465 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 2466 stdin=temp_fds[1], 2467 stdout=temp_fds[2], 2468 stderr=temp_fds[0]) 2469 p.wait() 2470 finally: 2471 self._restore_fds(saved_fds) 2472 2473 for fd in temp_fds: 2474 os.lseek(fd, 0, 0) 2475 2476 out = os.read(temp_fds[2], 1024) 2477 err = os.read(temp_fds[0], 1024).strip() 2478 self.assertEqual(out, b"got STDIN") 2479 self.assertEqual(err, b"err") 2480 2481 finally: 2482 for fd in temp_fds: 2483 os.close(fd) 2484 2485 def check_swap_fds(self, stdin_no, stdout_no, stderr_no): 2486 # open up some temporary files 2487 temps = [tempfile.mkstemp() for i in range(3)] 2488 temp_fds = [fd for fd, fname in temps] 2489 try: 2490 # unlink the files -- we won't need to reopen them 2491 for fd, fname in temps: 2492 os.unlink(fname) 2493 2494 # save a copy of the standard file 
descriptors 2495 saved_fds = self._save_fds(range(3)) 2496 try: 2497 # duplicate the temp files over the standard fd's 0, 1, 2 2498 for fd, temp_fd in enumerate(temp_fds): 2499 os.dup2(temp_fd, fd) 2500 2501 # write some data to what will become stdin, and rewind 2502 os.write(stdin_no, b"STDIN") 2503 os.lseek(stdin_no, 0, 0) 2504 2505 # now use those files in the given order, so that subprocess 2506 # has to rearrange them in the child 2507 p = subprocess.Popen([sys.executable, "-c", 2508 'import sys; got = sys.stdin.read();' 2509 'sys.stdout.write("got %s"%got); sys.stderr.write("err")'], 2510 stdin=stdin_no, 2511 stdout=stdout_no, 2512 stderr=stderr_no) 2513 p.wait() 2514 2515 for fd in temp_fds: 2516 os.lseek(fd, 0, 0) 2517 2518 out = os.read(stdout_no, 1024) 2519 err = os.read(stderr_no, 1024).strip() 2520 finally: 2521 self._restore_fds(saved_fds) 2522 2523 self.assertEqual(out, b"got STDIN") 2524 self.assertEqual(err, b"err") 2525 2526 finally: 2527 for fd in temp_fds: 2528 os.close(fd) 2529 2530 # When duping fds, if there arises a situation where one of the fds is 2531 # either 0, 1 or 2, it is possible that it is overwritten (#12607). 2532 # This tests all combinations of this. 
2533 def test_swap_fds(self): 2534 self.check_swap_fds(0, 1, 2) 2535 self.check_swap_fds(0, 2, 1) 2536 self.check_swap_fds(1, 0, 2) 2537 self.check_swap_fds(1, 2, 0) 2538 self.check_swap_fds(2, 0, 1) 2539 self.check_swap_fds(2, 1, 0) 2540 2541 def _check_swap_std_fds_with_one_closed(self, from_fds, to_fds): 2542 saved_fds = self._save_fds(range(3)) 2543 try: 2544 for from_fd in from_fds: 2545 with tempfile.TemporaryFile() as f: 2546 os.dup2(f.fileno(), from_fd) 2547 2548 fd_to_close = (set(range(3)) - set(from_fds)).pop() 2549 os.close(fd_to_close) 2550 2551 arg_names = ['stdin', 'stdout', 'stderr'] 2552 kwargs = {} 2553 for from_fd, to_fd in zip(from_fds, to_fds): 2554 kwargs[arg_names[to_fd]] = from_fd 2555 2556 code = textwrap.dedent(r''' 2557 import os, sys 2558 skipped_fd = int(sys.argv[1]) 2559 for fd in range(3): 2560 if fd != skipped_fd: 2561 os.write(fd, str(fd).encode('ascii')) 2562 ''') 2563 2564 skipped_fd = (set(range(3)) - set(to_fds)).pop() 2565 2566 rc = subprocess.call([sys.executable, '-c', code, str(skipped_fd)], 2567 **kwargs) 2568 self.assertEqual(rc, 0) 2569 2570 for from_fd, to_fd in zip(from_fds, to_fds): 2571 os.lseek(from_fd, 0, os.SEEK_SET) 2572 read_bytes = os.read(from_fd, 1024) 2573 read_fds = list(map(int, read_bytes.decode('ascii'))) 2574 msg = textwrap.dedent(f""" 2575 When testing {from_fds} to {to_fds} redirection, 2576 parent descriptor {from_fd} got redirected 2577 to descriptor(s) {read_fds} instead of descriptor {to_fd}. 2578 """) 2579 self.assertEqual([to_fd], read_fds, msg) 2580 finally: 2581 self._restore_fds(saved_fds) 2582 2583 # Check that subprocess can remap std fds correctly even 2584 # if one of them is closed (#32844). 
def test_swap_std_fds_with_one_closed(self):
    # Try every pair of standard descriptors as the open "source" fds
    # against every ordered pair of target streams; the per-combination
    # work is done by _check_swap_std_fds_with_one_closed().
    for from_fds in itertools.combinations(range(3), 2):
        for to_fds in itertools.permutations(range(3), 2):
            self._check_swap_std_fds_with_one_closed(from_fds, to_fds)

def test_surrogates_error_message(self):
    # A preexec_fn that raises with a lone surrogate in its message must
    # still surface as an exception in the parent.  Which exception (and
    # which message) is accepted depends on whether the
    # _posixsubprocess module is in use, as asserted in each branch.
    def prepare():
        raise ValueError("surrogate:\uDCff")

    try:
        subprocess.call(
            ZERO_RETURN_CMD,
            preexec_fn=prepare)
    except ValueError as err:
        # Pure Python implementations keep the message
        self.assertIsNone(subprocess._posixsubprocess)
        self.assertEqual(str(err), "surrogate:\uDCff")
    except subprocess.SubprocessError as err:
        # _posixsubprocess uses a default message
        self.assertIsNotNone(subprocess._posixsubprocess)
        self.assertEqual(str(err), "Exception occurred in preexec_fn.")
    else:
        self.fail("Expected ValueError or subprocess.SubprocessError")

def test_undecodable_env(self):
    # Environment entries whose key or value holds a lone surrogate must
    # reach the child intact, both when passed as str and when passed as
    # surrogateescape-encoded bytes.  The child prints ascii() of what it
    # sees and the parent compares that with ascii() of what it sent.
    for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
        encoded_value = value.encode("ascii", "surrogateescape")

        # test str with surrogates
        script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
        env = os.environ.copy()
        env[key] = value
        # Use C locale to get ASCII for the locale encoding to force
        # surrogate-escaping of \xFF in the child process
        env['LC_ALL'] = 'C'
        decoded_value = value
        stdout = subprocess.check_output(
            [sys.executable, "-c", script],
            env=env)
        stdout = stdout.rstrip(b'\n\r')
        self.assertEqual(stdout.decode('ascii'), ascii(decoded_value))

        # test bytes
        # From here on `key` is bytes; it is only used for the bytes
        # lookup (os.getenvb) in the child and as a bytes env key.
        key = key.encode("ascii", "surrogateescape")
        script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
        env = os.environ.copy()
        env[key] = encoded_value
        stdout = subprocess.check_output(
            [sys.executable, "-c", script],
            env=env)
        stdout = stdout.rstrip(b'\n\r')
        self.assertEqual(stdout.decode('ascii'), ascii(encoded_value))

def test_bytes_program(self):
    # The program may be given as bytes: as an absolute path in an
    # argument list, embedded in a bytes shell command line, and as a
    # bare name looked up through a str PATH or a bytes PATH.
    abs_program = os.fsencode(ZERO_RETURN_CMD[0])
    args = list(ZERO_RETURN_CMD[1:])
    path, program = os.path.split(ZERO_RETURN_CMD[0])
    program = os.fsencode(program)

    # absolute bytes path
    exitcode = subprocess.call([abs_program]+args)
    self.assertEqual(exitcode, 0)

    # absolute bytes path as a string
    cmd = b"'%s' %s" % (abs_program, " ".join(args).encode("utf-8"))
    exitcode = subprocess.call(cmd, shell=True)
    self.assertEqual(exitcode, 0)

    # bytes program, unicode PATH
    env = os.environ.copy()
    env["PATH"] = path
    exitcode = subprocess.call([program]+args, env=env)
    self.assertEqual(exitcode, 0)

    # bytes program, bytes PATH
    envb = os.environb.copy()
    envb[b"PATH"] = os.fsencode(path)
    exitcode = subprocess.call([program]+args, env=envb)
    self.assertEqual(exitcode, 0)

def test_pipe_cloexec(self):
    # The pipe fds created in the parent for p1 must not be reported as
    # open by a second child (p2), even though both children are started
    # with close_fds=False.
    # NOTE(review): fd_status.py is not visible here; its output is parsed
    # below as a comma-separated list of fd numbers open in the child.
    sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    p1 = subprocess.Popen([sys.executable, sleeper],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, close_fds=False)

    # communicate(b'') closes p1's stdin and waits for p1 at cleanup time.
    self.addCleanup(p1.communicate, b'')

    p2 = subprocess.Popen([sys.executable, fd_status],
                          stdout=subprocess.PIPE, close_fds=False)

    output, error = p2.communicate()
    result_fds = set(map(int, output.split(b',')))
    unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
                        p1.stderr.fileno()])

    self.assertFalse(result_fds & unwanted_fds,
                     "Expected no fds from %r to be open in child, "
                     "found %r" %
                     (unwanted_fds, result_fds & unwanted_fds))

def test_pipe_cloexec_real_tools(self):
    # Chain two children (qcat's stdout feeds qgrep's stdin) with
    # close_fds=False and check that data written to the first one comes
    # out of the second one.  If the read end never becomes readable
    # within the select() timeout the test fails with "The child hung".
    # NOTE(review): qcat.py / qgrep.py are not visible here; presumably
    # cat-like and grep-like helpers -- confirm in subprocessdata.
    qcat = support.findfile("qcat.py", subdir="subprocessdata")
    qgrep = support.findfile("qgrep.py", subdir="subprocessdata")

    subdata = b'zxcvbn'
    data = subdata * 4 + b'\n'

    p1 = subprocess.Popen([sys.executable, qcat],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          close_fds=False)

    p2 = subprocess.Popen([sys.executable, qgrep, subdata],
                          stdin=p1.stdout, stdout=subprocess.PIPE,
                          close_fds=False)

    # unittest runs cleanups in reverse registration order, so the
    # terminate() helpers registered below run before these wait() calls.
    self.addCleanup(p1.wait)
    self.addCleanup(p2.wait)
    def kill_p1():
        try:
            p1.terminate()
        except ProcessLookupError:
            # The child is already gone; nothing to terminate.
            pass
    def kill_p2():
        try:
            p2.terminate()
        except ProcessLookupError:
            # The child is already gone; nothing to terminate.
            pass
    self.addCleanup(kill_p1)
    self.addCleanup(kill_p2)

    p1.stdin.write(data)
    p1.stdin.close()

    # Wait at most 10 seconds for the second child to produce output.
    readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)

    self.assertTrue(readfiles, "The child hung")
    self.assertEqual(p2.stdout.read(), data)

    p1.stdout.close()
    p2.stdout.close()

def test_close_fds(self):
    # Open a set of inheritable fds in the parent and check, via the fd
    # list printed by fd_status.py, that:
    #   * close_fds=False leaves all of them open in the child,
    #   * close_fds=True leaves none of them open,
    #   * close_fds=True plus pass_fds leaves open none of the fds that
    #     were not listed in pass_fds.
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    fds = os.pipe()
    self.addCleanup(os.close, fds[0])
    self.addCleanup(os.close, fds[1])

    open_fds = set(fds)
    # add a bunch more fds
    for _ in range(9):
        fd = os.open(os.devnull, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        open_fds.add(fd)

    # Make every fd inheritable so that only close_fds decides whether
    # the child sees it.
    for fd in open_fds:
        os.set_inheritable(fd, True)

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=False)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertEqual(remaining_fds & open_fds, open_fds,
                     "Some fds were closed")

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse(remaining_fds & open_fds,
                     "Some fds were left open")
    # fd 1 is the child's stdout pipe; it must be in the report if the
    # child ran at all.
    self.assertIn(1, remaining_fds, "Subprocess failed")

    # Keep some of the fd's we opened open in the subprocess.
    # This tests _posixsubprocess.c's proper handling of fds_to_keep.
    # Note: pop() removes the kept fds from open_fds, so open_fds now
    # holds only the fds that must NOT survive in the child.
    fds_to_keep = set(open_fds.pop() for _ in range(8))
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True,
                         pass_fds=fds_to_keep)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
                     "Some fds not in pass_fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")


@unittest.skipIf(sys.platform.startswith("freebsd") and
                 os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
                 "Requires fdescfs mounted on /dev/fd on FreeBSD.")
def test_close_fds_when_max_fd_is_lowered(self):
    """Confirm that issue21618 is fixed (may fail under valgrind)."""
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    # This launches the meat of the test in a child process to
    # avoid messing with the larger unittest processes maximum
    # number of file descriptors.
    #  This process launches:
    #  +--> Process that lowers its RLIMIT_NOFILE after setting up
    #    a bunch of high open fds above the new lower rlimit.
    #    Those are reported via stdout before launching a new
    #    process with close_fds=False to run the actual test:
    #    +--> The TEST: This one launches a fd_status.py
    #      subprocess with close_fds=True so we can find out if
    #      any of the fds above the lowered rlimit are still open.
    #
    # The script below is a runtime string: the outer "%" inserts the
    # fd_status path into the innermost command via %r, and the inner
    # str.format() fills in {max_fd} when the first child runs.
    p = subprocess.Popen([sys.executable, '-c', textwrap.dedent(
    '''
    import os, resource, subprocess, sys, textwrap
    open_fds = set()
    # Add a bunch more fds to pass down.
    for _ in range(40):
        fd = os.open(os.devnull, os.O_RDONLY)
        open_fds.add(fd)

    # Leave a two pairs of low ones available for use by the
    # internal child error pipe and the stdout pipe.
    # We also leave 10 more open as some Python buildbots run into
    # "too many open files" errors during the test if we do not.
    for fd in sorted(open_fds)[:14]:
        os.close(fd)
        open_fds.remove(fd)

    for fd in open_fds:
        #self.addCleanup(os.close, fd)
        os.set_inheritable(fd, True)

    max_fd_open = max(open_fds)

    # Communicate the open_fds to the parent unittest.TestCase process.
    print(','.join(map(str, sorted(open_fds))))
    sys.stdout.flush()

    rlim_cur, rlim_max = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        # 29 is lower than the highest fds we are leaving open.
        resource.setrlimit(resource.RLIMIT_NOFILE, (29, rlim_max))
        # Launch a new Python interpreter with our low fd rlim_cur that
        # inherits open fds above that limit. It then uses subprocess
        # with close_fds=True to get a report of open fds in the child.
        # An explicit list of fds to check is passed to fd_status.py as
        # letting fd_status rely on its default logic would miss the
        # fds above rlim_cur as it normally only checks up to that limit.
        subprocess.Popen(
            [sys.executable, '-c',
             textwrap.dedent("""
                 import subprocess, sys
                 subprocess.Popen([sys.executable, %r] +
                                  [str(x) for x in range({max_fd})],
                                  close_fds=True).wait()
                 """.format(max_fd=max_fd_open+1))],
            close_fds=False).wait()
    finally:
        resource.setrlimit(resource.RLIMIT_NOFILE, (rlim_cur, rlim_max))
    ''' % fd_status)], stdout=subprocess.PIPE)

    output, unused_stderr = p.communicate()
    output_lines = output.splitlines()
    # Line 1: fds the first child left open above the lowered limit.
    # Line 2: fds reported by fd_status.py in the innermost child.
    self.assertEqual(len(output_lines), 2,
                     msg="expected exactly two lines of output:\n%r" % output)
    opened_fds = set(map(int, output_lines[0].strip().split(b',')))
    remaining_fds = set(map(int, output_lines[1].strip().split(b',')))

    self.assertFalse(remaining_fds & opened_fds,
                     msg="Some fds were left open.")


# Mac OS X Tiger (10.4) has a kernel bug: sometimes, the file
# descriptor of a pipe closed in the parent process is valid in the
# child process according to fstat(), but the mode of the file
# descriptor is invalid, and read or write raise an error.
2860 @support.requires_mac_ver(10, 5) 2861 def test_pass_fds(self): 2862 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2863 2864 open_fds = set() 2865 2866 for x in range(5): 2867 fds = os.pipe() 2868 self.addCleanup(os.close, fds[0]) 2869 self.addCleanup(os.close, fds[1]) 2870 os.set_inheritable(fds[0], True) 2871 os.set_inheritable(fds[1], True) 2872 open_fds.update(fds) 2873 2874 for fd in open_fds: 2875 p = subprocess.Popen([sys.executable, fd_status], 2876 stdout=subprocess.PIPE, close_fds=True, 2877 pass_fds=(fd, )) 2878 output, ignored = p.communicate() 2879 2880 remaining_fds = set(map(int, output.split(b','))) 2881 to_be_closed = open_fds - {fd} 2882 2883 self.assertIn(fd, remaining_fds, "fd to be passed not passed") 2884 self.assertFalse(remaining_fds & to_be_closed, 2885 "fd to be closed passed") 2886 2887 # pass_fds overrides close_fds with a warning. 2888 with self.assertWarns(RuntimeWarning) as context: 2889 self.assertFalse(subprocess.call( 2890 ZERO_RETURN_CMD, 2891 close_fds=False, pass_fds=(fd, ))) 2892 self.assertIn('overriding close_fds', str(context.warning)) 2893 2894 def test_pass_fds_inheritable(self): 2895 script = support.findfile("fd_status.py", subdir="subprocessdata") 2896 2897 inheritable, non_inheritable = os.pipe() 2898 self.addCleanup(os.close, inheritable) 2899 self.addCleanup(os.close, non_inheritable) 2900 os.set_inheritable(inheritable, True) 2901 os.set_inheritable(non_inheritable, False) 2902 pass_fds = (inheritable, non_inheritable) 2903 args = [sys.executable, script] 2904 args += list(map(str, pass_fds)) 2905 2906 p = subprocess.Popen(args, 2907 stdout=subprocess.PIPE, close_fds=True, 2908 pass_fds=pass_fds) 2909 output, ignored = p.communicate() 2910 fds = set(map(int, output.split(b','))) 2911 2912 # the inheritable file descriptor must be inherited, so its inheritable 2913 # flag must be set in the child process after fork() and before exec() 2914 self.assertEqual(fds, set(pass_fds), "output=%a" % 
output) 2915 2916 # inheritable flag must not be changed in the parent process 2917 self.assertEqual(os.get_inheritable(inheritable), True) 2918 self.assertEqual(os.get_inheritable(non_inheritable), False) 2919 2920 2921 # bpo-32270: Ensure that descriptors specified in pass_fds 2922 # are inherited even if they are used in redirections. 2923 # Contributed by @izbyshev. 2924 def test_pass_fds_redirected(self): 2925 """Regression test for https://bugs.python.org/issue32270.""" 2926 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 2927 pass_fds = [] 2928 for _ in range(2): 2929 fd = os.open(os.devnull, os.O_RDWR) 2930 self.addCleanup(os.close, fd) 2931 pass_fds.append(fd) 2932 2933 stdout_r, stdout_w = os.pipe() 2934 self.addCleanup(os.close, stdout_r) 2935 self.addCleanup(os.close, stdout_w) 2936 pass_fds.insert(1, stdout_w) 2937 2938 with subprocess.Popen([sys.executable, fd_status], 2939 stdin=pass_fds[0], 2940 stdout=pass_fds[1], 2941 stderr=pass_fds[2], 2942 close_fds=True, 2943 pass_fds=pass_fds): 2944 output = os.read(stdout_r, 1024) 2945 fds = {int(num) for num in output.split(b',')} 2946 2947 self.assertEqual(fds, {0, 1, 2} | frozenset(pass_fds), f"output={output!a}") 2948 2949 2950 def test_stdout_stdin_are_single_inout_fd(self): 2951 with io.open(os.devnull, "r+") as inout: 2952 p = subprocess.Popen(ZERO_RETURN_CMD, 2953 stdout=inout, stdin=inout) 2954 p.wait() 2955 2956 def test_stdout_stderr_are_single_inout_fd(self): 2957 with io.open(os.devnull, "r+") as inout: 2958 p = subprocess.Popen(ZERO_RETURN_CMD, 2959 stdout=inout, stderr=inout) 2960 p.wait() 2961 2962 def test_stderr_stdin_are_single_inout_fd(self): 2963 with io.open(os.devnull, "r+") as inout: 2964 p = subprocess.Popen(ZERO_RETURN_CMD, 2965 stderr=inout, stdin=inout) 2966 p.wait() 2967 2968 def test_wait_when_sigchild_ignored(self): 2969 # NOTE: sigchild_ignore.py may not be an effective test on all OSes. 
2970 sigchild_ignore = support.findfile("sigchild_ignore.py", 2971 subdir="subprocessdata") 2972 p = subprocess.Popen([sys.executable, sigchild_ignore], 2973 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 2974 stdout, stderr = p.communicate() 2975 self.assertEqual(0, p.returncode, "sigchild_ignore.py exited" 2976 " non-zero with this error:\n%s" % 2977 stderr.decode('utf-8')) 2978 2979 def test_select_unbuffered(self): 2980 # Issue #11459: bufsize=0 should really set the pipes as 2981 # unbuffered (and therefore let select() work properly). 2982 select = import_helper.import_module("select") 2983 p = subprocess.Popen([sys.executable, "-c", 2984 'import sys;' 2985 'sys.stdout.write("apple")'], 2986 stdout=subprocess.PIPE, 2987 bufsize=0) 2988 f = p.stdout 2989 self.addCleanup(f.close) 2990 try: 2991 self.assertEqual(f.read(4), b"appl") 2992 self.assertIn(f, select.select([f], [], [], 0.0)[0]) 2993 finally: 2994 p.wait() 2995 2996 def test_zombie_fast_process_del(self): 2997 # Issue #12650: on Unix, if Popen.__del__() was called before the 2998 # process exited, it wouldn't be added to subprocess._active, and would 2999 # remain a zombie. 3000 # spawn a Popen, and delete its reference before it exits 3001 p = subprocess.Popen([sys.executable, "-c", 3002 'import sys, time;' 3003 'time.sleep(0.2)'], 3004 stdout=subprocess.PIPE, 3005 stderr=subprocess.PIPE) 3006 self.addCleanup(p.stdout.close) 3007 self.addCleanup(p.stderr.close) 3008 ident = id(p) 3009 pid = p.pid 3010 with warnings_helper.check_warnings(('', ResourceWarning)): 3011 p = None 3012 3013 if mswindows: 3014 # subprocess._active is not used on Windows and is set to None. 
3015 self.assertIsNone(subprocess._active) 3016 else: 3017 # check that p is in the active processes list 3018 self.assertIn(ident, [id(o) for o in subprocess._active]) 3019 3020 def test_leak_fast_process_del_killed(self): 3021 # Issue #12650: on Unix, if Popen.__del__() was called before the 3022 # process exited, and the process got killed by a signal, it would never 3023 # be removed from subprocess._active, which triggered a FD and memory 3024 # leak. 3025 # spawn a Popen, delete its reference and kill it 3026 p = subprocess.Popen([sys.executable, "-c", 3027 'import time;' 3028 'time.sleep(3)'], 3029 stdout=subprocess.PIPE, 3030 stderr=subprocess.PIPE) 3031 self.addCleanup(p.stdout.close) 3032 self.addCleanup(p.stderr.close) 3033 ident = id(p) 3034 pid = p.pid 3035 with warnings_helper.check_warnings(('', ResourceWarning)): 3036 p = None 3037 support.gc_collect() # For PyPy or other GCs. 3038 3039 os.kill(pid, signal.SIGKILL) 3040 if mswindows: 3041 # subprocess._active is not used on Windows and is set to None. 3042 self.assertIsNone(subprocess._active) 3043 else: 3044 # check that p is in the active processes list 3045 self.assertIn(ident, [id(o) for o in subprocess._active]) 3046 3047 # let some time for the process to exit, and create a new Popen: this 3048 # should trigger the wait() of p 3049 time.sleep(0.2) 3050 with self.assertRaises(OSError): 3051 with subprocess.Popen(NONEXISTING_CMD, 3052 stdout=subprocess.PIPE, 3053 stderr=subprocess.PIPE) as proc: 3054 pass 3055 # p should have been wait()ed on, and removed from the _active list 3056 self.assertRaises(OSError, os.waitpid, pid, 0) 3057 if mswindows: 3058 # subprocess._active is not used on Windows and is set to None. 
3059 self.assertIsNone(subprocess._active) 3060 else: 3061 self.assertNotIn(ident, [id(o) for o in subprocess._active]) 3062 3063 def test_close_fds_after_preexec(self): 3064 fd_status = support.findfile("fd_status.py", subdir="subprocessdata") 3065 3066 # this FD is used as dup2() target by preexec_fn, and should be closed 3067 # in the child process 3068 fd = os.dup(1) 3069 self.addCleanup(os.close, fd) 3070 3071 p = subprocess.Popen([sys.executable, fd_status], 3072 stdout=subprocess.PIPE, close_fds=True, 3073 preexec_fn=lambda: os.dup2(1, fd)) 3074 output, ignored = p.communicate() 3075 3076 remaining_fds = set(map(int, output.split(b','))) 3077 3078 self.assertNotIn(fd, remaining_fds) 3079 3080 @support.cpython_only 3081 def test_fork_exec(self): 3082 # Issue #22290: fork_exec() must not crash on memory allocation failure 3083 # or other errors 3084 import _posixsubprocess 3085 gc_enabled = gc.isenabled() 3086 try: 3087 # Use a preexec function and enable the garbage collector 3088 # to force fork_exec() to re-enable the garbage collector 3089 # on error. 3090 func = lambda: None 3091 gc.enable() 3092 3093 for args, exe_list, cwd, env_list in ( 3094 (123, [b"exe"], None, [b"env"]), 3095 ([b"arg"], 123, None, [b"env"]), 3096 ([b"arg"], [b"exe"], 123, [b"env"]), 3097 ([b"arg"], [b"exe"], None, 123), 3098 ): 3099 with self.assertRaises(TypeError) as err: 3100 _posixsubprocess.fork_exec( 3101 args, exe_list, 3102 True, (), cwd, env_list, 3103 -1, -1, -1, -1, 3104 1, 2, 3, 4, 3105 True, True, 3106 False, [], 0, -1, 3107 func) 3108 # Attempt to prevent 3109 # "TypeError: fork_exec() takes exactly N arguments (M given)" 3110 # from passing the test. More refactoring to have us start 3111 # with a valid *args list, confirm a good call with that works 3112 # before mutating it in various ways to ensure that bad calls 3113 # with individual arg type errors raise a typeerror would be 3114 # ideal. Saving that for a future PR... 
3115 self.assertNotIn('takes exactly', str(err.exception)) 3116 finally: 3117 if not gc_enabled: 3118 gc.disable() 3119 3120 @support.cpython_only 3121 def test_fork_exec_sorted_fd_sanity_check(self): 3122 # Issue #23564: sanity check the fork_exec() fds_to_keep sanity check. 3123 import _posixsubprocess 3124 class BadInt: 3125 first = True 3126 def __init__(self, value): 3127 self.value = value 3128 def __int__(self): 3129 if self.first: 3130 self.first = False 3131 return self.value 3132 raise ValueError 3133 3134 gc_enabled = gc.isenabled() 3135 try: 3136 gc.enable() 3137 3138 for fds_to_keep in ( 3139 (-1, 2, 3, 4, 5), # Negative number. 3140 ('str', 4), # Not an int. 3141 (18, 23, 42, 2**63), # Out of range. 3142 (5, 4), # Not sorted. 3143 (6, 7, 7, 8), # Duplicate. 3144 (BadInt(1), BadInt(2)), 3145 ): 3146 with self.assertRaises( 3147 ValueError, 3148 msg='fds_to_keep={}'.format(fds_to_keep)) as c: 3149 _posixsubprocess.fork_exec( 3150 [b"false"], [b"false"], 3151 True, fds_to_keep, None, [b"env"], 3152 -1, -1, -1, -1, 3153 1, 2, 3, 4, 3154 True, True, 3155 None, None, None, -1, 3156 None) 3157 self.assertIn('fds_to_keep', str(c.exception)) 3158 finally: 3159 if not gc_enabled: 3160 gc.disable() 3161 3162 def test_communicate_BrokenPipeError_stdin_close(self): 3163 # By not setting stdout or stderr or a timeout we force the fast path 3164 # that just calls _stdin_write() internally due to our mock. 3165 proc = subprocess.Popen(ZERO_RETURN_CMD) 3166 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3167 mock_proc_stdin.close.side_effect = BrokenPipeError 3168 proc.communicate() # Should swallow BrokenPipeError from close. 3169 mock_proc_stdin.close.assert_called_with() 3170 3171 def test_communicate_BrokenPipeError_stdin_write(self): 3172 # By not setting stdout or stderr or a timeout we force the fast path 3173 # that just calls _stdin_write() internally due to our mock. 
3174 proc = subprocess.Popen(ZERO_RETURN_CMD) 3175 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3176 mock_proc_stdin.write.side_effect = BrokenPipeError 3177 proc.communicate(b'stuff') # Should swallow the BrokenPipeError. 3178 mock_proc_stdin.write.assert_called_once_with(b'stuff') 3179 mock_proc_stdin.close.assert_called_once_with() 3180 3181 def test_communicate_BrokenPipeError_stdin_flush(self): 3182 # Setting stdin and stdout forces the ._communicate() code path. 3183 # python -h exits faster than python -c pass (but spams stdout). 3184 proc = subprocess.Popen([sys.executable, '-h'], 3185 stdin=subprocess.PIPE, 3186 stdout=subprocess.PIPE) 3187 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin, \ 3188 open(os.devnull, 'wb') as dev_null: 3189 mock_proc_stdin.flush.side_effect = BrokenPipeError 3190 # because _communicate registers a selector using proc.stdin... 3191 mock_proc_stdin.fileno.return_value = dev_null.fileno() 3192 # _communicate() should swallow BrokenPipeError from flush. 3193 proc.communicate(b'stuff') 3194 mock_proc_stdin.flush.assert_called_once_with() 3195 3196 def test_communicate_BrokenPipeError_stdin_close_with_timeout(self): 3197 # Setting stdin and stdout forces the ._communicate() code path. 3198 # python -h exits faster than python -c pass (but spams stdout). 3199 proc = subprocess.Popen([sys.executable, '-h'], 3200 stdin=subprocess.PIPE, 3201 stdout=subprocess.PIPE) 3202 with proc, mock.patch.object(proc, 'stdin') as mock_proc_stdin: 3203 mock_proc_stdin.close.side_effect = BrokenPipeError 3204 # _communicate() should swallow BrokenPipeError from close. 
3205 proc.communicate(timeout=999) 3206 mock_proc_stdin.close.assert_called_once_with() 3207 3208 @unittest.skipUnless(_testcapi is not None 3209 and hasattr(_testcapi, 'W_STOPCODE'), 3210 'need _testcapi.W_STOPCODE') 3211 def test_stopped(self): 3212 """Test wait() behavior when waitpid returns WIFSTOPPED; issue29335.""" 3213 args = ZERO_RETURN_CMD 3214 proc = subprocess.Popen(args) 3215 3216 # Wait until the real process completes to avoid zombie process 3217 support.wait_process(proc.pid, exitcode=0) 3218 3219 status = _testcapi.W_STOPCODE(3) 3220 with mock.patch('subprocess.os.waitpid', return_value=(proc.pid, status)): 3221 returncode = proc.wait() 3222 3223 self.assertEqual(returncode, -3) 3224 3225 def test_send_signal_race(self): 3226 # bpo-38630: send_signal() must poll the process exit status to reduce 3227 # the risk of sending the signal to the wrong process. 3228 proc = subprocess.Popen(ZERO_RETURN_CMD) 3229 3230 # wait until the process completes without using the Popen APIs. 3231 support.wait_process(proc.pid, exitcode=0) 3232 3233 # returncode is still None but the process completed. 3234 self.assertIsNone(proc.returncode) 3235 3236 with mock.patch("os.kill") as mock_kill: 3237 proc.send_signal(signal.SIGTERM) 3238 3239 # send_signal() didn't call os.kill() since the process already 3240 # completed. 3241 mock_kill.assert_not_called() 3242 3243 # Don't check the returncode value: the test reads the exit status, 3244 # so Popen failed to read it and uses a default returncode instead. 
3245 self.assertIsNotNone(proc.returncode) 3246 3247 def test_send_signal_race2(self): 3248 # bpo-40550: the process might exist between the returncode check and 3249 # the kill operation 3250 p = subprocess.Popen([sys.executable, '-c', 'exit(1)']) 3251 3252 # wait for process to exit 3253 while not p.returncode: 3254 p.poll() 3255 3256 with mock.patch.object(p, 'poll', new=lambda: None): 3257 p.returncode = None 3258 p.send_signal(signal.SIGTERM) 3259 p.kill() 3260 3261 def test_communicate_repeated_call_after_stdout_close(self): 3262 proc = subprocess.Popen([sys.executable, '-c', 3263 'import os, time; os.close(1), time.sleep(2)'], 3264 stdout=subprocess.PIPE) 3265 while True: 3266 try: 3267 proc.communicate(timeout=0.1) 3268 return 3269 except subprocess.TimeoutExpired: 3270 pass 3271 3272 3273@unittest.skipUnless(mswindows, "Windows specific tests") 3274class Win32ProcessTestCase(BaseTestCase): 3275 3276 def test_startupinfo(self): 3277 # startupinfo argument 3278 # We uses hardcoded constants, because we do not want to 3279 # depend on win32all. 3280 STARTF_USESHOWWINDOW = 1 3281 SW_MAXIMIZE = 3 3282 startupinfo = subprocess.STARTUPINFO() 3283 startupinfo.dwFlags = STARTF_USESHOWWINDOW 3284 startupinfo.wShowWindow = SW_MAXIMIZE 3285 # Since Python is a console process, it won't be affected 3286 # by wShowWindow, but the argument should be silently 3287 # ignored 3288 subprocess.call(ZERO_RETURN_CMD, 3289 startupinfo=startupinfo) 3290 3291 def test_startupinfo_keywords(self): 3292 # startupinfo argument 3293 # We use hardcoded constants, because we do not want to 3294 # depend on win32all. 
3295 STARTF_USERSHOWWINDOW = 1 3296 SW_MAXIMIZE = 3 3297 startupinfo = subprocess.STARTUPINFO( 3298 dwFlags=STARTF_USERSHOWWINDOW, 3299 wShowWindow=SW_MAXIMIZE 3300 ) 3301 # Since Python is a console process, it won't be affected 3302 # by wShowWindow, but the argument should be silently 3303 # ignored 3304 subprocess.call(ZERO_RETURN_CMD, 3305 startupinfo=startupinfo) 3306 3307 def test_startupinfo_copy(self): 3308 # bpo-34044: Popen must not modify input STARTUPINFO structure 3309 startupinfo = subprocess.STARTUPINFO() 3310 startupinfo.dwFlags = subprocess.STARTF_USESHOWWINDOW 3311 startupinfo.wShowWindow = subprocess.SW_HIDE 3312 3313 # Call Popen() twice with the same startupinfo object to make sure 3314 # that it's not modified 3315 for _ in range(2): 3316 cmd = ZERO_RETURN_CMD 3317 with open(os.devnull, 'w') as null: 3318 proc = subprocess.Popen(cmd, 3319 stdout=null, 3320 stderr=subprocess.STDOUT, 3321 startupinfo=startupinfo) 3322 with proc: 3323 proc.communicate() 3324 self.assertEqual(proc.returncode, 0) 3325 3326 self.assertEqual(startupinfo.dwFlags, 3327 subprocess.STARTF_USESHOWWINDOW) 3328 self.assertIsNone(startupinfo.hStdInput) 3329 self.assertIsNone(startupinfo.hStdOutput) 3330 self.assertIsNone(startupinfo.hStdError) 3331 self.assertEqual(startupinfo.wShowWindow, subprocess.SW_HIDE) 3332 self.assertEqual(startupinfo.lpAttributeList, {"handle_list": []}) 3333 3334 def test_creationflags(self): 3335 # creationflags argument 3336 CREATE_NEW_CONSOLE = 16 3337 sys.stderr.write(" a DOS box should flash briefly ...\n") 3338 subprocess.call(sys.executable + 3339 ' -c "import time; time.sleep(0.25)"', 3340 creationflags=CREATE_NEW_CONSOLE) 3341 3342 def test_invalid_args(self): 3343 # invalid arguments should raise ValueError 3344 self.assertRaises(ValueError, subprocess.call, 3345 [sys.executable, "-c", 3346 "import sys; sys.exit(47)"], 3347 preexec_fn=lambda: 1) 3348 3349 @support.cpython_only 3350 def test_issue31471(self): 3351 # There shouldn't be an 
assertion failure in Popen() in case the env 3352 # argument has a bad keys() method. 3353 class BadEnv(dict): 3354 keys = None 3355 with self.assertRaises(TypeError): 3356 subprocess.Popen(ZERO_RETURN_CMD, env=BadEnv()) 3357 3358 def test_close_fds(self): 3359 # close file descriptors 3360 rc = subprocess.call([sys.executable, "-c", 3361 "import sys; sys.exit(47)"], 3362 close_fds=True) 3363 self.assertEqual(rc, 47) 3364 3365 def test_close_fds_with_stdio(self): 3366 import msvcrt 3367 3368 fds = os.pipe() 3369 self.addCleanup(os.close, fds[0]) 3370 self.addCleanup(os.close, fds[1]) 3371 3372 handles = [] 3373 for fd in fds: 3374 os.set_inheritable(fd, True) 3375 handles.append(msvcrt.get_osfhandle(fd)) 3376 3377 p = subprocess.Popen([sys.executable, "-c", 3378 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3379 stdout=subprocess.PIPE, close_fds=False) 3380 stdout, stderr = p.communicate() 3381 self.assertEqual(p.returncode, 0) 3382 int(stdout.strip()) # Check that stdout is an integer 3383 3384 p = subprocess.Popen([sys.executable, "-c", 3385 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3386 stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) 3387 stdout, stderr = p.communicate() 3388 self.assertEqual(p.returncode, 1) 3389 self.assertIn(b"OSError", stderr) 3390 3391 # The same as the previous call, but with an empty handle_list 3392 handle_list = [] 3393 startupinfo = subprocess.STARTUPINFO() 3394 startupinfo.lpAttributeList = {"handle_list": handle_list} 3395 p = subprocess.Popen([sys.executable, "-c", 3396 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3397 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3398 startupinfo=startupinfo, close_fds=True) 3399 stdout, stderr = p.communicate() 3400 self.assertEqual(p.returncode, 1) 3401 self.assertIn(b"OSError", stderr) 3402 3403 # Check for a warning due to using handle_list and close_fds=False 3404 with 
warnings_helper.check_warnings((".*overriding close_fds", 3405 RuntimeWarning)): 3406 startupinfo = subprocess.STARTUPINFO() 3407 startupinfo.lpAttributeList = {"handle_list": handles[:]} 3408 p = subprocess.Popen([sys.executable, "-c", 3409 "import msvcrt; print(msvcrt.open_osfhandle({}, 0))".format(handles[0])], 3410 stdout=subprocess.PIPE, stderr=subprocess.PIPE, 3411 startupinfo=startupinfo, close_fds=False) 3412 stdout, stderr = p.communicate() 3413 self.assertEqual(p.returncode, 0) 3414 3415 def test_empty_attribute_list(self): 3416 startupinfo = subprocess.STARTUPINFO() 3417 startupinfo.lpAttributeList = {} 3418 subprocess.call(ZERO_RETURN_CMD, 3419 startupinfo=startupinfo) 3420 3421 def test_empty_handle_list(self): 3422 startupinfo = subprocess.STARTUPINFO() 3423 startupinfo.lpAttributeList = {"handle_list": []} 3424 subprocess.call(ZERO_RETURN_CMD, 3425 startupinfo=startupinfo) 3426 3427 def test_shell_sequence(self): 3428 # Run command through the shell (sequence) 3429 newenv = os.environ.copy() 3430 newenv["FRUIT"] = "physalis" 3431 p = subprocess.Popen(["set"], shell=1, 3432 stdout=subprocess.PIPE, 3433 env=newenv) 3434 with p: 3435 self.assertIn(b"physalis", p.stdout.read()) 3436 3437 def test_shell_string(self): 3438 # Run command through the shell (string) 3439 newenv = os.environ.copy() 3440 newenv["FRUIT"] = "physalis" 3441 p = subprocess.Popen("set", shell=1, 3442 stdout=subprocess.PIPE, 3443 env=newenv) 3444 with p: 3445 self.assertIn(b"physalis", p.stdout.read()) 3446 3447 def test_shell_encodings(self): 3448 # Run command through the shell (string) 3449 for enc in ['ansi', 'oem']: 3450 newenv = os.environ.copy() 3451 newenv["FRUIT"] = "physalis" 3452 p = subprocess.Popen("set", shell=1, 3453 stdout=subprocess.PIPE, 3454 env=newenv, 3455 encoding=enc) 3456 with p: 3457 self.assertIn("physalis", p.stdout.read(), enc) 3458 3459 def test_call_string(self): 3460 # call() function with string argument on Windows 3461 rc = 
subprocess.call(sys.executable + 3462 ' -c "import sys; sys.exit(47)"') 3463 self.assertEqual(rc, 47) 3464 3465 def _kill_process(self, method, *args): 3466 # Some win32 buildbot raises EOFError if stdin is inherited 3467 p = subprocess.Popen([sys.executable, "-c", """if 1: 3468 import sys, time 3469 sys.stdout.write('x\\n') 3470 sys.stdout.flush() 3471 time.sleep(30) 3472 """], 3473 stdin=subprocess.PIPE, 3474 stdout=subprocess.PIPE, 3475 stderr=subprocess.PIPE) 3476 with p: 3477 # Wait for the interpreter to be completely initialized before 3478 # sending any signal. 3479 p.stdout.read(1) 3480 getattr(p, method)(*args) 3481 _, stderr = p.communicate() 3482 self.assertEqual(stderr, b'') 3483 returncode = p.wait() 3484 self.assertNotEqual(returncode, 0) 3485 3486 def _kill_dead_process(self, method, *args): 3487 p = subprocess.Popen([sys.executable, "-c", """if 1: 3488 import sys, time 3489 sys.stdout.write('x\\n') 3490 sys.stdout.flush() 3491 sys.exit(42) 3492 """], 3493 stdin=subprocess.PIPE, 3494 stdout=subprocess.PIPE, 3495 stderr=subprocess.PIPE) 3496 with p: 3497 # Wait for the interpreter to be completely initialized before 3498 # sending any signal. 
3499 p.stdout.read(1) 3500 # The process should end after this 3501 time.sleep(1) 3502 # This shouldn't raise even though the child is now dead 3503 getattr(p, method)(*args) 3504 _, stderr = p.communicate() 3505 self.assertEqual(stderr, b'') 3506 rc = p.wait() 3507 self.assertEqual(rc, 42) 3508 3509 def test_send_signal(self): 3510 self._kill_process('send_signal', signal.SIGTERM) 3511 3512 def test_kill(self): 3513 self._kill_process('kill') 3514 3515 def test_terminate(self): 3516 self._kill_process('terminate') 3517 3518 def test_send_signal_dead(self): 3519 self._kill_dead_process('send_signal', signal.SIGTERM) 3520 3521 def test_kill_dead(self): 3522 self._kill_dead_process('kill') 3523 3524 def test_terminate_dead(self): 3525 self._kill_dead_process('terminate') 3526 3527class MiscTests(unittest.TestCase): 3528 3529 class RecordingPopen(subprocess.Popen): 3530 """A Popen that saves a reference to each instance for testing.""" 3531 instances_created = [] 3532 3533 def __init__(self, *args, **kwargs): 3534 super().__init__(*args, **kwargs) 3535 self.instances_created.append(self) 3536 3537 @mock.patch.object(subprocess.Popen, "_communicate") 3538 def _test_keyboardinterrupt_no_kill(self, popener, mock__communicate, 3539 **kwargs): 3540 """Fake a SIGINT happening during Popen._communicate() and ._wait(). 3541 3542 This avoids the need to actually try and get test environments to send 3543 and receive signals reliably across platforms. The net effect of a ^C 3544 happening during a blocking subprocess execution which we want to clean 3545 up from is a KeyboardInterrupt coming out of communicate() or wait(). 3546 """ 3547 3548 mock__communicate.side_effect = KeyboardInterrupt 3549 try: 3550 with mock.patch.object(subprocess.Popen, "_wait") as mock__wait: 3551 # We patch out _wait() as no signal was involved so the 3552 # child process isn't actually going to exit rapidly. 
3553 mock__wait.side_effect = KeyboardInterrupt 3554 with mock.patch.object(subprocess, "Popen", 3555 self.RecordingPopen): 3556 with self.assertRaises(KeyboardInterrupt): 3557 popener([sys.executable, "-c", 3558 "import time\ntime.sleep(9)\nimport sys\n" 3559 "sys.stderr.write('\\n!runaway child!\\n')"], 3560 stdout=subprocess.DEVNULL, **kwargs) 3561 for call in mock__wait.call_args_list[1:]: 3562 self.assertNotEqual( 3563 call, mock.call(timeout=None), 3564 "no open-ended wait() after the first allowed: " 3565 f"{mock__wait.call_args_list}") 3566 sigint_calls = [] 3567 for call in mock__wait.call_args_list: 3568 if call == mock.call(timeout=0.25): # from Popen.__init__ 3569 sigint_calls.append(call) 3570 self.assertLessEqual(mock__wait.call_count, 2, 3571 msg=mock__wait.call_args_list) 3572 self.assertEqual(len(sigint_calls), 1, 3573 msg=mock__wait.call_args_list) 3574 finally: 3575 # cleanup the forgotten (due to our mocks) child process 3576 process = self.RecordingPopen.instances_created.pop() 3577 process.kill() 3578 process.wait() 3579 self.assertEqual([], self.RecordingPopen.instances_created) 3580 3581 def test_call_keyboardinterrupt_no_kill(self): 3582 self._test_keyboardinterrupt_no_kill(subprocess.call, timeout=6.282) 3583 3584 def test_run_keyboardinterrupt_no_kill(self): 3585 self._test_keyboardinterrupt_no_kill(subprocess.run, timeout=6.282) 3586 3587 def test_context_manager_keyboardinterrupt_no_kill(self): 3588 def popen_via_context_manager(*args, **kwargs): 3589 with subprocess.Popen(*args, **kwargs) as unused_process: 3590 raise KeyboardInterrupt # Test how __exit__ handles ^C. 
3591 self._test_keyboardinterrupt_no_kill(popen_via_context_manager) 3592 3593 def test_getoutput(self): 3594 self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy') 3595 self.assertEqual(subprocess.getstatusoutput('echo xyzzy'), 3596 (0, 'xyzzy')) 3597 3598 # we use mkdtemp in the next line to create an empty directory 3599 # under our exclusive control; from that, we can invent a pathname 3600 # that we _know_ won't exist. This is guaranteed to fail. 3601 dir = None 3602 try: 3603 dir = tempfile.mkdtemp() 3604 name = os.path.join(dir, "foo") 3605 status, output = subprocess.getstatusoutput( 3606 ("type " if mswindows else "cat ") + name) 3607 self.assertNotEqual(status, 0) 3608 finally: 3609 if dir is not None: 3610 os.rmdir(dir) 3611 3612 def test__all__(self): 3613 """Ensure that __all__ is populated properly.""" 3614 intentionally_excluded = {"list2cmdline", "Handle", "pwd", "grp", "fcntl"} 3615 exported = set(subprocess.__all__) 3616 possible_exports = set() 3617 import types 3618 for name, value in subprocess.__dict__.items(): 3619 if name.startswith('_'): 3620 continue 3621 if isinstance(value, (types.ModuleType,)): 3622 continue 3623 possible_exports.add(name) 3624 self.assertEqual(exported, possible_exports - intentionally_excluded) 3625 3626 3627@unittest.skipUnless(hasattr(selectors, 'PollSelector'), 3628 "Test needs selectors.PollSelector") 3629class ProcessTestCaseNoPoll(ProcessTestCase): 3630 def setUp(self): 3631 self.orig_selector = subprocess._PopenSelector 3632 subprocess._PopenSelector = selectors.SelectSelector 3633 ProcessTestCase.setUp(self) 3634 3635 def tearDown(self): 3636 subprocess._PopenSelector = self.orig_selector 3637 ProcessTestCase.tearDown(self) 3638 3639 3640@unittest.skipUnless(mswindows, "Windows-specific tests") 3641class CommandsWithSpaces (BaseTestCase): 3642 3643 def setUp(self): 3644 super().setUp() 3645 f, fname = tempfile.mkstemp(".py", "te st") 3646 self.fname = fname.lower () 3647 os.write(f, b"import sys;" 3648 
b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))" 3649 ) 3650 os.close(f) 3651 3652 def tearDown(self): 3653 os.remove(self.fname) 3654 super().tearDown() 3655 3656 def with_spaces(self, *args, **kwargs): 3657 kwargs['stdout'] = subprocess.PIPE 3658 p = subprocess.Popen(*args, **kwargs) 3659 with p: 3660 self.assertEqual( 3661 p.stdout.read ().decode("mbcs"), 3662 "2 [%r, 'ab cd']" % self.fname 3663 ) 3664 3665 def test_shell_string_with_spaces(self): 3666 # call() function with string argument with spaces on Windows 3667 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3668 "ab cd"), shell=1) 3669 3670 def test_shell_sequence_with_spaces(self): 3671 # call() function with sequence argument with spaces on Windows 3672 self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1) 3673 3674 def test_noshell_string_with_spaces(self): 3675 # call() function with string argument with spaces on Windows 3676 self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname, 3677 "ab cd")) 3678 3679 def test_noshell_sequence_with_spaces(self): 3680 # call() function with sequence argument with spaces on Windows 3681 self.with_spaces([sys.executable, self.fname, "ab cd"]) 3682 3683 3684class ContextManagerTests(BaseTestCase): 3685 3686 def test_pipe(self): 3687 with subprocess.Popen([sys.executable, "-c", 3688 "import sys;" 3689 "sys.stdout.write('stdout');" 3690 "sys.stderr.write('stderr');"], 3691 stdout=subprocess.PIPE, 3692 stderr=subprocess.PIPE) as proc: 3693 self.assertEqual(proc.stdout.read(), b"stdout") 3694 self.assertEqual(proc.stderr.read(), b"stderr") 3695 3696 self.assertTrue(proc.stdout.closed) 3697 self.assertTrue(proc.stderr.closed) 3698 3699 def test_returncode(self): 3700 with subprocess.Popen([sys.executable, "-c", 3701 "import sys; sys.exit(100)"]) as proc: 3702 pass 3703 # __exit__ calls wait(), so the returncode should be set 3704 self.assertEqual(proc.returncode, 100) 3705 3706 def 
test_communicate_stdin(self): 3707 with subprocess.Popen([sys.executable, "-c", 3708 "import sys;" 3709 "sys.exit(sys.stdin.read() == 'context')"], 3710 stdin=subprocess.PIPE) as proc: 3711 proc.communicate(b"context") 3712 self.assertEqual(proc.returncode, 1) 3713 3714 def test_invalid_args(self): 3715 with self.assertRaises(NONEXISTING_ERRORS): 3716 with subprocess.Popen(NONEXISTING_CMD, 3717 stdout=subprocess.PIPE, 3718 stderr=subprocess.PIPE) as proc: 3719 pass 3720 3721 def test_broken_pipe_cleanup(self): 3722 """Broken pipe error should not prevent wait() (Issue 21619)""" 3723 proc = subprocess.Popen(ZERO_RETURN_CMD, 3724 stdin=subprocess.PIPE, 3725 bufsize=support.PIPE_MAX_SIZE*2) 3726 proc = proc.__enter__() 3727 # Prepare to send enough data to overflow any OS pipe buffering and 3728 # guarantee a broken pipe error. Data is held in BufferedWriter 3729 # buffer until closed. 3730 proc.stdin.write(b'x' * support.PIPE_MAX_SIZE) 3731 self.assertIsNone(proc.returncode) 3732 # EPIPE expected under POSIX; EINVAL under Windows 3733 self.assertRaises(OSError, proc.__exit__, None, None, None) 3734 self.assertEqual(proc.returncode, 0) 3735 self.assertTrue(proc.stdin.closed) 3736 3737 3738if __name__ == "__main__": 3739 unittest.main() 3740