1# Copyright (c) Twisted Matrix Laboratories. 2# See LICENSE for details. 3 4""" 5Tests for implementations of L{IReactorProcess}. 6 7@var properEnv: A copy of L{os.environ} which has L{bytes} keys/values on POSIX 8 platforms and native L{str} keys/values on Windows. 9""" 10 11 12import io 13import os 14import signal 15import subprocess 16import sys 17import threading 18from unittest import skipIf 19 20import hamcrest 21 22import twisted 23from twisted.internet import utils 24from twisted.internet.defer import Deferred, inlineCallbacks, succeed 25from twisted.internet.error import ProcessDone, ProcessTerminated 26from twisted.internet.interfaces import IProcessTransport, IReactorProcess 27from twisted.internet.protocol import ProcessProtocol 28from twisted.internet.test.reactormixins import ReactorBuilder 29from twisted.python.compat import networkString 30from twisted.python.filepath import FilePath, _asFilesystemBytes 31from twisted.python.log import err, msg 32from twisted.python.runtime import platform 33from twisted.trial.unittest import TestCase 34 35# Get the current Python executable as a bytestring. 36pyExe = FilePath(sys.executable)._asBytesPath() 37twistedRoot = FilePath(twisted.__file__).parent().parent() 38 39_uidgidSkip = False 40_uidgidSkipReason = "" 41properEnv = dict(os.environ) 42properEnv["PYTHONPATH"] = os.pathsep.join(sys.path) 43try: 44 import resource as _resource 45 46 from twisted.internet import process as _process 47 48 if os.getuid() != 0: 49 _uidgidSkip = True 50 _uidgidSkipReason = "Cannot change UID/GID except as root" 51except ImportError: 52 resource = None 53 process = None 54 _uidgidSkip = True 55 _uidgidSkipReason = "Cannot change UID/GID on Windows" 56else: 57 resource = _resource 58 process = _process 59 60 61def onlyOnPOSIX(testMethod): 62 """ 63 Only run this test on POSIX platforms. 64 65 @param testMethod: A test function, being decorated. 66 67 @return: the C{testMethod} argument. 68 """ 69 if resource is None: 70 testMethod.skip = "Test only applies to POSIX platforms." 71 return testMethod 72 73 74class _ShutdownCallbackProcessProtocol(ProcessProtocol): 75 """ 76 An L{IProcessProtocol} which fires a Deferred when the process it is 77 associated with ends. 78 79 @ivar received: A C{dict} mapping file descriptors to lists of bytes 80 received from the child process on those file descriptors. 81 """ 82 83 def __init__(self, whenFinished): 84 self.whenFinished = whenFinished 85 self.received = {} 86 87 def childDataReceived(self, fd, bytes): 88 self.received.setdefault(fd, []).append(bytes) 89 90 def processEnded(self, reason): 91 self.whenFinished.callback(None) 92 93 94class ProcessTestsBuilderBase(ReactorBuilder): 95 """ 96 Base class for L{IReactorProcess} tests which defines some tests which 97 can be applied to PTY or non-PTY uses of C{spawnProcess}. 98 99 Subclasses are expected to set the C{usePTY} attribute to C{True} or 100 C{False}. 101 """ 102 103 requiredInterfaces = [IReactorProcess] 104 105 def test_processTransportInterface(self): 106 """ 107 L{IReactorProcess.spawnProcess} connects the protocol passed to it 108 to a transport which provides L{IProcessTransport}. 109 """ 110 ended = Deferred() 111 protocol = _ShutdownCallbackProcessProtocol(ended) 112 113 reactor = self.buildReactor() 114 transport = reactor.spawnProcess( 115 protocol, pyExe, [pyExe, b"-c", b""], usePTY=self.usePTY 116 ) 117 118 # The transport is available synchronously, so we can check it right 119 # away (unlike many transport-based tests). This is convenient even 120 # though it's probably not how the spawnProcess interface should really 121 # work. 122 # We're not using verifyObject here because part of 123 # IProcessTransport is a lie - there are no getHost or getPeer 124 # methods. See #1124. 125 self.assertTrue(IProcessTransport.providedBy(transport)) 126 127 # Let the process run and exit so we don't leave a zombie around. 128 ended.addCallback(lambda ignored: reactor.stop()) 129 self.runReactor(reactor) 130 131 def _writeTest(self, write): 132 """ 133 Helper for testing L{IProcessTransport} write functionality. This 134 method spawns a child process and gives C{write} a chance to write some 135 bytes to it. It then verifies that the bytes were actually written to 136 it (by relying on the child process to echo them back). 137 138 @param write: A two-argument callable. This is invoked with a process 139 transport and some bytes to write to it. 140 """ 141 reactor = self.buildReactor() 142 143 ended = Deferred() 144 protocol = _ShutdownCallbackProcessProtocol(ended) 145 146 bytesToSend = b"hello, world" + networkString(os.linesep) 147 program = b"import sys\n" b"sys.stdout.write(sys.stdin.readline())\n" 148 149 def startup(): 150 transport = reactor.spawnProcess(protocol, pyExe, [pyExe, b"-c", program]) 151 try: 152 write(transport, bytesToSend) 153 except BaseException: 154 err(None, "Unhandled exception while writing") 155 transport.signalProcess("KILL") 156 157 reactor.callWhenRunning(startup) 158 159 ended.addCallback(lambda ignored: reactor.stop()) 160 161 self.runReactor(reactor) 162 self.assertEqual(bytesToSend, b"".join(protocol.received[1])) 163 164 def test_write(self): 165 """ 166 L{IProcessTransport.write} writes the specified C{bytes} to the standard 167 input of the child process. 168 """ 169 170 def write(transport, bytesToSend): 171 transport.write(bytesToSend) 172 173 self._writeTest(write) 174 175 def test_writeSequence(self): 176 """ 177 L{IProcessTransport.writeSequence} writes the specified C{list} of 178 C{bytes} to the standard input of the child process. 179 """ 180 181 def write(transport, bytesToSend): 182 transport.writeSequence([bytesToSend]) 183 184 self._writeTest(write) 185 186 def test_writeToChild(self): 187 """ 188 L{IProcessTransport.writeToChild} writes the specified C{bytes} to the 189 specified file descriptor of the child process. 190 """ 191 192 def write(transport, bytesToSend): 193 transport.writeToChild(0, bytesToSend) 194 195 self._writeTest(write) 196 197 def test_writeToChildBadFileDescriptor(self): 198 """ 199 L{IProcessTransport.writeToChild} raises L{KeyError} if passed a file 200 descriptor which is was not set up by L{IReactorProcess.spawnProcess}. 201 """ 202 203 def write(transport, bytesToSend): 204 try: 205 self.assertRaises(KeyError, transport.writeToChild, 13, bytesToSend) 206 finally: 207 # Just get the process to exit so the test can complete 208 transport.write(bytesToSend) 209 210 self._writeTest(write) 211 212 @skipIf( 213 getattr(signal, "SIGCHLD", None) is None, 214 "Platform lacks SIGCHLD, early-spawnProcess test can't work.", 215 ) 216 def test_spawnProcessEarlyIsReaped(self): 217 """ 218 If, before the reactor is started with L{IReactorCore.run}, a 219 process is started with L{IReactorProcess.spawnProcess} and 220 terminates, the process is reaped once the reactor is started. 221 """ 222 reactor = self.buildReactor() 223 224 # Create the process with no shared file descriptors, so that there 225 # are no other events for the reactor to notice and "cheat" with. 226 # We want to be sure it's really dealing with the process exiting, 227 # not some associated event. 228 if self.usePTY: 229 childFDs = None 230 else: 231 childFDs = {} 232 233 # Arrange to notice the SIGCHLD. 234 signaled = threading.Event() 235 236 def handler(*args): 237 signaled.set() 238 239 signal.signal(signal.SIGCHLD, handler) 240 241 # Start a process - before starting the reactor! 242 ended = Deferred() 243 reactor.spawnProcess( 244 _ShutdownCallbackProcessProtocol(ended), 245 pyExe, 246 [pyExe, b"-c", b""], 247 usePTY=self.usePTY, 248 childFDs=childFDs, 249 ) 250 251 # Wait for the SIGCHLD (which might have been delivered before we got 252 # here, but that's okay because the signal handler was installed above, 253 # before we could have gotten it). 254 signaled.wait(120) 255 if not signaled.isSet(): 256 self.fail("Timed out waiting for child process to exit.") 257 258 # Capture the processEnded callback. 259 result = [] 260 ended.addCallback(result.append) 261 262 if result: 263 # The synchronous path through spawnProcess / Process.__init__ / 264 # registerReapProcessHandler was encountered. There's no reason to 265 # start the reactor, because everything is done already. 266 return 267 268 # Otherwise, though, start the reactor so it can tell us the process 269 # exited. 270 ended.addCallback(lambda ignored: reactor.stop()) 271 self.runReactor(reactor) 272 273 # Make sure the reactor stopped because the Deferred fired. 274 self.assertTrue(result) 275 276 def test_processExitedWithSignal(self): 277 """ 278 The C{reason} argument passed to L{IProcessProtocol.processExited} is a 279 L{ProcessTerminated} instance if the child process exits with a signal. 280 """ 281 sigName = "TERM" 282 sigNum = getattr(signal, "SIG" + sigName) 283 exited = Deferred() 284 source = ( 285 b"import sys\n" 286 # Talk so the parent process knows the process is running. This is 287 # necessary because ProcessProtocol.makeConnection may be called 288 # before this process is exec'd. It would be unfortunate if we 289 # SIGTERM'd the Twisted process while it was on its way to doing 290 # the exec. 291 b"sys.stdout.write('x')\n" 292 b"sys.stdout.flush()\n" 293 b"sys.stdin.read()\n" 294 ) 295 296 class Exiter(ProcessProtocol): 297 def childDataReceived(self, fd, data): 298 msg("childDataReceived(%d, %r)" % (fd, data)) 299 self.transport.signalProcess(sigName) 300 301 def childConnectionLost(self, fd): 302 msg("childConnectionLost(%d)" % (fd,)) 303 304 def processExited(self, reason): 305 msg(f"processExited({reason!r})") 306 # Protect the Deferred from the failure so that it follows 307 # the callback chain. This doesn't use the errback chain 308 # because it wants to make sure reason is a Failure. An 309 # Exception would also make an errback-based test pass, and 310 # that would be wrong. 311 exited.callback([reason]) 312 313 def processEnded(self, reason): 314 msg(f"processEnded({reason!r})") 315 316 reactor = self.buildReactor() 317 reactor.callWhenRunning( 318 reactor.spawnProcess, 319 Exiter(), 320 pyExe, 321 [pyExe, b"-c", source], 322 usePTY=self.usePTY, 323 ) 324 325 def cbExited(args): 326 (failure,) = args 327 # Trapping implicitly verifies that it's a Failure (rather than 328 # an exception) and explicitly makes sure it's the right type. 329 failure.trap(ProcessTerminated) 330 err = failure.value 331 if platform.isWindows(): 332 # Windows can't really /have/ signals, so it certainly can't 333 # report them as the reason for termination. Maybe there's 334 # something better we could be doing here, anyway? Hard to 335 # say. Anyway, this inconsistency between different platforms 336 # is extremely unfortunate and I would remove it if I 337 # could. -exarkun 338 self.assertIsNone(err.signal) 339 self.assertEqual(err.exitCode, 1) 340 else: 341 self.assertEqual(err.signal, sigNum) 342 self.assertIsNone(err.exitCode) 343 344 exited.addCallback(cbExited) 345 exited.addErrback(err) 346 exited.addCallback(lambda ign: reactor.stop()) 347 348 self.runReactor(reactor) 349 350 def test_systemCallUninterruptedByChildExit(self): 351 """ 352 If a child process exits while a system call is in progress, the system 353 call should not be interfered with. In particular, it should not fail 354 with EINTR. 355 356 Older versions of Twisted installed a SIGCHLD handler on POSIX without 357 using the feature exposed by the SA_RESTART flag to sigaction(2). The 358 most noticeable problem this caused was for blocking reads and writes to 359 sometimes fail with EINTR. 360 """ 361 reactor = self.buildReactor() 362 result = [] 363 364 def f(): 365 try: 366 exe = pyExe.decode(sys.getfilesystemencoding()) 367 368 subprocess.Popen([exe, "-c", "import time; time.sleep(0.1)"]) 369 f2 = subprocess.Popen( 370 [exe, "-c", ("import time; time.sleep(0.5);" "print('Foo')")], 371 stdout=subprocess.PIPE, 372 ) 373 # The read call below will blow up with an EINTR from the 374 # SIGCHLD from the first process exiting if we install a 375 # SIGCHLD handler without SA_RESTART. (which we used to do) 376 with f2.stdout: 377 result.append(f2.stdout.read()) 378 finally: 379 reactor.stop() 380 381 reactor.callWhenRunning(f) 382 self.runReactor(reactor) 383 self.assertEqual(result, [b"Foo" + os.linesep.encode("ascii")]) 384 385 @onlyOnPOSIX 386 def test_openFileDescriptors(self): 387 """ 388 Processes spawned with spawnProcess() close all extraneous file 389 descriptors in the parent. They do have a stdin, stdout, and stderr 390 open. 391 """ 392 393 # To test this, we are going to open a file descriptor in the parent 394 # that is unlikely to be opened in the child, then verify that it's not 395 # open in the child. 396 source = networkString( 397 """ 398import sys 399sys.path.insert(0, '{}') 400from twisted.internet import process 401sys.stdout.write(repr(process._listOpenFDs())) 402sys.stdout.flush()""".format( 403 twistedRoot.path 404 ) 405 ) 406 407 r, w = os.pipe() 408 self.addCleanup(os.close, r) 409 self.addCleanup(os.close, w) 410 411 # The call to "os.listdir()" (in _listOpenFDs's implementation) opens a 412 # file descriptor (with "opendir"), which shows up in _listOpenFDs's 413 # result. And speaking of "random" file descriptors, the code required 414 # for _listOpenFDs itself imports logger, which imports random, which 415 # (depending on your Python version) might leave /dev/urandom open. 416 417 # More generally though, even if we were to use an extremely minimal C 418 # program, the operating system would be within its rights to open file 419 # descriptors we might not know about in the C library's 420 # initialization; things like debuggers, profilers, or nsswitch plugins 421 # might open some and this test should pass in those environments. 422 423 # Although some of these file descriptors aren't predictable, we should 424 # at least be able to select a very large file descriptor which is very 425 # unlikely to be opened automatically in the subprocess. (Apply a 426 # fudge factor to avoid hard-coding something too near a limit 427 # condition like the maximum possible file descriptor, which a library 428 # might at least hypothetically select.) 429 430 fudgeFactor = 17 431 unlikelyFD = resource.getrlimit(resource.RLIMIT_NOFILE)[0] - fudgeFactor 432 433 os.dup2(w, unlikelyFD) 434 self.addCleanup(os.close, unlikelyFD) 435 436 output = io.BytesIO() 437 438 class GatheringProtocol(ProcessProtocol): 439 outReceived = output.write 440 441 def processEnded(self, reason): 442 reactor.stop() 443 444 reactor = self.buildReactor() 445 446 reactor.callWhenRunning( 447 reactor.spawnProcess, 448 GatheringProtocol(), 449 pyExe, 450 [pyExe, b"-Wignore", b"-c", source], 451 usePTY=self.usePTY, 452 ) 453 454 self.runReactor(reactor) 455 reportedChildFDs = set(eval(output.getvalue())) 456 457 stdFDs = [0, 1, 2] 458 459 # Unfortunately this assertion is still not *entirely* deterministic, 460 # since hypothetically, any library could open any file descriptor at 461 # any time. See comment above. 462 self.assertEqual( 463 reportedChildFDs.intersection(set(stdFDs + [unlikelyFD])), set(stdFDs) 464 ) 465 466 @onlyOnPOSIX 467 def test_errorDuringExec(self): 468 """ 469 When L{os.execvpe} raises an exception, it will format that exception 470 on stderr as UTF-8, regardless of system encoding information. 471 """ 472 473 def execvpe(*args, **kw): 474 # Ensure that real traceback formatting has some non-ASCII in it, 475 # by forcing the filename of the last frame to contain non-ASCII. 476 filename = "<\N{SNOWMAN}>" 477 if not isinstance(filename, str): 478 filename = filename.encode("utf-8") 479 codeobj = compile("1/0", filename, "single") 480 eval(codeobj) 481 482 self.patch(os, "execvpe", execvpe) 483 self.patch(sys, "getfilesystemencoding", lambda: "ascii") 484 485 reactor = self.buildReactor() 486 output = io.BytesIO() 487 488 @reactor.callWhenRunning 489 def whenRunning(): 490 class TracebackCatcher(ProcessProtocol): 491 errReceived = output.write 492 493 def processEnded(self, reason): 494 reactor.stop() 495 496 reactor.spawnProcess(TracebackCatcher(), pyExe, [pyExe, b"-c", b""]) 497 498 self.runReactor(reactor, timeout=30) 499 self.assertIn("\N{SNOWMAN}".encode(), output.getvalue()) 500 501 def test_timelyProcessExited(self): 502 """ 503 If a spawned process exits, C{processExited} will be called in a 504 timely manner. 505 """ 506 reactor = self.buildReactor() 507 508 class ExitingProtocol(ProcessProtocol): 509 exited = False 510 511 def processExited(protoSelf, reason): 512 protoSelf.exited = True 513 reactor.stop() 514 self.assertEqual(reason.value.exitCode, 0) 515 516 protocol = ExitingProtocol() 517 reactor.callWhenRunning( 518 reactor.spawnProcess, 519 protocol, 520 pyExe, 521 [pyExe, b"-c", b"raise SystemExit(0)"], 522 usePTY=self.usePTY, 523 ) 524 525 # This will timeout if processExited isn't called: 526 self.runReactor(reactor, timeout=30) 527 self.assertTrue(protocol.exited) 528 529 def _changeIDTest(self, which): 530 """ 531 Launch a child process, using either the C{uid} or C{gid} argument to 532 L{IReactorProcess.spawnProcess} to change either its UID or GID to a 533 different value. If the child process reports this hasn't happened, 534 raise an exception to fail the test. 535 536 @param which: Either C{b"uid"} or C{b"gid"}. 537 """ 538 program = ["import os", f"raise SystemExit(os.get{which}() != 1)"] 539 540 container = [] 541 542 class CaptureExitStatus(ProcessProtocol): 543 def processEnded(self, reason): 544 container.append(reason) 545 reactor.stop() 546 547 reactor = self.buildReactor() 548 protocol = CaptureExitStatus() 549 reactor.callWhenRunning( 550 reactor.spawnProcess, 551 protocol, 552 pyExe, 553 [pyExe, "-c", "\n".join(program)], 554 **{which: 1}, 555 ) 556 557 self.runReactor(reactor) 558 559 self.assertEqual(0, container[0].value.exitCode) 560 561 @skipIf(_uidgidSkip, _uidgidSkipReason) 562 def test_changeUID(self): 563 """ 564 If a value is passed for L{IReactorProcess.spawnProcess}'s C{uid}, the 565 child process is run with that UID. 566 """ 567 self._changeIDTest("uid") 568 569 @skipIf(_uidgidSkip, _uidgidSkipReason) 570 def test_changeGID(self): 571 """ 572 If a value is passed for L{IReactorProcess.spawnProcess}'s C{gid}, the 573 child process is run with that GID. 574 """ 575 self._changeIDTest("gid") 576 577 def test_processExitedRaises(self): 578 """ 579 If L{IProcessProtocol.processExited} raises an exception, it is logged. 580 """ 581 # Ideally we wouldn't need to poke the process module; see 582 # https://twistedmatrix.com/trac/ticket/6889 583 reactor = self.buildReactor() 584 585 class TestException(Exception): 586 pass 587 588 class Protocol(ProcessProtocol): 589 def processExited(self, reason): 590 reactor.stop() 591 raise TestException("processedExited raised") 592 593 protocol = Protocol() 594 transport = reactor.spawnProcess( 595 protocol, pyExe, [pyExe, b"-c", b""], usePTY=self.usePTY 596 ) 597 self.runReactor(reactor) 598 599 # Manually clean-up broken process handler. 600 # Only required if the test fails on systems that support 601 # the process module. 602 if process is not None: 603 for pid, handler in list(process.reapProcessHandlers.items()): 604 if handler is not transport: 605 continue 606 process.unregisterReapProcessHandler(pid, handler) 607 self.fail( 608 "After processExited raised, transport was left in" 609 " reapProcessHandlers" 610 ) 611 612 self.assertEqual(1, len(self.flushLoggedErrors(TestException))) 613 614 615class ProcessTestsBuilder(ProcessTestsBuilderBase): 616 """ 617 Builder defining tests relating to L{IReactorProcess} for child processes 618 which do not have a PTY. 619 """ 620 621 usePTY = False 622 623 keepStdioOpenProgram = b"twisted.internet.test.process_helper" 624 if platform.isWindows(): 625 keepStdioOpenArg = b"windows" 626 else: 627 # Just a value that doesn't equal "windows" 628 keepStdioOpenArg = b"" 629 630 # Define this test here because PTY-using processes only have stdin and 631 # stdout and the test would need to be different for that to work. 632 def test_childConnectionLost(self): 633 """ 634 L{IProcessProtocol.childConnectionLost} is called each time a file 635 descriptor associated with a child process is closed. 636 """ 637 connected = Deferred() 638 lost = {0: Deferred(), 1: Deferred(), 2: Deferred()} 639 640 class Closer(ProcessProtocol): 641 def makeConnection(self, transport): 642 connected.callback(transport) 643 644 def childConnectionLost(self, childFD): 645 lost[childFD].callback(None) 646 647 target = b"twisted.internet.test.process_loseconnection" 648 649 reactor = self.buildReactor() 650 reactor.callWhenRunning( 651 reactor.spawnProcess, 652 Closer(), 653 pyExe, 654 [pyExe, b"-m", target], 655 env=properEnv, 656 usePTY=self.usePTY, 657 ) 658 659 def cbConnected(transport): 660 transport.write(b"2\n") 661 return lost[2].addCallback(lambda ign: transport) 662 663 connected.addCallback(cbConnected) 664 665 def lostSecond(transport): 666 transport.write(b"1\n") 667 return lost[1].addCallback(lambda ign: transport) 668 669 connected.addCallback(lostSecond) 670 671 def lostFirst(transport): 672 transport.write(b"\n") 673 674 connected.addCallback(lostFirst) 675 connected.addErrback(err) 676 677 def cbEnded(ignored): 678 reactor.stop() 679 680 connected.addCallback(cbEnded) 681 682 self.runReactor(reactor) 683 684 # This test is here because PTYProcess never delivers childConnectionLost. 685 def test_processEnded(self): 686 """ 687 L{IProcessProtocol.processEnded} is called after the child process 688 exits and L{IProcessProtocol.childConnectionLost} is called for each of 689 its file descriptors. 690 """ 691 ended = Deferred() 692 lost = [] 693 694 class Ender(ProcessProtocol): 695 def childDataReceived(self, fd, data): 696 msg("childDataReceived(%d, %r)" % (fd, data)) 697 self.transport.loseConnection() 698 699 def childConnectionLost(self, childFD): 700 msg("childConnectionLost(%d)" % (childFD,)) 701 lost.append(childFD) 702 703 def processExited(self, reason): 704 msg(f"processExited({reason!r})") 705 706 def processEnded(self, reason): 707 msg(f"processEnded({reason!r})") 708 ended.callback([reason]) 709 710 reactor = self.buildReactor() 711 reactor.callWhenRunning( 712 reactor.spawnProcess, 713 Ender(), 714 pyExe, 715 [pyExe, b"-m", self.keepStdioOpenProgram, b"child", self.keepStdioOpenArg], 716 env=properEnv, 717 usePTY=self.usePTY, 718 ) 719 720 def cbEnded(args): 721 (failure,) = args 722 failure.trap(ProcessDone) 723 self.assertEqual(set(lost), {0, 1, 2}) 724 725 ended.addCallback(cbEnded) 726 727 ended.addErrback(err) 728 ended.addCallback(lambda ign: reactor.stop()) 729 730 self.runReactor(reactor) 731 732 # This test is here because PTYProcess.loseConnection does not actually 733 # close the file descriptors to the child process. This test needs to be 734 # written fairly differently for PTYProcess. 735 def test_processExited(self): 736 """ 737 L{IProcessProtocol.processExited} is called when the child process 738 exits, even if file descriptors associated with the child are still 739 open. 740 """ 741 exited = Deferred() 742 allLost = Deferred() 743 lost = [] 744 745 class Waiter(ProcessProtocol): 746 def childDataReceived(self, fd, data): 747 msg("childDataReceived(%d, %r)" % (fd, data)) 748 749 def childConnectionLost(self, childFD): 750 msg("childConnectionLost(%d)" % (childFD,)) 751 lost.append(childFD) 752 if len(lost) == 3: 753 allLost.callback(None) 754 755 def processExited(self, reason): 756 msg(f"processExited({reason!r})") 757 # See test_processExitedWithSignal 758 exited.callback([reason]) 759 self.transport.loseConnection() 760 761 reactor = self.buildReactor() 762 reactor.callWhenRunning( 763 reactor.spawnProcess, 764 Waiter(), 765 pyExe, 766 [ 767 pyExe, 768 b"-u", 769 b"-m", 770 self.keepStdioOpenProgram, 771 b"child", 772 self.keepStdioOpenArg, 773 ], 774 env=properEnv, 775 usePTY=self.usePTY, 776 ) 777 778 def cbExited(args): 779 (failure,) = args 780 failure.trap(ProcessDone) 781 msg(f"cbExited; lost = {lost}") 782 self.assertEqual(lost, []) 783 return allLost 784 785 exited.addCallback(cbExited) 786 787 def cbAllLost(ignored): 788 self.assertEqual(set(lost), {0, 1, 2}) 789 790 exited.addCallback(cbAllLost) 791 792 exited.addErrback(err) 793 exited.addCallback(lambda ign: reactor.stop()) 794 795 self.runReactor(reactor) 796 797 def makeSourceFile(self, sourceLines): 798 """ 799 Write the given list of lines to a text file and return the absolute 800 path to it. 801 """ 802 script = _asFilesystemBytes(self.mktemp()) 803 with open(script, "wt") as scriptFile: 804 scriptFile.write(os.linesep.join(sourceLines) + os.linesep) 805 return os.path.abspath(script) 806 807 def test_shebang(self): 808 """ 809 Spawning a process with an executable which is a script starting 810 with an interpreter definition line (#!) uses that interpreter to 811 evaluate the script. 812 """ 813 shebangOutput = b"this is the shebang output" 814 815 scriptFile = self.makeSourceFile( 816 [ 817 "#!{}".format(pyExe.decode("ascii")), 818 "import sys", 819 "sys.stdout.write('{}')".format(shebangOutput.decode("ascii")), 820 "sys.stdout.flush()", 821 ] 822 ) 823 os.chmod(scriptFile, 0o700) 824 825 reactor = self.buildReactor() 826 827 def cbProcessExited(args): 828 out, err, code = args 829 msg("cbProcessExited((%r, %r, %d))" % (out, err, code)) 830 self.assertEqual(out, shebangOutput) 831 self.assertEqual(err, b"") 832 self.assertEqual(code, 0) 833 834 def shutdown(passthrough): 835 reactor.stop() 836 return passthrough 837 838 def start(): 839 d = utils.getProcessOutputAndValue(scriptFile, reactor=reactor) 840 d.addBoth(shutdown) 841 d.addCallback(cbProcessExited) 842 d.addErrback(err) 843 844 reactor.callWhenRunning(start) 845 self.runReactor(reactor) 846 847 def test_pauseAndResumeProducing(self): 848 """ 849 Pause producing and then resume producing. 850 """ 851 852 def pauseAndResume(reactor): 853 try: 854 protocol = ProcessProtocol() 855 transport = reactor.spawnProcess( 856 protocol, pyExe, [pyExe, b"-c", b""], usePTY=self.usePTY 857 ) 858 transport.pauseProducing() 859 transport.resumeProducing() 860 finally: 861 reactor.stop() 862 863 reactor = self.buildReactor() 864 reactor.callWhenRunning(pauseAndResume, reactor) 865 self.runReactor(reactor) 866 867 def test_processCommandLineArguments(self): 868 """ 869 Arguments given to spawnProcess are passed to the child process as 870 originally intended. 871 """ 872 us = b"twisted.internet.test.process_cli" 873 874 args = [b"hello", b'"', b" \t|<>^&", br'"\\"hello\\"', br'"foo\ bar baz\""'] 875 # Ensure that all non-NUL characters can be passed too. 876 allChars = "".join(map(chr, range(1, 255))) 877 if isinstance(allChars, str): 878 allChars.encode("utf-8") 879 880 reactor = self.buildReactor() 881 882 def processFinished(finishedArgs): 883 output, err, code = finishedArgs 884 output = output.split(b"\0") 885 # Drop the trailing \0. 886 output.pop() 887 self.assertEqual(args, output) 888 889 def shutdown(result): 890 reactor.stop() 891 return result 892 893 def spawnChild(): 894 d = succeed(None) 895 d.addCallback( 896 lambda dummy: utils.getProcessOutputAndValue( 897 pyExe, [b"-m", us] + args, env=properEnv, reactor=reactor 898 ) 899 ) 900 d.addCallback(processFinished) 901 d.addBoth(shutdown) 902 903 reactor.callWhenRunning(spawnChild) 904 self.runReactor(reactor) 905 906 @onlyOnPOSIX 907 def test_process_unregistered_before_protocol_ended_callback(self): 908 """ 909 Process is removed from reapProcessHandler dict before running 910 ProcessProtocol.processEnded() callback. 911 """ 912 results = [] 913 914 class TestProcessProtocol(ProcessProtocol): 915 """ 916 Process protocol captures own presence in 917 process.reapProcessHandlers at time of .processEnded() callback. 918 919 @ivar deferred: A deferred fired when the .processEnded() callback 920 has completed. 921 @type deferred: L{Deferred<defer.Deferred>} 922 """ 923 924 def __init__(self): 925 self.deferred = Deferred() 926 927 def processEnded(self, status): 928 """ 929 Capture whether the process has already been removed 930 from process.reapProcessHandlers. 931 932 @param status: unused 933 """ 934 from twisted.internet import process 935 936 handlers = process.reapProcessHandlers 937 processes = handlers.values() 938 939 if self.transport in processes: 940 results.append("process present but should not be") 941 else: 942 results.append("process already removed as desired") 943 944 self.deferred.callback(None) 945 946 @inlineCallbacks 947 def launchProcessAndWait(reactor): 948 """ 949 Launch and wait for a subprocess and allow the TestProcessProtocol 950 to capture the order of the .processEnded() callback vs. removal 951 from process.reapProcessHandlers. 952 953 @param reactor: Reactor used to spawn the test process and to be 954 stopped when checks are complete. 955 @type reactor: object providing 956 L{twisted.internet.interfaces.IReactorProcess} and 957 L{twisted.internet.interfaces.IReactorCore}. 958 """ 959 try: 960 testProcessProtocol = TestProcessProtocol() 961 reactor.spawnProcess( 962 testProcessProtocol, 963 pyExe, 964 [pyExe, "--version"], 965 ) 966 yield testProcessProtocol.deferred 967 except Exception as e: 968 results.append(e) 969 finally: 970 reactor.stop() 971 972 reactor = self.buildReactor() 973 reactor.callWhenRunning(launchProcessAndWait, reactor) 974 self.runReactor(reactor) 975 976 hamcrest.assert_that( 977 results, 978 hamcrest.equal_to(["process already removed as desired"]), 979 ) 980 981 982globals().update(ProcessTestsBuilder.makeTestCaseClasses()) 983 984 985class PTYProcessTestsBuilder(ProcessTestsBuilderBase): 986 """ 987 Builder defining tests relating to L{IReactorProcess} for child processes 988 which have a PTY. 989 """ 990 991 usePTY = True 992 993 if platform.isWindows(): 994 skip = "PTYs are not supported on Windows." 995 elif platform.isMacOSX(): 996 skip = "PTYs are flaky from a Darwin bug. See #8840." 997 998 skippedReactors = { 999 "twisted.internet.pollreactor.PollReactor": "macOS's poll() does not support PTYs" 1000 } 1001 1002 1003globals().update(PTYProcessTestsBuilder.makeTestCaseClasses()) 1004 1005 1006class PotentialZombieWarningTests(TestCase): 1007 """ 1008 Tests for L{twisted.internet.error.PotentialZombieWarning}. 1009 """ 1010 1011 def test_deprecated(self): 1012 """ 1013 Accessing L{PotentialZombieWarning} via the 1014 I{PotentialZombieWarning} attribute of L{twisted.internet.error} 1015 results in a deprecation warning being emitted. 1016 """ 1017 from twisted.internet import error 1018 1019 error.PotentialZombieWarning 1020 1021 warnings = self.flushWarnings([self.test_deprecated]) 1022 self.assertEqual(warnings[0]["category"], DeprecationWarning) 1023 self.assertEqual( 1024 warnings[0]["message"], 1025 "twisted.internet.error.PotentialZombieWarning was deprecated in " 1026 "Twisted 10.0.0: There is no longer any potential for zombie " 1027 "process.", 1028 ) 1029 self.assertEqual(len(warnings), 1) 1030 1031 1032class ProcessIsUnimportableOnUnsupportedPlatormsTests(TestCase): 1033 """ 1034 Tests to ensure that L{twisted.internet.process} is unimportable on 1035 platforms where it does not work (namely Windows). 1036 """ 1037 1038 @skipIf(not platform.isWindows(), "Only relevant on Windows.") 1039 def test_unimportableOnWindows(self): 1040 """ 1041 L{twisted.internet.process} is unimportable on Windows. 1042 """ 1043 with self.assertRaises(ImportError): 1044 import twisted.internet.process 1045 1046 twisted.internet.process # shh pyflakes 1047 1048 1049class ReapingNonePidsLogsProperly(TestCase): 1050 try: 1051 # ignore mypy error, since we are testing passing 1052 # the wrong type to waitpid 1053 os.waitpid(None, None) # type: ignore[arg-type] 1054 except Exception as e: 1055 expected_message = str(e) 1056 expected_type = type(e) 1057 1058 @onlyOnPOSIX 1059 def test_registerReapProcessHandler(self): 1060 process.registerReapProcessHandler(None, None) 1061 1062 [error] = self.flushLoggedErrors() 1063 self.assertEqual( 1064 type(error.value), 1065 self.expected_type, 1066 "Wrong error type logged", 1067 ) 1068 self.assertEqual( 1069 str(error.value), 1070 self.expected_message, 1071 "Wrong error message logged", 1072 ) 1073 1074 @onlyOnPOSIX 1075 def test__BaseProcess_reapProcess(self): 1076 _baseProcess = process._BaseProcess(None) 1077 _baseProcess.reapProcess() 1078 1079 [error] = self.flushLoggedErrors() 1080 self.assertEqual( 1081 type(error.value), 1082 self.expected_type, 1083 "Wrong error type logged", 1084 ) 1085 self.assertEqual( 1086 str(error.value), 1087 self.expected_message, 1088 "Wrong error message logged", 1089 ) 1090