1# Copyright (c) Twisted Matrix Laboratories. 2# See LICENSE for details. 3 4""" 5Test running processes. 6""" 7 8import gzip 9import os 10import sys 11import signal 12import StringIO 13import errno 14import gc 15import stat 16import operator 17try: 18 import fcntl 19except ImportError: 20 fcntl = process = None 21else: 22 from twisted.internet import process 23 24 25from zope.interface.verify import verifyObject 26 27from twisted.python.log import msg 28from twisted.internet import reactor, protocol, error, interfaces, defer 29from twisted.trial import unittest 30from twisted.python import util, runtime, procutils 31 32 33 34class StubProcessProtocol(protocol.ProcessProtocol): 35 """ 36 ProcessProtocol counter-implementation: all methods on this class raise an 37 exception, so instances of this may be used to verify that only certain 38 methods are called. 39 """ 40 def outReceived(self, data): 41 raise NotImplementedError() 42 43 def errReceived(self, data): 44 raise NotImplementedError() 45 46 def inConnectionLost(self): 47 raise NotImplementedError() 48 49 def outConnectionLost(self): 50 raise NotImplementedError() 51 52 def errConnectionLost(self): 53 raise NotImplementedError() 54 55 56 57class ProcessProtocolTests(unittest.TestCase): 58 """ 59 Tests for behavior provided by the process protocol base class, 60 L{protocol.ProcessProtocol}. 61 """ 62 def test_interface(self): 63 """ 64 L{ProcessProtocol} implements L{IProcessProtocol}. 65 """ 66 verifyObject(interfaces.IProcessProtocol, protocol.ProcessProtocol()) 67 68 69 def test_outReceived(self): 70 """ 71 Verify that when stdout is delivered to 72 L{ProcessProtocol.childDataReceived}, it is forwarded to 73 L{ProcessProtocol.outReceived}. 74 """ 75 received = [] 76 class OutProtocol(StubProcessProtocol): 77 def outReceived(self, data): 78 received.append(data) 79 80 bytes = "bytes" 81 p = OutProtocol() 82 p.childDataReceived(1, bytes) 83 self.assertEqual(received, [bytes]) 84 85 86 def test_errReceived(self): 87 """ 88 Similar to L{test_outReceived}, but for stderr. 89 """ 90 received = [] 91 class ErrProtocol(StubProcessProtocol): 92 def errReceived(self, data): 93 received.append(data) 94 95 bytes = "bytes" 96 p = ErrProtocol() 97 p.childDataReceived(2, bytes) 98 self.assertEqual(received, [bytes]) 99 100 101 def test_inConnectionLost(self): 102 """ 103 Verify that when stdin close notification is delivered to 104 L{ProcessProtocol.childConnectionLost}, it is forwarded to 105 L{ProcessProtocol.inConnectionLost}. 106 """ 107 lost = [] 108 class InLostProtocol(StubProcessProtocol): 109 def inConnectionLost(self): 110 lost.append(None) 111 112 p = InLostProtocol() 113 p.childConnectionLost(0) 114 self.assertEqual(lost, [None]) 115 116 117 def test_outConnectionLost(self): 118 """ 119 Similar to L{test_inConnectionLost}, but for stdout. 120 """ 121 lost = [] 122 class OutLostProtocol(StubProcessProtocol): 123 def outConnectionLost(self): 124 lost.append(None) 125 126 p = OutLostProtocol() 127 p.childConnectionLost(1) 128 self.assertEqual(lost, [None]) 129 130 131 def test_errConnectionLost(self): 132 """ 133 Similar to L{test_inConnectionLost}, but for stderr. 134 """ 135 lost = [] 136 class ErrLostProtocol(StubProcessProtocol): 137 def errConnectionLost(self): 138 lost.append(None) 139 140 p = ErrLostProtocol() 141 p.childConnectionLost(2) 142 self.assertEqual(lost, [None]) 143 144 145 146class TrivialProcessProtocol(protocol.ProcessProtocol): 147 """ 148 Simple process protocol for tests purpose. 149 150 @ivar outData: data received from stdin 151 @ivar errData: data received from stderr 152 """ 153 154 def __init__(self, d): 155 """ 156 Create the deferred that will be fired at the end, and initialize 157 data structures. 158 """ 159 self.deferred = d 160 self.outData = [] 161 self.errData = [] 162 163 def processEnded(self, reason): 164 self.reason = reason 165 self.deferred.callback(None) 166 167 def outReceived(self, data): 168 self.outData.append(data) 169 170 def errReceived(self, data): 171 self.errData.append(data) 172 173 174class TestProcessProtocol(protocol.ProcessProtocol): 175 176 def connectionMade(self): 177 self.stages = [1] 178 self.data = '' 179 self.err = '' 180 self.transport.write("abcd") 181 182 def childDataReceived(self, childFD, data): 183 """ 184 Override and disable the dispatch provided by the base class to ensure 185 that it is really this method which is being called, and the transport 186 is not going directly to L{outReceived} or L{errReceived}. 187 """ 188 if childFD == 1: 189 self.data += data 190 elif childFD == 2: 191 self.err += data 192 193 194 def childConnectionLost(self, childFD): 195 """ 196 Similarly to L{childDataReceived}, disable the automatic dispatch 197 provided by the base implementation to verify that the transport is 198 calling this method directly. 199 """ 200 if childFD == 1: 201 self.stages.append(2) 202 if self.data != "abcd": 203 raise RuntimeError( 204 "Data was %r instead of 'abcd'" % (self.data,)) 205 self.transport.write("1234") 206 elif childFD == 2: 207 self.stages.append(3) 208 if self.err != "1234": 209 raise RuntimeError( 210 "Err was %r instead of '1234'" % (self.err,)) 211 self.transport.write("abcd") 212 self.stages.append(4) 213 elif childFD == 0: 214 self.stages.append(5) 215 216 def processEnded(self, reason): 217 self.reason = reason 218 self.deferred.callback(None) 219 220 221class EchoProtocol(protocol.ProcessProtocol): 222 223 s = "1234567" * 1001 224 n = 10 225 finished = 0 226 227 failure = None 228 229 def __init__(self, onEnded): 230 self.onEnded = onEnded 231 self.count = 0 232 233 def connectionMade(self): 234 assert self.n > 2 235 for i in range(self.n - 2): 236 self.transport.write(self.s) 237 # test writeSequence 238 self.transport.writeSequence([self.s, self.s]) 239 self.buffer = self.s * self.n 240 241 def outReceived(self, data): 242 if buffer(self.buffer, self.count, len(data)) != buffer(data): 243 self.failure = ("wrong bytes received", data, self.count) 244 self.transport.closeStdin() 245 else: 246 self.count += len(data) 247 if self.count == len(self.buffer): 248 self.transport.closeStdin() 249 250 def processEnded(self, reason): 251 self.finished = 1 252 if not reason.check(error.ProcessDone): 253 self.failure = "process didn't terminate normally: " + str(reason) 254 self.onEnded.callback(self) 255 256 257 258class SignalProtocol(protocol.ProcessProtocol): 259 """ 260 A process protocol that sends a signal when data is first received. 261 262 @ivar deferred: deferred firing on C{processEnded}. 263 @type deferred: L{defer.Deferred} 264 265 @ivar signal: the signal to send to the process. 266 @type signal: C{str} 267 268 @ivar signaled: A flag tracking whether the signal has been sent to the 269 child or not yet. C{False} until it is sent, then C{True}. 270 @type signaled: C{bool} 271 """ 272 273 def __init__(self, deferred, sig): 274 self.deferred = deferred 275 self.signal = sig 276 self.signaled = False 277 278 279 def outReceived(self, data): 280 """ 281 Handle the first output from the child process (which indicates it 282 is set up and ready to receive the signal) by sending the signal to 283 it. Also log all output to help with debugging. 284 """ 285 msg("Received %r from child stdout" % (data,)) 286 if not self.signaled: 287 self.signaled = True 288 self.transport.signalProcess(self.signal) 289 290 291 def errReceived(self, data): 292 """ 293 Log all data received from the child's stderr to help with 294 debugging. 295 """ 296 msg("Received %r from child stderr" % (data,)) 297 298 299 def processEnded(self, reason): 300 """ 301 Callback C{self.deferred} with C{None} if C{reason} is a 302 L{error.ProcessTerminated} failure with C{exitCode} set to C{None}, 303 C{signal} set to C{self.signal}, and C{status} holding the status code 304 of the exited process. Otherwise, errback with a C{ValueError} 305 describing the problem. 306 """ 307 msg("Child exited: %r" % (reason.getTraceback(),)) 308 if not reason.check(error.ProcessTerminated): 309 return self.deferred.errback( 310 ValueError("wrong termination: %s" % (reason,))) 311 v = reason.value 312 if isinstance(self.signal, str): 313 signalValue = getattr(signal, 'SIG' + self.signal) 314 else: 315 signalValue = self.signal 316 if v.exitCode is not None: 317 return self.deferred.errback( 318 ValueError("SIG%s: exitCode is %s, not None" % 319 (self.signal, v.exitCode))) 320 if v.signal != signalValue: 321 return self.deferred.errback( 322 ValueError("SIG%s: .signal was %s, wanted %s" % 323 (self.signal, v.signal, signalValue))) 324 if os.WTERMSIG(v.status) != signalValue: 325 return self.deferred.errback( 326 ValueError('SIG%s: %s' % (self.signal, os.WTERMSIG(v.status)))) 327 self.deferred.callback(None) 328 329 330 331class TestManyProcessProtocol(TestProcessProtocol): 332 def __init__(self): 333 self.deferred = defer.Deferred() 334 335 def processEnded(self, reason): 336 self.reason = reason 337 if reason.check(error.ProcessDone): 338 self.deferred.callback(None) 339 else: 340 self.deferred.errback(reason) 341 342 343 344class UtilityProcessProtocol(protocol.ProcessProtocol): 345 """ 346 Helper class for launching a Python process and getting a result from it. 347 348 @ivar program: A string giving a Python program for the child process to 349 run. 350 """ 351 program = None 352 353 def run(cls, reactor, argv, env): 354 """ 355 Run a Python process connected to a new instance of this protocol 356 class. Return the protocol instance. 357 358 The Python process is given C{self.program} on the command line to 359 execute, in addition to anything specified by C{argv}. C{env} is 360 the complete environment. 361 """ 362 exe = sys.executable 363 self = cls() 364 reactor.spawnProcess( 365 self, exe, [exe, "-c", self.program] + argv, env=env) 366 return self 367 run = classmethod(run) 368 369 370 def __init__(self): 371 self.bytes = [] 372 self.requests = [] 373 374 375 def parseChunks(self, bytes): 376 """ 377 Called with all bytes received on stdout when the process exits. 378 """ 379 raise NotImplementedError() 380 381 382 def getResult(self): 383 """ 384 Return a Deferred which will fire with the result of L{parseChunks} 385 when the child process exits. 386 """ 387 d = defer.Deferred() 388 self.requests.append(d) 389 return d 390 391 392 def _fireResultDeferreds(self, result): 393 """ 394 Callback all Deferreds returned up until now by L{getResult} 395 with the given result object. 396 """ 397 requests = self.requests 398 self.requests = None 399 for d in requests: 400 d.callback(result) 401 402 403 def outReceived(self, bytes): 404 """ 405 Accumulate output from the child process in a list. 406 """ 407 self.bytes.append(bytes) 408 409 410 def processEnded(self, reason): 411 """ 412 Handle process termination by parsing all received output and firing 413 any waiting Deferreds. 414 """ 415 self._fireResultDeferreds(self.parseChunks(self.bytes)) 416 417 418 419 420class GetArgumentVector(UtilityProcessProtocol): 421 """ 422 Protocol which will read a serialized argv from a process and 423 expose it to interested parties. 424 """ 425 program = ( 426 "from sys import stdout, argv\n" 427 "stdout.write(chr(0).join(argv))\n" 428 "stdout.flush()\n") 429 430 def parseChunks(self, chunks): 431 """ 432 Parse the output from the process to which this protocol was 433 connected, which is a single unterminated line of \\0-separated 434 strings giving the argv of that process. Return this as a list of 435 str objects. 436 """ 437 return ''.join(chunks).split('\0') 438 439 440 441class GetEnvironmentDictionary(UtilityProcessProtocol): 442 """ 443 Protocol which will read a serialized environment dict from a process 444 and expose it to interested parties. 445 """ 446 program = ( 447 "from sys import stdout\n" 448 "from os import environ\n" 449 "items = environ.iteritems()\n" 450 "stdout.write(chr(0).join([k + chr(0) + v for k, v in items]))\n" 451 "stdout.flush()\n") 452 453 def parseChunks(self, chunks): 454 """ 455 Parse the output from the process to which this protocol was 456 connected, which is a single unterminated line of \\0-separated 457 strings giving key value pairs of the environment from that process. 458 Return this as a dictionary. 459 """ 460 environString = ''.join(chunks) 461 if not environString: 462 return {} 463 environ = iter(environString.split('\0')) 464 d = {} 465 while 1: 466 try: 467 k = environ.next() 468 except StopIteration: 469 break 470 else: 471 v = environ.next() 472 d[k] = v 473 return d 474 475 476 477class ProcessTestCase(unittest.TestCase): 478 """Test running a process.""" 479 480 usePTY = False 481 482 def testStdio(self): 483 """twisted.internet.stdio test.""" 484 exe = sys.executable 485 scriptPath = util.sibpath(__file__, "process_twisted.py") 486 p = Accumulator() 487 d = p.endedDeferred = defer.Deferred() 488 env = {"PYTHONPATH": os.pathsep.join(sys.path)} 489 reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=env, 490 path=None, usePTY=self.usePTY) 491 p.transport.write("hello, world") 492 p.transport.write("abc") 493 p.transport.write("123") 494 p.transport.closeStdin() 495 496 def processEnded(ign): 497 self.assertEqual(p.outF.getvalue(), "hello, worldabc123", 498 "Output follows:\n" 499 "%s\n" 500 "Error message from process_twisted follows:\n" 501 "%s\n" % (p.outF.getvalue(), p.errF.getvalue())) 502 return d.addCallback(processEnded) 503 504 505 def test_unsetPid(self): 506 """ 507 Test if pid is None/non-None before/after process termination. This 508 reuses process_echoer.py to get a process that blocks on stdin. 509 """ 510 finished = defer.Deferred() 511 p = TrivialProcessProtocol(finished) 512 exe = sys.executable 513 scriptPath = util.sibpath(__file__, "process_echoer.py") 514 procTrans = reactor.spawnProcess(p, exe, 515 [exe, scriptPath], env=None) 516 self.failUnless(procTrans.pid) 517 518 def afterProcessEnd(ignored): 519 self.assertEqual(procTrans.pid, None) 520 521 p.transport.closeStdin() 522 return finished.addCallback(afterProcessEnd) 523 524 525 def test_process(self): 526 """ 527 Test running a process: check its output, it exitCode, some property of 528 signalProcess. 529 """ 530 exe = sys.executable 531 scriptPath = util.sibpath(__file__, "process_tester.py") 532 d = defer.Deferred() 533 p = TestProcessProtocol() 534 p.deferred = d 535 reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None) 536 def check(ignored): 537 self.assertEqual(p.stages, [1, 2, 3, 4, 5]) 538 f = p.reason 539 f.trap(error.ProcessTerminated) 540 self.assertEqual(f.value.exitCode, 23) 541 # would .signal be available on non-posix? 542 # self.assertEqual(f.value.signal, None) 543 self.assertRaises( 544 error.ProcessExitedAlready, p.transport.signalProcess, 'INT') 545 try: 546 import process_tester, glob 547 for f in glob.glob(process_tester.test_file_match): 548 os.remove(f) 549 except: 550 pass 551 d.addCallback(check) 552 return d 553 554 def testManyProcesses(self): 555 556 def _check(results, protocols): 557 for p in protocols: 558 self.assertEqual(p.stages, [1, 2, 3, 4, 5], "[%d] stages = %s" % (id(p.transport), str(p.stages))) 559 # test status code 560 f = p.reason 561 f.trap(error.ProcessTerminated) 562 self.assertEqual(f.value.exitCode, 23) 563 564 exe = sys.executable 565 scriptPath = util.sibpath(__file__, "process_tester.py") 566 args = [exe, "-u", scriptPath] 567 protocols = [] 568 deferreds = [] 569 570 for i in xrange(50): 571 p = TestManyProcessProtocol() 572 protocols.append(p) 573 reactor.spawnProcess(p, exe, args, env=None) 574 deferreds.append(p.deferred) 575 576 deferredList = defer.DeferredList(deferreds, consumeErrors=True) 577 deferredList.addCallback(_check, protocols) 578 return deferredList 579 580 581 def test_echo(self): 582 """ 583 A spawning a subprocess which echoes its stdin to its stdout via 584 C{reactor.spawnProcess} will result in that echoed output being 585 delivered to outReceived. 586 """ 587 finished = defer.Deferred() 588 p = EchoProtocol(finished) 589 590 exe = sys.executable 591 scriptPath = util.sibpath(__file__, "process_echoer.py") 592 reactor.spawnProcess(p, exe, [exe, scriptPath], env=None) 593 594 def asserts(ignored): 595 self.failIf(p.failure, p.failure) 596 self.failUnless(hasattr(p, 'buffer')) 597 self.assertEqual(len(''.join(p.buffer)), len(p.s * p.n)) 598 599 def takedownProcess(err): 600 p.transport.closeStdin() 601 return err 602 603 return finished.addCallback(asserts).addErrback(takedownProcess) 604 605 606 def testCommandLine(self): 607 args = [r'a\"b ', r'a\b ', r' a\\"b', r' a\\b', r'"foo bar" "', '\tab', '"\\', 'a"b', "a'b"] 608 pyExe = sys.executable 609 scriptPath = util.sibpath(__file__, "process_cmdline.py") 610 p = Accumulator() 611 d = p.endedDeferred = defer.Deferred() 612 reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath]+args, env=None, 613 path=None) 614 615 def processEnded(ign): 616 self.assertEqual(p.errF.getvalue(), "") 617 recvdArgs = p.outF.getvalue().splitlines() 618 self.assertEqual(recvdArgs, args) 619 return d.addCallback(processEnded) 620 621 622 def test_wrongArguments(self): 623 """ 624 Test invalid arguments to spawnProcess: arguments and environment 625 must only contains string or unicode, and not null bytes. 626 """ 627 exe = sys.executable 628 p = protocol.ProcessProtocol() 629 630 badEnvs = [ 631 {"foo": 2}, 632 {"foo": "egg\0a"}, 633 {3: "bar"}, 634 {"bar\0foo": "bar"}] 635 636 badArgs = [ 637 [exe, 2], 638 "spam", 639 [exe, "foo\0bar"]] 640 641 # Sanity check - this will fail for people who have mucked with 642 # their site configuration in a stupid way, but there's nothing we 643 # can do about that. 644 badUnicode = u'\N{SNOWMAN}' 645 try: 646 badUnicode.encode(sys.getdefaultencoding()) 647 except UnicodeEncodeError: 648 # Okay, that unicode doesn't encode, put it in as a bad environment 649 # key. 650 badEnvs.append({badUnicode: 'value for bad unicode key'}) 651 badEnvs.append({'key for bad unicode value': badUnicode}) 652 badArgs.append([exe, badUnicode]) 653 else: 654 # It _did_ encode. Most likely, Gtk2 is being used and the 655 # default system encoding is UTF-8, which can encode anything. 656 # In any case, if implicit unicode -> str conversion works for 657 # that string, we can't test that TypeError gets raised instead, 658 # so just leave it off. 659 pass 660 661 for env in badEnvs: 662 self.assertRaises( 663 TypeError, 664 reactor.spawnProcess, p, exe, [exe, "-c", ""], env=env) 665 666 for args in badArgs: 667 self.assertRaises( 668 TypeError, 669 reactor.spawnProcess, p, exe, args, env=None) 670 671 672 # Use upper-case so that the environment key test uses an upper case 673 # name: some versions of Windows only support upper case environment 674 # variable names, and I think Python (as of 2.5) doesn't use the right 675 # syscall for lowercase or mixed case names to work anyway. 676 okayUnicode = u"UNICODE" 677 encodedValue = "UNICODE" 678 679 def _deprecatedUnicodeSupportTest(self, processProtocolClass, argv=[], env={}): 680 """ 681 Check that a deprecation warning is emitted when passing unicode to 682 spawnProcess for an argv value or an environment key or value. 683 Check that the warning is of the right type, has the right message, 684 and refers to the correct file. Unfortunately, don't check that the 685 line number is correct, because that is too hard for me to figure 686 out. 687 688 @param processProtocolClass: A L{UtilityProcessProtocol} subclass 689 which will be instantiated to communicate with the child process. 690 691 @param argv: The argv argument to spawnProcess. 692 693 @param env: The env argument to spawnProcess. 694 695 @return: A Deferred which fires when the test is complete. 696 """ 697 # Sanity to check to make sure we can actually encode this unicode 698 # with the default system encoding. This may be excessively 699 # paranoid. -exarkun 700 self.assertEqual( 701 self.okayUnicode.encode(sys.getdefaultencoding()), 702 self.encodedValue) 703 704 p = self.assertWarns(DeprecationWarning, 705 "Argument strings and environment keys/values passed to " 706 "reactor.spawnProcess should be str, not unicode.", __file__, 707 processProtocolClass.run, reactor, argv, env) 708 return p.getResult() 709 710 711 def test_deprecatedUnicodeArgvSupport(self): 712 """ 713 Test that a unicode string passed for an argument value is allowed 714 if it can be encoded with the default system encoding, but that a 715 deprecation warning is emitted. 716 """ 717 d = self._deprecatedUnicodeSupportTest(GetArgumentVector, argv=[self.okayUnicode]) 718 def gotArgVector(argv): 719 self.assertEqual(argv, ['-c', self.encodedValue]) 720 d.addCallback(gotArgVector) 721 return d 722 723 724 def test_deprecatedUnicodeEnvKeySupport(self): 725 """ 726 Test that a unicode string passed for the key of the environment 727 dictionary is allowed if it can be encoded with the default system 728 encoding, but that a deprecation warning is emitted. 729 """ 730 d = self._deprecatedUnicodeSupportTest( 731 GetEnvironmentDictionary, env={self.okayUnicode: self.encodedValue}) 732 def gotEnvironment(environ): 733 self.assertEqual(environ[self.encodedValue], self.encodedValue) 734 d.addCallback(gotEnvironment) 735 return d 736 737 738 def test_deprecatedUnicodeEnvValueSupport(self): 739 """ 740 Test that a unicode string passed for the value of the environment 741 dictionary is allowed if it can be encoded with the default system 742 encoding, but that a deprecation warning is emitted. 743 """ 744 d = self._deprecatedUnicodeSupportTest( 745 GetEnvironmentDictionary, env={self.encodedValue: self.okayUnicode}) 746 def gotEnvironment(environ): 747 # On Windows, the environment contains more things than we 748 # specified, so only make sure that at least the key we wanted 749 # is there, rather than testing the dictionary for exact 750 # equality. 751 self.assertEqual(environ[self.encodedValue], self.encodedValue) 752 d.addCallback(gotEnvironment) 753 return d 754 755 756 757class TwoProcessProtocol(protocol.ProcessProtocol): 758 num = -1 759 finished = 0 760 def __init__(self): 761 self.deferred = defer.Deferred() 762 def outReceived(self, data): 763 pass 764 def processEnded(self, reason): 765 self.finished = 1 766 self.deferred.callback(None) 767 768class TestTwoProcessesBase: 769 def setUp(self): 770 self.processes = [None, None] 771 self.pp = [None, None] 772 self.done = 0 773 self.verbose = 0 774 775 def createProcesses(self, usePTY=0): 776 exe = sys.executable 777 scriptPath = util.sibpath(__file__, "process_reader.py") 778 for num in (0,1): 779 self.pp[num] = TwoProcessProtocol() 780 self.pp[num].num = num 781 p = reactor.spawnProcess(self.pp[num], 782 exe, [exe, "-u", scriptPath], env=None, 783 usePTY=usePTY) 784 self.processes[num] = p 785 786 def close(self, num): 787 if self.verbose: print "closing stdin [%d]" % num 788 p = self.processes[num] 789 pp = self.pp[num] 790 self.failIf(pp.finished, "Process finished too early") 791 p.loseConnection() 792 if self.verbose: print self.pp[0].finished, self.pp[1].finished 793 794 def _onClose(self): 795 return defer.gatherResults([ p.deferred for p in self.pp ]) 796 797 def testClose(self): 798 if self.verbose: print "starting processes" 799 self.createProcesses() 800 reactor.callLater(1, self.close, 0) 801 reactor.callLater(2, self.close, 1) 802 return self._onClose() 803 804class TestTwoProcessesNonPosix(TestTwoProcessesBase, unittest.TestCase): 805 pass 806 807class TestTwoProcessesPosix(TestTwoProcessesBase, unittest.TestCase): 808 def tearDown(self): 809 for pp, pr in zip(self.pp, self.processes): 810 if not pp.finished: 811 try: 812 os.kill(pr.pid, signal.SIGTERM) 813 except OSError: 814 # If the test failed the process may already be dead 815 # The error here is only noise 816 pass 817 return self._onClose() 818 819 def kill(self, num): 820 if self.verbose: print "kill [%d] with SIGTERM" % num 821 p = self.processes[num] 822 pp = self.pp[num] 823 self.failIf(pp.finished, "Process finished too early") 824 os.kill(p.pid, signal.SIGTERM) 825 if self.verbose: print self.pp[0].finished, self.pp[1].finished 826 827 def testKill(self): 828 if self.verbose: print "starting processes" 829 self.createProcesses(usePTY=0) 830 reactor.callLater(1, self.kill, 0) 831 reactor.callLater(2, self.kill, 1) 832 return self._onClose() 833 834 def testClosePty(self): 835 if self.verbose: print "starting processes" 836 self.createProcesses(usePTY=1) 837 reactor.callLater(1, self.close, 0) 838 reactor.callLater(2, self.close, 1) 839 return self._onClose() 840 841 def testKillPty(self): 842 if self.verbose: print "starting processes" 843 self.createProcesses(usePTY=1) 844 reactor.callLater(1, self.kill, 0) 845 reactor.callLater(2, self.kill, 1) 846 return self._onClose() 847 848class FDChecker(protocol.ProcessProtocol): 849 state = 0 850 data = "" 851 failed = None 852 853 def __init__(self, d): 854 self.deferred = d 855 856 def fail(self, why): 857 self.failed = why 858 self.deferred.callback(None) 859 860 def connectionMade(self): 861 self.transport.writeToChild(0, "abcd") 862 self.state = 1 863 864 def childDataReceived(self, childFD, data): 865 if self.state == 1: 866 if childFD != 1: 867 self.fail("read '%s' on fd %d (not 1) during state 1" \ 868 % (childFD, data)) 869 return 870 self.data += data 871 #print "len", len(self.data) 872 if len(self.data) == 6: 873 if self.data != "righto": 874 self.fail("got '%s' on fd1, expected 'righto'" \ 875 % self.data) 876 return 877 self.data = "" 878 self.state = 2 879 #print "state2", self.state 880 self.transport.writeToChild(3, "efgh") 881 return 882 if self.state == 2: 883 self.fail("read '%s' on fd %s during state 2" % (childFD, data)) 884 return 885 if self.state == 3: 886 if childFD != 1: 887 self.fail("read '%s' on fd %s (not 1) during state 3" \ 888 % (childFD, data)) 889 return 890 self.data += data 891 if len(self.data) == 6: 892 if self.data != "closed": 893 self.fail("got '%s' on fd1, expected 'closed'" \ 894 % self.data) 895 return 896 self.state = 4 897 return 898 if self.state == 4: 899 self.fail("read '%s' on fd %s during state 4" % (childFD, data)) 900 return 901 902 def childConnectionLost(self, childFD): 903 if self.state == 1: 904 self.fail("got connectionLost(%d) during state 1" % childFD) 905 return 906 if self.state == 2: 907 if childFD != 4: 908 self.fail("got connectionLost(%d) (not 4) during state 2" \ 909 % childFD) 910 return 911 self.state = 3 912 self.transport.closeChildFD(5) 913 return 914 915 def processEnded(self, status): 916 rc = status.value.exitCode 917 if self.state != 4: 918 self.fail("processEnded early, rc %d" % rc) 919 return 920 if status.value.signal != None: 921 self.fail("processEnded with signal %s" % status.value.signal) 922 return 923 if rc != 0: 924 self.fail("processEnded with rc %d" % rc) 925 return 926 self.deferred.callback(None) 927 928 929class FDTest(unittest.TestCase): 930 931 def testFD(self): 932 exe = sys.executable 933 scriptPath = util.sibpath(__file__, "process_fds.py") 934 d = defer.Deferred() 935 p = FDChecker(d) 936 reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, 937 path=None, 938 childFDs={0:"w", 1:"r", 2:2, 939 3:"w", 4:"r", 5:"w"}) 940 d.addCallback(lambda x : self.failIf(p.failed, p.failed)) 941 return d 942 943 def testLinger(self): 944 # See what happens when all the pipes close before the process 945 # actually stops. This test *requires* SIGCHLD catching to work, 946 # as there is no other way to find out the process is done. 947 exe = sys.executable 948 scriptPath = util.sibpath(__file__, "process_linger.py") 949 p = Accumulator() 950 d = p.endedDeferred = defer.Deferred() 951 reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, 952 path=None, 953 childFDs={1:"r", 2:2}, 954 ) 955 def processEnded(ign): 956 self.assertEqual(p.outF.getvalue(), 957 "here is some text\ngoodbye\n") 958 return d.addCallback(processEnded) 959 960 961 962class Accumulator(protocol.ProcessProtocol): 963 """Accumulate data from a process.""" 964 965 closed = 0 966 endedDeferred = None 967 968 def connectionMade(self): 969 self.outF = StringIO.StringIO() 970 self.errF = StringIO.StringIO() 971 972 def outReceived(self, d): 973 self.outF.write(d) 974 975 def errReceived(self, d): 976 self.errF.write(d) 977 978 def outConnectionLost(self): 979 pass 980 981 def errConnectionLost(self): 982 pass 983 984 def processEnded(self, reason): 985 self.closed = 1 986 if self.endedDeferred is not None: 987 d, self.endedDeferred = self.endedDeferred, None 988 d.callback(None) 989 990 991class PosixProcessBase: 992 """ 993 Test running processes. 994 """ 995 usePTY = False 996 997 def getCommand(self, commandName): 998 """ 999 Return the path of the shell command named C{commandName}, looking at 1000 common locations. 1001 """ 1002 if os.path.exists('/bin/%s' % (commandName,)): 1003 cmd = '/bin/%s' % (commandName,) 1004 elif os.path.exists('/usr/bin/%s' % (commandName,)): 1005 cmd = '/usr/bin/%s' % (commandName,) 1006 else: 1007 raise RuntimeError( 1008 "%s not found in /bin or /usr/bin" % (commandName,)) 1009 return cmd 1010 1011 def testNormalTermination(self): 1012 cmd = self.getCommand('true') 1013 1014 d = defer.Deferred() 1015 p = TrivialProcessProtocol(d) 1016 reactor.spawnProcess(p, cmd, ['true'], env=None, 1017 usePTY=self.usePTY) 1018 def check(ignored): 1019 p.reason.trap(error.ProcessDone) 1020 self.assertEqual(p.reason.value.exitCode, 0) 1021 self.assertEqual(p.reason.value.signal, None) 1022 d.addCallback(check) 1023 return d 1024 1025 1026 def test_abnormalTermination(self): 1027 """ 1028 When a process terminates with a system exit code set to 1, 1029 C{processEnded} is called with a L{error.ProcessTerminated} error, 1030 the C{exitCode} attribute reflecting the system exit code. 1031 """ 1032 exe = sys.executable 1033 1034 d = defer.Deferred() 1035 p = TrivialProcessProtocol(d) 1036 reactor.spawnProcess(p, exe, [exe, '-c', 'import sys; sys.exit(1)'], 1037 env=None, usePTY=self.usePTY) 1038 1039 def check(ignored): 1040 p.reason.trap(error.ProcessTerminated) 1041 self.assertEqual(p.reason.value.exitCode, 1) 1042 self.assertEqual(p.reason.value.signal, None) 1043 d.addCallback(check) 1044 return d 1045 1046 1047 def _testSignal(self, sig): 1048 exe = sys.executable 1049 scriptPath = util.sibpath(__file__, "process_signal.py") 1050 d = defer.Deferred() 1051 p = SignalProtocol(d, sig) 1052 reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, 1053 usePTY=self.usePTY) 1054 return d 1055 1056 1057 def test_signalHUP(self): 1058 """ 1059 Sending the SIGHUP signal to a running process interrupts it, and 1060 C{processEnded} is called with a L{error.ProcessTerminated} instance 1061 with the C{exitCode} set to C{None} and the C{signal} attribute set to 1062 C{signal.SIGHUP}. C{os.WTERMSIG} can also be used on the C{status} 1063 attribute to extract the signal value. 1064 """ 1065 return self._testSignal('HUP') 1066 1067 1068 def test_signalINT(self): 1069 """ 1070 Sending the SIGINT signal to a running process interrupts it, and 1071 C{processEnded} is called with a L{error.ProcessTerminated} instance 1072 with the C{exitCode} set to C{None} and the C{signal} attribute set to 1073 C{signal.SIGINT}. C{os.WTERMSIG} can also be used on the C{status} 1074 attribute to extract the signal value. 1075 """ 1076 return self._testSignal('INT') 1077 1078 1079 def test_signalKILL(self): 1080 """ 1081 Sending the SIGKILL signal to a running process interrupts it, and 1082 C{processEnded} is called with a L{error.ProcessTerminated} instance 1083 with the C{exitCode} set to C{None} and the C{signal} attribute set to 1084 C{signal.SIGKILL}. C{os.WTERMSIG} can also be used on the C{status} 1085 attribute to extract the signal value. 1086 """ 1087 return self._testSignal('KILL') 1088 1089 1090 def test_signalTERM(self): 1091 """ 1092 Sending the SIGTERM signal to a running process interrupts it, and 1093 C{processEnded} is called with a L{error.ProcessTerminated} instance 1094 with the C{exitCode} set to C{None} and the C{signal} attribute set to 1095 C{signal.SIGTERM}. C{os.WTERMSIG} can also be used on the C{status} 1096 attribute to extract the signal value. 1097 """ 1098 return self._testSignal('TERM') 1099 1100 1101 def test_childSignalHandling(self): 1102 """ 1103 The disposition of signals which are ignored in the parent 1104 process is reset to the default behavior for the child 1105 process. 1106 """ 1107 # Somewhat arbitrarily select SIGUSR1 here. It satisfies our 1108 # requirements that: 1109 # - The interpreter not fiddle around with the handler 1110 # behind our backs at startup time (this disqualifies 1111 # signals like SIGINT and SIGPIPE). 1112 # - The default behavior is to exit. 1113 # 1114 # This lets us send the signal to the child and then verify 1115 # that it exits with a status code indicating that it was 1116 # indeed the signal which caused it to exit. 1117 which = signal.SIGUSR1 1118 1119 # Ignore the signal in the parent (and make sure we clean it 1120 # up). 1121 handler = signal.signal(which, signal.SIG_IGN) 1122 self.addCleanup(signal.signal, signal.SIGUSR1, handler) 1123 1124 # Now do the test. 1125 return self._testSignal(signal.SIGUSR1) 1126 1127 1128 def test_executionError(self): 1129 """ 1130 Raise an error during execvpe to check error management. 1131 """ 1132 cmd = self.getCommand('false') 1133 1134 d = defer.Deferred() 1135 p = TrivialProcessProtocol(d) 1136 def buggyexecvpe(command, args, environment): 1137 raise RuntimeError("Ouch") 1138 oldexecvpe = os.execvpe 1139 os.execvpe = buggyexecvpe 1140 try: 1141 reactor.spawnProcess(p, cmd, ['false'], env=None, 1142 usePTY=self.usePTY) 1143 1144 def check(ignored): 1145 errData = "".join(p.errData + p.outData) 1146 self.assertIn("Upon execvpe", errData) 1147 self.assertIn("Ouch", errData) 1148 d.addCallback(check) 1149 finally: 1150 os.execvpe = oldexecvpe 1151 return d 1152 1153 1154 def test_errorInProcessEnded(self): 1155 """ 1156 The handler which reaps a process is removed when the process is 1157 reaped, even if the protocol's C{processEnded} method raises an 1158 exception. 1159 """ 1160 connected = defer.Deferred() 1161 ended = defer.Deferred() 1162 1163 # This script runs until we disconnect its transport. 1164 pythonExecutable = sys.executable 1165 scriptPath = util.sibpath(__file__, "process_echoer.py") 1166 1167 class ErrorInProcessEnded(protocol.ProcessProtocol): 1168 """ 1169 A protocol that raises an error in C{processEnded}. 1170 """ 1171 def makeConnection(self, transport): 1172 connected.callback(transport) 1173 1174 def processEnded(self, reason): 1175 reactor.callLater(0, ended.callback, None) 1176 raise RuntimeError("Deliberate error") 1177 1178 # Launch the process. 1179 reactor.spawnProcess( 1180 ErrorInProcessEnded(), pythonExecutable, 1181 [pythonExecutable, scriptPath], 1182 env=None, path=None) 1183 1184 pid = [] 1185 def cbConnected(transport): 1186 pid.append(transport.pid) 1187 # There's now a reap process handler registered. 1188 self.assertIn(transport.pid, process.reapProcessHandlers) 1189 1190 # Kill the process cleanly, triggering an error in the protocol. 1191 transport.loseConnection() 1192 connected.addCallback(cbConnected) 1193 1194 def checkTerminated(ignored): 1195 # The exception was logged. 1196 excs = self.flushLoggedErrors(RuntimeError) 1197 self.assertEqual(len(excs), 1) 1198 # The process is no longer scheduled for reaping. 1199 self.assertNotIn(pid[0], process.reapProcessHandlers) 1200 ended.addCallback(checkTerminated) 1201 1202 return ended 1203 1204 1205 1206class MockSignal(object): 1207 """ 1208 Neuter L{signal.signal}, but pass other attributes unscathed 1209 """ 1210 def signal(self, sig, action): 1211 return signal.getsignal(sig) 1212 1213 def __getattr__(self, attr): 1214 return getattr(signal, attr) 1215 1216 1217class MockOS(object): 1218 """ 1219 The mock OS: overwrite L{os}, L{fcntl} and {sys} functions with fake ones. 1220 1221 @ivar exited: set to True when C{_exit} is called. 1222 @type exited: C{bool} 1223 1224 @ivar O_RDWR: dumb value faking C{os.O_RDWR}. 1225 @type O_RDWR: C{int} 1226 1227 @ivar O_NOCTTY: dumb value faking C{os.O_NOCTTY}. 1228 @type O_NOCTTY: C{int} 1229 1230 @ivar WNOHANG: dumb value faking C{os.WNOHANG}. 1231 @type WNOHANG: C{int} 1232 1233 @ivar raiseFork: if not C{None}, subsequent calls to fork will raise this 1234 object. 1235 @type raiseFork: C{NoneType} or C{Exception} 1236 1237 @ivar raiseExec: if set, subsequent calls to execvpe will raise an error. 1238 @type raiseExec: C{bool} 1239 1240 @ivar fdio: fake file object returned by calls to fdopen. 1241 @type fdio: C{StringIO.StringIO} 1242 1243 @ivar actions: hold names of some actions executed by the object, in order 1244 of execution. 1245 1246 @type actions: C{list} of C{str} 1247 1248 @ivar closed: keep track of the file descriptor closed. 1249 @type closed: C{list} of C{int} 1250 1251 @ivar child: whether fork return for the child or the parent. 1252 @type child: C{bool} 1253 1254 @ivar pipeCount: count the number of time that C{os.pipe} has been called. 1255 @type pipeCount: C{int} 1256 1257 @ivar raiseWaitPid: if set, subsequent calls to waitpid will raise 1258 the error specified. 1259 @type raiseWaitPid: C{None} or a class 1260 1261 @ivar waitChild: if set, subsequent calls to waitpid will return it. 1262 @type waitChild: C{None} or a tuple 1263 1264 @ivar euid: the uid returned by the fake C{os.geteuid} 1265 @type euid: C{int} 1266 1267 @ivar egid: the gid returned by the fake C{os.getegid} 1268 @type egid: C{int} 1269 1270 @ivar seteuidCalls: stored results of C{os.seteuid} calls. 1271 @type seteuidCalls: C{list} 1272 1273 @ivar setegidCalls: stored results of C{os.setegid} calls. 1274 @type setegidCalls: C{list} 1275 1276 @ivar path: the path returned by C{os.path.expanduser}. 1277 @type path: C{str} 1278 1279 @ivar raiseKill: if set, subsequent call to kill will raise the error 1280 specified. 1281 @type raiseKill: C{None} or an exception instance. 1282 1283 @ivar readData: data returned by C{os.read}. 1284 @type readData: C{str} 1285 """ 1286 exited = False 1287 raiseExec = False 1288 fdio = None 1289 child = True 1290 raiseWaitPid = None 1291 raiseFork = None 1292 waitChild = None 1293 euid = 0 1294 egid = 0 1295 path = None 1296 raiseKill = None 1297 readData = "" 1298 1299 def __init__(self): 1300 """ 1301 Initialize data structures. 1302 """ 1303 self.actions = [] 1304 self.closed = [] 1305 self.pipeCount = 0 1306 self.O_RDWR = -1 1307 self.O_NOCTTY = -2 1308 self.WNOHANG = -4 1309 self.WEXITSTATUS = lambda x: 0 1310 self.WIFEXITED = lambda x: 1 1311 self.seteuidCalls = [] 1312 self.setegidCalls = [] 1313 1314 1315 def open(self, dev, flags): 1316 """ 1317 Fake C{os.open}. Return a non fd number to be sure it's not used 1318 elsewhere. 1319 """ 1320 return -3 1321 1322 1323 def fstat(self, fd): 1324 """ 1325 Fake C{os.fstat}. Return a C{os.stat_result} filled with garbage. 1326 """ 1327 return os.stat_result((0,) * 10) 1328 1329 1330 def fdopen(self, fd, flag): 1331 """ 1332 Fake C{os.fdopen}. Return a StringIO object whose content can be tested 1333 later via C{self.fdio}. 1334 """ 1335 self.fdio = StringIO.StringIO() 1336 return self.fdio 1337 1338 1339 def setsid(self): 1340 """ 1341 Fake C{os.setsid}. Save action. 1342 """ 1343 self.actions.append('setsid') 1344 1345 1346 def fork(self): 1347 """ 1348 Fake C{os.fork}. Save the action in C{self.actions}, and return 0 if 1349 C{self.child} is set, or a dumb number. 1350 """ 1351 self.actions.append(('fork', gc.isenabled())) 1352 if self.raiseFork is not None: 1353 raise self.raiseFork 1354 elif self.child: 1355 # Child result is 0 1356 return 0 1357 else: 1358 return 21 1359 1360 1361 def close(self, fd): 1362 """ 1363 Fake C{os.close}, saving the closed fd in C{self.closed}. 1364 """ 1365 self.closed.append(fd) 1366 1367 1368 def dup2(self, fd1, fd2): 1369 """ 1370 Fake C{os.dup2}. Do nothing. 1371 """ 1372 1373 1374 def write(self, fd, data): 1375 """ 1376 Fake C{os.write}. Save action. 1377 """ 1378 self.actions.append(("write", fd, data)) 1379 1380 1381 def read(self, fd, size): 1382 """ 1383 Fake C{os.read}: save action, and return C{readData} content. 1384 1385 @param fd: The file descriptor to read. 1386 1387 @param size: The maximum number of bytes to read. 1388 1389 @return: A fixed C{bytes} buffer. 1390 """ 1391 self.actions.append(('read', fd, size)) 1392 return self.readData 1393 1394 1395 def execvpe(self, command, args, env): 1396 """ 1397 Fake C{os.execvpe}. Save the action, and raise an error if 1398 C{self.raiseExec} is set. 1399 """ 1400 self.actions.append('exec') 1401 if self.raiseExec: 1402 raise RuntimeError("Bar") 1403 1404 1405 def pipe(self): 1406 """ 1407 Fake C{os.pipe}. Return non fd numbers to be sure it's not used 1408 elsewhere, and increment C{self.pipeCount}. This is used to uniquify 1409 the result. 1410 """ 1411 self.pipeCount += 1 1412 return - 2 * self.pipeCount + 1, - 2 * self.pipeCount 1413 1414 1415 def ttyname(self, fd): 1416 """ 1417 Fake C{os.ttyname}. Return a dumb string. 1418 """ 1419 return "foo" 1420 1421 1422 def _exit(self, code): 1423 """ 1424 Fake C{os._exit}. Save the action, set the C{self.exited} flag, and 1425 raise C{SystemError}. 1426 """ 1427 self.actions.append(('exit', code)) 1428 self.exited = True 1429 # Don't forget to raise an error, or you'll end up in parent 1430 # code path. 1431 raise SystemError() 1432 1433 1434 def ioctl(self, fd, flags, arg): 1435 """ 1436 Override C{fcntl.ioctl}. Do nothing. 1437 """ 1438 1439 1440 def setNonBlocking(self, fd): 1441 """ 1442 Override C{fdesc.setNonBlocking}. Do nothing. 1443 """ 1444 1445 1446 def waitpid(self, pid, options): 1447 """ 1448 Override C{os.waitpid}. Return values meaning that the child process 1449 has exited, save executed action. 1450 """ 1451 self.actions.append('waitpid') 1452 if self.raiseWaitPid is not None: 1453 raise self.raiseWaitPid 1454 if self.waitChild is not None: 1455 return self.waitChild 1456 return 1, 0 1457 1458 1459 def settrace(self, arg): 1460 """ 1461 Override C{sys.settrace} to keep coverage working. 1462 """ 1463 1464 1465 def getgid(self): 1466 """ 1467 Override C{os.getgid}. Return a dumb number. 1468 """ 1469 return 1235 1470 1471 1472 def getuid(self): 1473 """ 1474 Override C{os.getuid}. Return a dumb number. 1475 """ 1476 return 1237 1477 1478 1479 def setuid(self, val): 1480 """ 1481 Override C{os.setuid}. Do nothing. 1482 """ 1483 self.actions.append(('setuid', val)) 1484 1485 1486 def setgid(self, val): 1487 """ 1488 Override C{os.setgid}. Do nothing. 1489 """ 1490 self.actions.append(('setgid', val)) 1491 1492 1493 def setregid(self, val1, val2): 1494 """ 1495 Override C{os.setregid}. Do nothing. 1496 """ 1497 self.actions.append(('setregid', val1, val2)) 1498 1499 1500 def setreuid(self, val1, val2): 1501 """ 1502 Override C{os.setreuid}. Save the action. 1503 """ 1504 self.actions.append(('setreuid', val1, val2)) 1505 1506 1507 def switchUID(self, uid, gid): 1508 """ 1509 Override C{util.switchuid}. Save the action. 1510 """ 1511 self.actions.append(('switchuid', uid, gid)) 1512 1513 1514 def openpty(self): 1515 """ 1516 Override C{pty.openpty}, returning fake file descriptors. 1517 """ 1518 return -12, -13 1519 1520 1521 def chdir(self, path): 1522 """ 1523 Override C{os.chdir}. Save the action. 1524 1525 @param path: The path to change the current directory to. 1526 """ 1527 self.actions.append(('chdir', path)) 1528 1529 1530 def geteuid(self): 1531 """ 1532 Mock C{os.geteuid}, returning C{self.euid} instead. 1533 """ 1534 return self.euid 1535 1536 1537 def getegid(self): 1538 """ 1539 Mock C{os.getegid}, returning C{self.egid} instead. 1540 """ 1541 return self.egid 1542 1543 1544 def seteuid(self, egid): 1545 """ 1546 Mock C{os.seteuid}, store result. 1547 """ 1548 self.seteuidCalls.append(egid) 1549 1550 1551 def setegid(self, egid): 1552 """ 1553 Mock C{os.setegid}, store result. 1554 """ 1555 self.setegidCalls.append(egid) 1556 1557 1558 def expanduser(self, path): 1559 """ 1560 Mock C{os.path.expanduser}. 1561 """ 1562 return self.path 1563 1564 1565 def getpwnam(self, user): 1566 """ 1567 Mock C{pwd.getpwnam}. 1568 """ 1569 return 0, 0, 1, 2 1570 1571 1572 def listdir(self, path): 1573 """ 1574 Override C{os.listdir}, returning fake contents of '/dev/fd' 1575 """ 1576 return "-1", "-2" 1577 1578 1579 def kill(self, pid, signalID): 1580 """ 1581 Override C{os.kill}: save the action and raise C{self.raiseKill} if 1582 specified. 1583 """ 1584 self.actions.append(('kill', pid, signalID)) 1585 if self.raiseKill is not None: 1586 raise self.raiseKill 1587 1588 1589 def unlink(self, filename): 1590 """ 1591 Override C{os.unlink}. Save the action. 1592 1593 @param filename: The file name to remove. 1594 """ 1595 self.actions.append(('unlink', filename)) 1596 1597 1598 def umask(self, mask): 1599 """ 1600 Override C{os.umask}. Save the action. 1601 1602 @param mask: The new file mode creation mask. 1603 """ 1604 self.actions.append(('umask', mask)) 1605 1606 1607 def getpid(self): 1608 """ 1609 Return a fixed PID value. 1610 1611 @return: A fixed value. 1612 """ 1613 return 6789 1614 1615 1616if process is not None: 1617 class DumbProcessWriter(process.ProcessWriter): 1618 """ 1619 A fake L{process.ProcessWriter} used for tests. 1620 """ 1621 1622 def startReading(self): 1623 """ 1624 Here's the faking: don't do anything here. 1625 """ 1626 1627 1628 1629 class DumbProcessReader(process.ProcessReader): 1630 """ 1631 A fake L{process.ProcessReader} used for tests. 1632 """ 1633 1634 def startReading(self): 1635 """ 1636 Here's the faking: don't do anything here. 1637 """ 1638 1639 1640 1641 class DumbPTYProcess(process.PTYProcess): 1642 """ 1643 A fake L{process.PTYProcess} used for tests. 1644 """ 1645 1646 def startReading(self): 1647 """ 1648 Here's the faking: don't do anything here. 1649 """ 1650 1651 1652 1653class MockProcessTestCase(unittest.TestCase): 1654 """ 1655 Mock a process runner to test forked child code path. 1656 """ 1657 if process is None: 1658 skip = "twisted.internet.process is never used on Windows" 1659 1660 def setUp(self): 1661 """ 1662 Replace L{process} os, fcntl, sys, switchUID, fdesc and pty modules 1663 with the mock class L{MockOS}. 1664 """ 1665 if gc.isenabled(): 1666 self.addCleanup(gc.enable) 1667 else: 1668 self.addCleanup(gc.disable) 1669 self.mockos = MockOS() 1670 self.mockos.euid = 1236 1671 self.mockos.egid = 1234 1672 self.patch(process, "os", self.mockos) 1673 self.patch(process, "fcntl", self.mockos) 1674 self.patch(process, "sys", self.mockos) 1675 self.patch(process, "switchUID", self.mockos.switchUID) 1676 self.patch(process, "fdesc", self.mockos) 1677 self.patch(process.Process, "processReaderFactory", DumbProcessReader) 1678 self.patch(process.Process, "processWriterFactory", DumbProcessWriter) 1679 self.patch(process, "pty", self.mockos) 1680 1681 self.mocksig = MockSignal() 1682 self.patch(process, "signal", self.mocksig) 1683 1684 1685 def tearDown(self): 1686 """ 1687 Reset processes registered for reap. 1688 """ 1689 process.reapProcessHandlers = {} 1690 1691 1692 def test_mockFork(self): 1693 """ 1694 Test a classic spawnProcess. Check the path of the client code: 1695 fork, exec, exit. 1696 """ 1697 gc.enable() 1698 1699 cmd = '/mock/ouch' 1700 1701 d = defer.Deferred() 1702 p = TrivialProcessProtocol(d) 1703 try: 1704 reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1705 usePTY=False) 1706 except SystemError: 1707 self.assert_(self.mockos.exited) 1708 self.assertEqual( 1709 self.mockos.actions, [("fork", False), "exec", ("exit", 1)]) 1710 else: 1711 self.fail("Should not be here") 1712 1713 # It should leave the garbage collector disabled. 1714 self.assertFalse(gc.isenabled()) 1715 1716 1717 def _mockForkInParentTest(self): 1718 """ 1719 Assert that in the main process, spawnProcess disables the garbage 1720 collector, calls fork, closes the pipe file descriptors it created for 1721 the child process, and calls waitpid. 1722 """ 1723 self.mockos.child = False 1724 cmd = '/mock/ouch' 1725 1726 d = defer.Deferred() 1727 p = TrivialProcessProtocol(d) 1728 reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1729 usePTY=False) 1730 # It should close the first read pipe, and the 2 last write pipes 1731 self.assertEqual(set(self.mockos.closed), set([-1, -4, -6])) 1732 self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"]) 1733 1734 1735 def test_mockForkInParentGarbageCollectorEnabled(self): 1736 """ 1737 The garbage collector should be enabled when L{reactor.spawnProcess} 1738 returns if it was initially enabled. 1739 1740 @see L{_mockForkInParentTest} 1741 """ 1742 gc.enable() 1743 self._mockForkInParentTest() 1744 self.assertTrue(gc.isenabled()) 1745 1746 1747 def test_mockForkInParentGarbageCollectorDisabled(self): 1748 """ 1749 The garbage collector should be disabled when L{reactor.spawnProcess} 1750 returns if it was initially disabled. 1751 1752 @see L{_mockForkInParentTest} 1753 """ 1754 gc.disable() 1755 self._mockForkInParentTest() 1756 self.assertFalse(gc.isenabled()) 1757 1758 1759 def test_mockForkTTY(self): 1760 """ 1761 Test a TTY spawnProcess: check the path of the client code: 1762 fork, exec, exit. 1763 """ 1764 cmd = '/mock/ouch' 1765 1766 d = defer.Deferred() 1767 p = TrivialProcessProtocol(d) 1768 self.assertRaises(SystemError, reactor.spawnProcess, p, cmd, ['ouch'], 1769 env=None, usePTY=True) 1770 self.assertTrue(self.mockos.exited) 1771 self.assertEqual( 1772 self.mockos.actions, 1773 [("fork", False), "setsid", "exec", ("exit", 1)]) 1774 1775 1776 def _mockWithForkError(self): 1777 """ 1778 Assert that if the fork call fails, no other process setup calls are 1779 made and that spawnProcess raises the exception fork raised. 1780 """ 1781 self.mockos.raiseFork = OSError(errno.EAGAIN, None) 1782 protocol = TrivialProcessProtocol(None) 1783 self.assertRaises(OSError, reactor.spawnProcess, protocol, None) 1784 self.assertEqual(self.mockos.actions, [("fork", False)]) 1785 1786 1787 def test_mockWithForkErrorGarbageCollectorEnabled(self): 1788 """ 1789 The garbage collector should be enabled when L{reactor.spawnProcess} 1790 raises because L{os.fork} raised, if it was initially enabled. 1791 """ 1792 gc.enable() 1793 self._mockWithForkError() 1794 self.assertTrue(gc.isenabled()) 1795 1796 1797 def test_mockWithForkErrorGarbageCollectorDisabled(self): 1798 """ 1799 The garbage collector should be disabled when 1800 L{reactor.spawnProcess} raises because L{os.fork} raised, if it was 1801 initially disabled. 1802 """ 1803 gc.disable() 1804 self._mockWithForkError() 1805 self.assertFalse(gc.isenabled()) 1806 1807 1808 def test_mockForkErrorCloseFDs(self): 1809 """ 1810 When C{os.fork} raises an exception, the file descriptors created 1811 before are closed and don't leak. 1812 """ 1813 self._mockWithForkError() 1814 self.assertEqual(set(self.mockos.closed), set([-1, -4, -6, -2, -3, -5])) 1815 1816 1817 def test_mockForkErrorGivenFDs(self): 1818 """ 1819 When C{os.forks} raises an exception and that file descriptors have 1820 been specified with the C{childFDs} arguments of 1821 L{reactor.spawnProcess}, they are not closed. 1822 """ 1823 self.mockos.raiseFork = OSError(errno.EAGAIN, None) 1824 protocol = TrivialProcessProtocol(None) 1825 self.assertRaises(OSError, reactor.spawnProcess, protocol, None, 1826 childFDs={0: -10, 1: -11, 2: -13}) 1827 self.assertEqual(self.mockos.actions, [("fork", False)]) 1828 self.assertEqual(self.mockos.closed, []) 1829 1830 # We can also put "r" or "w" to let twisted create the pipes 1831 self.assertRaises(OSError, reactor.spawnProcess, protocol, None, 1832 childFDs={0: "r", 1: -11, 2: -13}) 1833 self.assertEqual(set(self.mockos.closed), set([-1, -2])) 1834 1835 1836 def test_mockForkErrorClosePTY(self): 1837 """ 1838 When C{os.fork} raises an exception, the file descriptors created by 1839 C{pty.openpty} are closed and don't leak, when C{usePTY} is set to 1840 C{True}. 1841 """ 1842 self.mockos.raiseFork = OSError(errno.EAGAIN, None) 1843 protocol = TrivialProcessProtocol(None) 1844 self.assertRaises(OSError, reactor.spawnProcess, protocol, None, 1845 usePTY=True) 1846 self.assertEqual(self.mockos.actions, [("fork", False)]) 1847 self.assertEqual(set(self.mockos.closed), set([-12, -13])) 1848 1849 1850 def test_mockForkErrorPTYGivenFDs(self): 1851 """ 1852 If a tuple is passed to C{usePTY} to specify slave and master file 1853 descriptors and that C{os.fork} raises an exception, these file 1854 descriptors aren't closed. 1855 """ 1856 self.mockos.raiseFork = OSError(errno.EAGAIN, None) 1857 protocol = TrivialProcessProtocol(None) 1858 self.assertRaises(OSError, reactor.spawnProcess, protocol, None, 1859 usePTY=(-20, -21, 'foo')) 1860 self.assertEqual(self.mockos.actions, [("fork", False)]) 1861 self.assertEqual(self.mockos.closed, []) 1862 1863 1864 def test_mockWithExecError(self): 1865 """ 1866 Spawn a process but simulate an error during execution in the client 1867 path: C{os.execvpe} raises an error. It should close all the standard 1868 fds, try to print the error encountered, and exit cleanly. 1869 """ 1870 cmd = '/mock/ouch' 1871 1872 d = defer.Deferred() 1873 p = TrivialProcessProtocol(d) 1874 self.mockos.raiseExec = True 1875 try: 1876 reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1877 usePTY=False) 1878 except SystemError: 1879 self.assert_(self.mockos.exited) 1880 self.assertEqual( 1881 self.mockos.actions, [("fork", False), "exec", ("exit", 1)]) 1882 # Check that fd have been closed 1883 self.assertIn(0, self.mockos.closed) 1884 self.assertIn(1, self.mockos.closed) 1885 self.assertIn(2, self.mockos.closed) 1886 # Check content of traceback 1887 self.assertIn("RuntimeError: Bar", self.mockos.fdio.getvalue()) 1888 else: 1889 self.fail("Should not be here") 1890 1891 1892 def test_mockSetUid(self): 1893 """ 1894 Try creating a process with setting its uid: it's almost the same path 1895 as the standard path, but with a C{switchUID} call before the exec. 1896 """ 1897 cmd = '/mock/ouch' 1898 1899 d = defer.Deferred() 1900 p = TrivialProcessProtocol(d) 1901 try: 1902 reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1903 usePTY=False, uid=8080) 1904 except SystemError: 1905 self.assert_(self.mockos.exited) 1906 self.assertEqual( 1907 self.mockos.actions, 1908 [('fork', False), ('setuid', 0), ('setgid', 0), 1909 ('switchuid', 8080, 1234), 'exec', ('exit', 1)]) 1910 else: 1911 self.fail("Should not be here") 1912 1913 1914 def test_mockSetUidInParent(self): 1915 """ 1916 When spawning a child process with a UID different from the UID of the 1917 current process, the current process does not have its UID changed. 1918 """ 1919 self.mockos.child = False 1920 cmd = '/mock/ouch' 1921 1922 d = defer.Deferred() 1923 p = TrivialProcessProtocol(d) 1924 reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1925 usePTY=False, uid=8080) 1926 self.assertEqual(self.mockos.actions, [('fork', False), 'waitpid']) 1927 1928 1929 def test_mockPTYSetUid(self): 1930 """ 1931 Try creating a PTY process with setting its uid: it's almost the same 1932 path as the standard path, but with a C{switchUID} call before the 1933 exec. 1934 """ 1935 cmd = '/mock/ouch' 1936 1937 d = defer.Deferred() 1938 p = TrivialProcessProtocol(d) 1939 try: 1940 reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1941 usePTY=True, uid=8081) 1942 except SystemError: 1943 self.assertTrue(self.mockos.exited) 1944 self.assertEqual( 1945 self.mockos.actions, 1946 [('fork', False), 'setsid', ('setuid', 0), ('setgid', 0), 1947 ('switchuid', 8081, 1234), 'exec', ('exit', 1)]) 1948 else: 1949 self.fail("Should not be here") 1950 1951 1952 def test_mockPTYSetUidInParent(self): 1953 """ 1954 When spawning a child process with PTY and a UID different from the UID 1955 of the current process, the current process does not have its UID 1956 changed. 1957 """ 1958 self.mockos.child = False 1959 cmd = '/mock/ouch' 1960 1961 d = defer.Deferred() 1962 p = TrivialProcessProtocol(d) 1963 oldPTYProcess = process.PTYProcess 1964 try: 1965 process.PTYProcess = DumbPTYProcess 1966 reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1967 usePTY=True, uid=8080) 1968 finally: 1969 process.PTYProcess = oldPTYProcess 1970 self.assertEqual(self.mockos.actions, [('fork', False), 'waitpid']) 1971 1972 1973 def test_mockWithWaitError(self): 1974 """ 1975 Test that reapProcess logs errors raised. 1976 """ 1977 self.mockos.child = False 1978 cmd = '/mock/ouch' 1979 self.mockos.waitChild = (0, 0) 1980 1981 d = defer.Deferred() 1982 p = TrivialProcessProtocol(d) 1983 proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, 1984 usePTY=False) 1985 self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"]) 1986 1987 self.mockos.raiseWaitPid = OSError() 1988 proc.reapProcess() 1989 errors = self.flushLoggedErrors() 1990 self.assertEqual(len(errors), 1) 1991 errors[0].trap(OSError) 1992 1993 1994 def test_mockErrorECHILDInReapProcess(self): 1995 """ 1996 Test that reapProcess doesn't log anything when waitpid raises a 1997 C{OSError} with errno C{ECHILD}. 1998 """ 1999 self.mockos.child = False 2000 cmd = '/mock/ouch' 2001 self.mockos.waitChild = (0, 0) 2002 2003 d = defer.Deferred() 2004 p = TrivialProcessProtocol(d) 2005 proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, 2006 usePTY=False) 2007 self.assertEqual(self.mockos.actions, [("fork", False), "waitpid"]) 2008 2009 self.mockos.raiseWaitPid = OSError() 2010 self.mockos.raiseWaitPid.errno = errno.ECHILD 2011 # This should not produce any errors 2012 proc.reapProcess() 2013 2014 2015 def test_mockErrorInPipe(self): 2016 """ 2017 If C{os.pipe} raises an exception after some pipes where created, the 2018 created pipes are closed and don't leak. 2019 """ 2020 pipes = [-1, -2, -3, -4] 2021 def pipe(): 2022 try: 2023 return pipes.pop(0), pipes.pop(0) 2024 except IndexError: 2025 raise OSError() 2026 self.mockos.pipe = pipe 2027 protocol = TrivialProcessProtocol(None) 2028 self.assertRaises(OSError, reactor.spawnProcess, protocol, None) 2029 self.assertEqual(self.mockos.actions, []) 2030 self.assertEqual(set(self.mockos.closed), set([-4, -3, -2, -1])) 2031 2032 2033 def test_kill(self): 2034 """ 2035 L{process.Process.signalProcess} calls C{os.kill} translating the given 2036 signal string to the PID. 2037 """ 2038 self.mockos.child = False 2039 self.mockos.waitChild = (0, 0) 2040 cmd = '/mock/ouch' 2041 p = TrivialProcessProtocol(None) 2042 proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) 2043 proc.signalProcess("KILL") 2044 self.assertEqual(self.mockos.actions, 2045 [('fork', False), 'waitpid', ('kill', 21, signal.SIGKILL)]) 2046 2047 2048 def test_killExited(self): 2049 """ 2050 L{process.Process.signalProcess} raises L{error.ProcessExitedAlready} 2051 if the process has exited. 2052 """ 2053 self.mockos.child = False 2054 cmd = '/mock/ouch' 2055 p = TrivialProcessProtocol(None) 2056 proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) 2057 # We didn't specify a waitpid value, so the waitpid call in 2058 # registerReapProcessHandler has already reaped the process 2059 self.assertRaises(error.ProcessExitedAlready, 2060 proc.signalProcess, "KILL") 2061 2062 2063 def test_killExitedButNotDetected(self): 2064 """ 2065 L{process.Process.signalProcess} raises L{error.ProcessExitedAlready} 2066 if the process has exited but that twisted hasn't seen it (for example, 2067 if the process has been waited outside of twisted): C{os.kill} then 2068 raise C{OSError} with C{errno.ESRCH} as errno. 2069 """ 2070 self.mockos.child = False 2071 self.mockos.waitChild = (0, 0) 2072 cmd = '/mock/ouch' 2073 p = TrivialProcessProtocol(None) 2074 proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) 2075 self.mockos.raiseKill = OSError(errno.ESRCH, "Not found") 2076 self.assertRaises(error.ProcessExitedAlready, 2077 proc.signalProcess, "KILL") 2078 2079 2080 def test_killErrorInKill(self): 2081 """ 2082 L{process.Process.signalProcess} doesn't mask C{OSError} exceptions if 2083 the errno is different from C{errno.ESRCH}. 2084 """ 2085 self.mockos.child = False 2086 self.mockos.waitChild = (0, 0) 2087 cmd = '/mock/ouch' 2088 p = TrivialProcessProtocol(None) 2089 proc = reactor.spawnProcess(p, cmd, ['ouch'], env=None, usePTY=False) 2090 self.mockos.raiseKill = OSError(errno.EINVAL, "Invalid signal") 2091 err = self.assertRaises(OSError, 2092 proc.signalProcess, "KILL") 2093 self.assertEquals(err.errno, errno.EINVAL) 2094 2095 2096 2097class PosixProcessTestCase(unittest.TestCase, PosixProcessBase): 2098 # add two non-pty test cases 2099 2100 def test_stderr(self): 2101 """ 2102 Bytes written to stderr by the spawned process are passed to the 2103 C{errReceived} callback on the C{ProcessProtocol} passed to 2104 C{spawnProcess}. 2105 """ 2106 cmd = sys.executable 2107 2108 value = "42" 2109 2110 p = Accumulator() 2111 d = p.endedDeferred = defer.Deferred() 2112 reactor.spawnProcess(p, cmd, 2113 [cmd, "-c", 2114 "import sys; sys.stderr.write('%s')" % (value,)], 2115 env=None, path="/tmp", 2116 usePTY=self.usePTY) 2117 2118 def processEnded(ign): 2119 self.assertEqual(value, p.errF.getvalue()) 2120 return d.addCallback(processEnded) 2121 2122 2123 def testProcess(self): 2124 cmd = self.getCommand('gzip') 2125 s = "there's no place like home!\n" * 3 2126 p = Accumulator() 2127 d = p.endedDeferred = defer.Deferred() 2128 reactor.spawnProcess(p, cmd, [cmd, "-c"], env=None, path="/tmp", 2129 usePTY=self.usePTY) 2130 p.transport.write(s) 2131 p.transport.closeStdin() 2132 2133 def processEnded(ign): 2134 f = p.outF 2135 f.seek(0, 0) 2136 gf = gzip.GzipFile(fileobj=f) 2137 self.assertEqual(gf.read(), s) 2138 return d.addCallback(processEnded) 2139 2140 2141 2142class PosixProcessTestCasePTY(unittest.TestCase, PosixProcessBase): 2143 """ 2144 Just like PosixProcessTestCase, but use ptys instead of pipes. 2145 """ 2146 usePTY = True 2147 # PTYs only offer one input and one output. What still makes sense? 2148 # testNormalTermination 2149 # test_abnormalTermination 2150 # testSignal 2151 # testProcess, but not without p.transport.closeStdin 2152 # might be solveable: TODO: add test if so 2153 2154 def testOpeningTTY(self): 2155 exe = sys.executable 2156 scriptPath = util.sibpath(__file__, "process_tty.py") 2157 p = Accumulator() 2158 d = p.endedDeferred = defer.Deferred() 2159 reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None, 2160 path=None, usePTY=self.usePTY) 2161 p.transport.write("hello world!\n") 2162 2163 def processEnded(ign): 2164 self.assertRaises( 2165 error.ProcessExitedAlready, p.transport.signalProcess, 'HUP') 2166 self.assertEqual( 2167 p.outF.getvalue(), 2168 "hello world!\r\nhello world!\r\n", 2169 "Error message from process_tty follows:\n\n%s\n\n" % p.outF.getvalue()) 2170 return d.addCallback(processEnded) 2171 2172 2173 def testBadArgs(self): 2174 pyExe = sys.executable 2175 pyArgs = [pyExe, "-u", "-c", "print 'hello'"] 2176 p = Accumulator() 2177 self.assertRaises(ValueError, reactor.spawnProcess, p, pyExe, pyArgs, 2178 usePTY=1, childFDs={1:'r'}) 2179 2180 2181 2182class Win32SignalProtocol(SignalProtocol): 2183 """ 2184 A win32-specific process protocol that handles C{processEnded} 2185 differently: processes should exit with exit code 1. 2186 """ 2187 2188 def processEnded(self, reason): 2189 """ 2190 Callback C{self.deferred} with C{None} if C{reason} is a 2191 L{error.ProcessTerminated} failure with C{exitCode} set to 1. 2192 Otherwise, errback with a C{ValueError} describing the problem. 2193 """ 2194 if not reason.check(error.ProcessTerminated): 2195 return self.deferred.errback( 2196 ValueError("wrong termination: %s" % (reason,))) 2197 v = reason.value 2198 if v.exitCode != 1: 2199 return self.deferred.errback( 2200 ValueError("Wrong exit code: %s" % (reason.exitCode,))) 2201 self.deferred.callback(None) 2202 2203 2204 2205class Win32ProcessTestCase(unittest.TestCase): 2206 """ 2207 Test process programs that are packaged with twisted. 2208 """ 2209 2210 def testStdinReader(self): 2211 pyExe = sys.executable 2212 scriptPath = util.sibpath(__file__, "process_stdinreader.py") 2213 p = Accumulator() 2214 d = p.endedDeferred = defer.Deferred() 2215 reactor.spawnProcess(p, pyExe, [pyExe, "-u", scriptPath], env=None, 2216 path=None) 2217 p.transport.write("hello, world") 2218 p.transport.closeStdin() 2219 2220 def processEnded(ign): 2221 self.assertEqual(p.errF.getvalue(), "err\nerr\n") 2222 self.assertEqual(p.outF.getvalue(), "out\nhello, world\nout\n") 2223 return d.addCallback(processEnded) 2224 2225 2226 def testBadArgs(self): 2227 pyExe = sys.executable 2228 pyArgs = [pyExe, "-u", "-c", "print 'hello'"] 2229 p = Accumulator() 2230 self.assertRaises(ValueError, 2231 reactor.spawnProcess, p, pyExe, pyArgs, uid=1) 2232 self.assertRaises(ValueError, 2233 reactor.spawnProcess, p, pyExe, pyArgs, gid=1) 2234 self.assertRaises(ValueError, 2235 reactor.spawnProcess, p, pyExe, pyArgs, usePTY=1) 2236 self.assertRaises(ValueError, 2237 reactor.spawnProcess, p, pyExe, pyArgs, childFDs={1:'r'}) 2238 2239 2240 def _testSignal(self, sig): 2241 exe = sys.executable 2242 scriptPath = util.sibpath(__file__, "process_signal.py") 2243 d = defer.Deferred() 2244 p = Win32SignalProtocol(d, sig) 2245 reactor.spawnProcess(p, exe, [exe, "-u", scriptPath], env=None) 2246 return d 2247 2248 2249 def test_signalTERM(self): 2250 """ 2251 Sending the SIGTERM signal terminates a created process, and 2252 C{processEnded} is called with a L{error.ProcessTerminated} instance 2253 with the C{exitCode} attribute set to 1. 2254 """ 2255 return self._testSignal('TERM') 2256 2257 2258 def test_signalINT(self): 2259 """ 2260 Sending the SIGINT signal terminates a created process, and 2261 C{processEnded} is called with a L{error.ProcessTerminated} instance 2262 with the C{exitCode} attribute set to 1. 2263 """ 2264 return self._testSignal('INT') 2265 2266 2267 def test_signalKILL(self): 2268 """ 2269 Sending the SIGKILL signal terminates a created process, and 2270 C{processEnded} is called with a L{error.ProcessTerminated} instance 2271 with the C{exitCode} attribute set to 1. 2272 """ 2273 return self._testSignal('KILL') 2274 2275 2276 def test_closeHandles(self): 2277 """ 2278 The win32 handles should be properly closed when the process exits. 2279 """ 2280 import win32api 2281 2282 connected = defer.Deferred() 2283 ended = defer.Deferred() 2284 2285 class SimpleProtocol(protocol.ProcessProtocol): 2286 """ 2287 A protocol that fires deferreds when connected and disconnected. 2288 """ 2289 def makeConnection(self, transport): 2290 connected.callback(transport) 2291 2292 def processEnded(self, reason): 2293 ended.callback(None) 2294 2295 p = SimpleProtocol() 2296 2297 pyExe = sys.executable 2298 pyArgs = [pyExe, "-u", "-c", "print 'hello'"] 2299 proc = reactor.spawnProcess(p, pyExe, pyArgs) 2300 2301 def cbConnected(transport): 2302 self.assertIdentical(transport, proc) 2303 # perform a basic validity test on the handles 2304 win32api.GetHandleInformation(proc.hProcess) 2305 win32api.GetHandleInformation(proc.hThread) 2306 # And save their values for later 2307 self.hProcess = proc.hProcess 2308 self.hThread = proc.hThread 2309 connected.addCallback(cbConnected) 2310 2311 def checkTerminated(ignored): 2312 # The attributes on the process object must be reset... 2313 self.assertIdentical(proc.pid, None) 2314 self.assertIdentical(proc.hProcess, None) 2315 self.assertIdentical(proc.hThread, None) 2316 # ...and the handles must be closed. 2317 self.assertRaises(win32api.error, 2318 win32api.GetHandleInformation, self.hProcess) 2319 self.assertRaises(win32api.error, 2320 win32api.GetHandleInformation, self.hThread) 2321 ended.addCallback(checkTerminated) 2322 2323 return defer.gatherResults([connected, ended]) 2324 2325 2326 2327class Win32UnicodeEnvironmentTest(unittest.TestCase): 2328 """ 2329 Tests for Unicode environment on Windows 2330 """ 2331 goodKey = u'UNICODE' 2332 goodValue = u'UNICODE' 2333 2334 def test_encodableUnicodeEnvironment(self): 2335 """ 2336 Test C{os.environ} (inherited by every subprocess on Windows) that 2337 contains an ascii-encodable Unicode string. This is different from 2338 passing Unicode environment explicitly to spawnProcess (which is not 2339 supported). 2340 """ 2341 os.environ[self.goodKey] = self.goodValue 2342 self.addCleanup(operator.delitem, os.environ, self.goodKey) 2343 2344 p = GetEnvironmentDictionary.run(reactor, [], {}) 2345 def gotEnvironment(environ): 2346 self.assertEqual( 2347 environ[self.goodKey.encode('ascii')], 2348 self.goodValue.encode('ascii')) 2349 return p.getResult().addCallback(gotEnvironment) 2350 2351 2352 2353class Dumbwin32procPidTest(unittest.TestCase): 2354 """ 2355 Simple test for the pid attribute of Process on win32. 2356 """ 2357 2358 def test_pid(self): 2359 """ 2360 Launch process with mock win32process. The only mock aspect of this 2361 module is that the pid of the process created will always be 42. 2362 """ 2363 from twisted.internet import _dumbwin32proc 2364 from twisted.test import mock_win32process 2365 self.patch(_dumbwin32proc, "win32process", mock_win32process) 2366 exe = sys.executable 2367 scriptPath = util.sibpath(__file__, "process_cmdline.py") 2368 2369 d = defer.Deferred() 2370 processProto = TrivialProcessProtocol(d) 2371 comspec = str(os.environ["COMSPEC"]) 2372 cmd = [comspec, "/c", exe, scriptPath] 2373 2374 p = _dumbwin32proc.Process(reactor, 2375 processProto, 2376 None, 2377 cmd, 2378 {}, 2379 None) 2380 self.assertEqual(42, p.pid) 2381 self.assertEqual("<Process pid=42>", repr(p)) 2382 2383 def pidCompleteCb(result): 2384 self.assertEqual(None, p.pid) 2385 return d.addCallback(pidCompleteCb) 2386 2387 2388 2389class UtilTestCase(unittest.TestCase): 2390 """ 2391 Tests for process-related helper functions (currently only 2392 L{procutils.which}. 2393 """ 2394 def setUp(self): 2395 """ 2396 Create several directories and files, some of which are executable 2397 and some of which are not. Save the current PATH setting. 2398 """ 2399 j = os.path.join 2400 2401 base = self.mktemp() 2402 2403 self.foo = j(base, "foo") 2404 self.baz = j(base, "baz") 2405 self.foobar = j(self.foo, "bar") 2406 self.foobaz = j(self.foo, "baz") 2407 self.bazfoo = j(self.baz, "foo") 2408 self.bazbar = j(self.baz, "bar") 2409 2410 for d in self.foobar, self.foobaz, self.bazfoo, self.bazbar: 2411 os.makedirs(d) 2412 2413 for name, mode in [(j(self.foobaz, "executable"), 0700), 2414 (j(self.foo, "executable"), 0700), 2415 (j(self.bazfoo, "executable"), 0700), 2416 (j(self.bazfoo, "executable.bin"), 0700), 2417 (j(self.bazbar, "executable"), 0)]: 2418 f = file(name, "w") 2419 f.close() 2420 os.chmod(name, mode) 2421 2422 self.oldPath = os.environ.get('PATH', None) 2423 os.environ['PATH'] = os.pathsep.join(( 2424 self.foobar, self.foobaz, self.bazfoo, self.bazbar)) 2425 2426 2427 def tearDown(self): 2428 """ 2429 Restore the saved PATH setting, and set all created files readable 2430 again so that they can be deleted easily. 2431 """ 2432 os.chmod(os.path.join(self.bazbar, "executable"), stat.S_IWUSR) 2433 if self.oldPath is None: 2434 try: 2435 del os.environ['PATH'] 2436 except KeyError: 2437 pass 2438 else: 2439 os.environ['PATH'] = self.oldPath 2440 2441 2442 def test_whichWithoutPATH(self): 2443 """ 2444 Test that if C{os.environ} does not have a C{'PATH'} key, 2445 L{procutils.which} returns an empty list. 2446 """ 2447 del os.environ['PATH'] 2448 self.assertEqual(procutils.which("executable"), []) 2449 2450 2451 def testWhich(self): 2452 j = os.path.join 2453 paths = procutils.which("executable") 2454 expectedPaths = [j(self.foobaz, "executable"), 2455 j(self.bazfoo, "executable")] 2456 if runtime.platform.isWindows(): 2457 expectedPaths.append(j(self.bazbar, "executable")) 2458 self.assertEqual(paths, expectedPaths) 2459 2460 2461 def testWhichPathExt(self): 2462 j = os.path.join 2463 old = os.environ.get('PATHEXT', None) 2464 os.environ['PATHEXT'] = os.pathsep.join(('.bin', '.exe', '.sh')) 2465 try: 2466 paths = procutils.which("executable") 2467 finally: 2468 if old is None: 2469 del os.environ['PATHEXT'] 2470 else: 2471 os.environ['PATHEXT'] = old 2472 expectedPaths = [j(self.foobaz, "executable"), 2473 j(self.bazfoo, "executable"), 2474 j(self.bazfoo, "executable.bin")] 2475 if runtime.platform.isWindows(): 2476 expectedPaths.append(j(self.bazbar, "executable")) 2477 self.assertEqual(paths, expectedPaths) 2478 2479 2480 2481class ClosingPipesProcessProtocol(protocol.ProcessProtocol): 2482 output = '' 2483 errput = '' 2484 2485 def __init__(self, outOrErr): 2486 self.deferred = defer.Deferred() 2487 self.outOrErr = outOrErr 2488 2489 def processEnded(self, reason): 2490 self.deferred.callback(reason) 2491 2492 def outReceived(self, data): 2493 self.output += data 2494 2495 def errReceived(self, data): 2496 self.errput += data 2497 2498 2499 2500class ClosingPipes(unittest.TestCase): 2501 2502 def doit(self, fd): 2503 """ 2504 Create a child process and close one of its output descriptors using 2505 L{IProcessTransport.closeStdout} or L{IProcessTransport.closeStderr}. 2506 Return a L{Deferred} which fires after verifying that the descriptor was 2507 really closed. 2508 """ 2509 p = ClosingPipesProcessProtocol(True) 2510 self.assertFailure(p.deferred, error.ProcessTerminated) 2511 p.deferred.addCallback(self._endProcess, p) 2512 reactor.spawnProcess( 2513 p, sys.executable, [ 2514 sys.executable, '-u', '-c', 2515 'raw_input()\n' 2516 'import sys, os, time\n' 2517 # Give the system a bit of time to notice the closed 2518 # descriptor. Another option would be to poll() for HUP 2519 # instead of relying on an os.write to fail with SIGPIPE. 2520 # However, that wouldn't work on OS X (or Windows?). 2521 'for i in range(1000):\n' 2522 ' os.write(%d, "foo\\n")\n' 2523 ' time.sleep(0.01)\n' 2524 'sys.exit(42)\n' % (fd,) 2525 ], 2526 env=None) 2527 2528 if fd == 1: 2529 p.transport.closeStdout() 2530 elif fd == 2: 2531 p.transport.closeStderr() 2532 else: 2533 raise RuntimeError 2534 2535 # Give the close time to propagate 2536 p.transport.write('go\n') 2537 2538 # make the buggy case not hang 2539 p.transport.closeStdin() 2540 return p.deferred 2541 2542 2543 def _endProcess(self, reason, p): 2544 """ 2545 Check that a failed write prevented the process from getting to its 2546 custom exit code. 2547 """ 2548 # child must not get past that write without raising 2549 self.assertNotEquals( 2550 reason.exitCode, 42, 'process reason was %r' % reason) 2551 self.assertEqual(p.output, '') 2552 return p.errput 2553 2554 2555 def test_stdout(self): 2556 """ 2557 ProcessProtocol.transport.closeStdout actually closes the pipe. 2558 """ 2559 d = self.doit(1) 2560 def _check(errput): 2561 self.assertIn('OSError', errput) 2562 if runtime.platform.getType() != 'win32': 2563 self.assertIn('Broken pipe', errput) 2564 d.addCallback(_check) 2565 return d 2566 2567 2568 def test_stderr(self): 2569 """ 2570 ProcessProtocol.transport.closeStderr actually closes the pipe. 2571 """ 2572 d = self.doit(2) 2573 def _check(errput): 2574 # there should be no stderr open, so nothing for it to 2575 # write the error to. 2576 self.assertEqual(errput, '') 2577 d.addCallback(_check) 2578 return d 2579 2580 2581skipMessage = "wrong platform or reactor doesn't support IReactorProcess" 2582if (runtime.platform.getType() != 'posix') or (not interfaces.IReactorProcess(reactor, None)): 2583 PosixProcessTestCase.skip = skipMessage 2584 PosixProcessTestCasePTY.skip = skipMessage 2585 TestTwoProcessesPosix.skip = skipMessage 2586 FDTest.skip = skipMessage 2587 2588if (runtime.platform.getType() != 'win32') or (not interfaces.IReactorProcess(reactor, None)): 2589 Win32ProcessTestCase.skip = skipMessage 2590 TestTwoProcessesNonPosix.skip = skipMessage 2591 Dumbwin32procPidTest.skip = skipMessage 2592 Win32UnicodeEnvironmentTest.skip = skipMessage 2593 2594if not interfaces.IReactorProcess(reactor, None): 2595 ProcessTestCase.skip = skipMessage 2596 ClosingPipes.skip = skipMessage 2597 2598