1# -*- test-case-name: twisted.test.test_process -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6UNIX Process management. 7 8Do NOT use this module directly - use reactor.spawnProcess() instead. 9 10Maintainer: Itamar Shtull-Trauring 11""" 12 13import errno 14import gc 15import io 16import os 17import signal 18import stat 19import sys 20import traceback 21from typing import Callable, Dict, Optional 22 23from zope.interface import implementer 24 25from twisted.internet import abstract, error, fdesc 26from twisted.internet._baseprocess import BaseProcess 27from twisted.internet.interfaces import IProcessTransport 28from twisted.internet.main import CONNECTION_DONE, CONNECTION_LOST 29from twisted.python import failure, log 30from twisted.python.runtime import platform 31from twisted.python.util import switchUID 32 33if platform.isWindows(): 34 raise ImportError( 35 "twisted.internet.process does not work on Windows. " 36 "Use the reactor.spawnProcess() API instead." 37 ) 38 39try: 40 import pty as _pty 41except ImportError: 42 pty = None 43else: 44 pty = _pty 45 46try: 47 import fcntl as _fcntl 48 import termios 49except ImportError: 50 fcntl = None 51else: 52 fcntl = _fcntl 53 54# Some people were importing this, which is incorrect, just keeping it 55# here for backwards compatibility: 56ProcessExitedAlready = error.ProcessExitedAlready 57 58reapProcessHandlers: Dict[int, Callable] = {} 59 60 61def reapAllProcesses(): 62 """ 63 Reap all registered processes. 64 """ 65 # Coerce this to a list, as reaping the process changes the dictionary and 66 # causes a "size changed during iteration" exception 67 for process in list(reapProcessHandlers.values()): 68 process.reapProcess() 69 70 71def registerReapProcessHandler(pid, process): 72 """ 73 Register a process handler for the given pid, in case L{reapAllProcesses} 74 is called. 75 76 @param pid: the pid of the process. 77 @param process: a process handler. 78 """ 79 if pid in reapProcessHandlers: 80 raise RuntimeError("Try to register an already registered process.") 81 try: 82 auxPID, status = os.waitpid(pid, os.WNOHANG) 83 except BaseException: 84 log.msg(f"Failed to reap {pid}:") 85 log.err() 86 87 if pid is None: 88 return 89 90 auxPID = None 91 if auxPID: 92 process.processEnded(status) 93 else: 94 # if auxPID is 0, there are children but none have exited 95 reapProcessHandlers[pid] = process 96 97 98def unregisterReapProcessHandler(pid, process): 99 """ 100 Unregister a process handler previously registered with 101 L{registerReapProcessHandler}. 102 """ 103 if not (pid in reapProcessHandlers and reapProcessHandlers[pid] == process): 104 raise RuntimeError("Try to unregister a process not registered.") 105 del reapProcessHandlers[pid] 106 107 108class ProcessWriter(abstract.FileDescriptor): 109 """ 110 (Internal) Helper class to write into a Process's input pipe. 111 112 I am a helper which describes a selectable asynchronous writer to a 113 process's input pipe, including stdin. 114 115 @ivar enableReadHack: A flag which determines how readability on this 116 write descriptor will be handled. If C{True}, then readability may 117 indicate the reader for this write descriptor has been closed (ie, 118 the connection has been lost). If C{False}, then readability events 119 are ignored. 120 """ 121 122 connected = 1 123 ic = 0 124 enableReadHack = False 125 126 def __init__(self, reactor, proc, name, fileno, forceReadHack=False): 127 """ 128 Initialize, specifying a Process instance to connect to. 129 """ 130 abstract.FileDescriptor.__init__(self, reactor) 131 fdesc.setNonBlocking(fileno) 132 self.proc = proc 133 self.name = name 134 self.fd = fileno 135 136 if not stat.S_ISFIFO(os.fstat(self.fileno()).st_mode): 137 # If the fd is not a pipe, then the read hack is never 138 # applicable. This case arises when ProcessWriter is used by 139 # StandardIO and stdout is redirected to a normal file. 140 self.enableReadHack = False 141 elif forceReadHack: 142 self.enableReadHack = True 143 else: 144 # Detect if this fd is actually a write-only fd. If it's 145 # valid to read, don't try to detect closing via read. 146 # This really only means that we cannot detect a TTY's write 147 # pipe being closed. 148 try: 149 os.read(self.fileno(), 0) 150 except OSError: 151 # It's a write-only pipe end, enable hack 152 self.enableReadHack = True 153 154 if self.enableReadHack: 155 self.startReading() 156 157 def fileno(self): 158 """ 159 Return the fileno() of my process's stdin. 160 """ 161 return self.fd 162 163 def writeSomeData(self, data): 164 """ 165 Write some data to the open process. 166 """ 167 rv = fdesc.writeToFD(self.fd, data) 168 if rv == len(data) and self.enableReadHack: 169 # If the send buffer is now empty and it is necessary to monitor 170 # this descriptor for readability to detect close, try detecting 171 # readability now. 172 self.startReading() 173 return rv 174 175 def write(self, data): 176 self.stopReading() 177 abstract.FileDescriptor.write(self, data) 178 179 def doRead(self): 180 """ 181 The only way a write pipe can become "readable" is at EOF, because the 182 child has closed it, and we're using a reactor which doesn't 183 distinguish between readable and closed (such as the select reactor). 184 185 Except that's not true on linux < 2.6.11. It has the following 186 characteristics: write pipe is completely empty => POLLOUT (writable in 187 select), write pipe is not completely empty => POLLIN (readable in 188 select), write pipe's reader closed => POLLIN|POLLERR (readable and 189 writable in select) 190 191 That's what this funky code is for. If linux was not broken, this 192 function could be simply "return CONNECTION_LOST". 193 """ 194 if self.enableReadHack: 195 return CONNECTION_LOST 196 else: 197 self.stopReading() 198 199 def connectionLost(self, reason): 200 """ 201 See abstract.FileDescriptor.connectionLost. 202 """ 203 # At least on macOS 10.4, exiting while stdout is non-blocking can 204 # result in data loss. For some reason putting the file descriptor 205 # back into blocking mode seems to resolve this issue. 206 fdesc.setBlocking(self.fd) 207 208 abstract.FileDescriptor.connectionLost(self, reason) 209 self.proc.childConnectionLost(self.name, reason) 210 211 212class ProcessReader(abstract.FileDescriptor): 213 """ 214 ProcessReader 215 216 I am a selectable representation of a process's output pipe, such as 217 stdout and stderr. 218 """ 219 220 connected = True 221 222 def __init__(self, reactor, proc, name, fileno): 223 """ 224 Initialize, specifying a process to connect to. 225 """ 226 abstract.FileDescriptor.__init__(self, reactor) 227 fdesc.setNonBlocking(fileno) 228 self.proc = proc 229 self.name = name 230 self.fd = fileno 231 self.startReading() 232 233 def fileno(self): 234 """ 235 Return the fileno() of my process's stderr. 236 """ 237 return self.fd 238 239 def writeSomeData(self, data): 240 # the only time this is actually called is after .loseConnection Any 241 # actual write attempt would fail, so we must avoid that. This hack 242 # allows us to use .loseConnection on both readers and writers. 243 assert data == b"" 244 return CONNECTION_LOST 245 246 def doRead(self): 247 """ 248 This is called when the pipe becomes readable. 249 """ 250 return fdesc.readFromFD(self.fd, self.dataReceived) 251 252 def dataReceived(self, data): 253 self.proc.childDataReceived(self.name, data) 254 255 def loseConnection(self): 256 if self.connected and not self.disconnecting: 257 self.disconnecting = 1 258 self.stopReading() 259 self.reactor.callLater( 260 0, self.connectionLost, failure.Failure(CONNECTION_DONE) 261 ) 262 263 def connectionLost(self, reason): 264 """ 265 Close my end of the pipe, signal the Process (which signals the 266 ProcessProtocol). 267 """ 268 abstract.FileDescriptor.connectionLost(self, reason) 269 self.proc.childConnectionLost(self.name, reason) 270 271 272class _BaseProcess(BaseProcess): 273 """ 274 Base class for Process and PTYProcess. 275 """ 276 277 status: Optional[int] = None 278 pid = None 279 280 def reapProcess(self): 281 """ 282 Try to reap a process (without blocking) via waitpid. 283 284 This is called when sigchild is caught or a Process object loses its 285 "connection" (stdout is closed) This ought to result in reaping all 286 zombie processes, since it will be called twice as often as it needs 287 to be. 288 289 (Unfortunately, this is a slightly experimental approach, since 290 UNIX has no way to be really sure that your process is going to 291 go away w/o blocking. I don't want to block.) 292 """ 293 try: 294 try: 295 pid, status = os.waitpid(self.pid, os.WNOHANG) 296 except OSError as e: 297 if e.errno == errno.ECHILD: 298 # no child process 299 pid = None 300 else: 301 raise 302 except BaseException: 303 log.msg(f"Failed to reap {self.pid}:") 304 log.err() 305 pid = None 306 if pid: 307 unregisterReapProcessHandler(pid, self) 308 self.processEnded(status) 309 310 def _getReason(self, status): 311 exitCode = sig = None 312 if os.WIFEXITED(status): 313 exitCode = os.WEXITSTATUS(status) 314 else: 315 sig = os.WTERMSIG(status) 316 if exitCode or sig: 317 return error.ProcessTerminated(exitCode, sig, status) 318 return error.ProcessDone(status) 319 320 def signalProcess(self, signalID): 321 """ 322 Send the given signal C{signalID} to the process. It'll translate a 323 few signals ('HUP', 'STOP', 'INT', 'KILL', 'TERM') from a string 324 representation to its int value, otherwise it'll pass directly the 325 value provided 326 327 @type signalID: C{str} or C{int} 328 """ 329 if signalID in ("HUP", "STOP", "INT", "KILL", "TERM"): 330 signalID = getattr(signal, f"SIG{signalID}") 331 if self.pid is None: 332 raise ProcessExitedAlready() 333 try: 334 os.kill(self.pid, signalID) 335 except OSError as e: 336 if e.errno == errno.ESRCH: 337 raise ProcessExitedAlready() 338 else: 339 raise 340 341 def _resetSignalDisposition(self): 342 # The Python interpreter ignores some signals, and our child 343 # process will inherit that behaviour. To have a child process 344 # that responds to signals normally, we need to reset our 345 # child process's signal handling (just) after we fork and 346 # before we execvpe. 347 for signalnum in range(1, signal.NSIG): 348 if signal.getsignal(signalnum) == signal.SIG_IGN: 349 # Reset signal handling to the default 350 signal.signal(signalnum, signal.SIG_DFL) 351 352 def _fork(self, path, uid, gid, executable, args, environment, **kwargs): 353 """ 354 Fork and then exec sub-process. 355 356 @param path: the path where to run the new process. 357 @type path: L{bytes} or L{unicode} 358 @param uid: if defined, the uid used to run the new process. 359 @type uid: L{int} 360 @param gid: if defined, the gid used to run the new process. 361 @type gid: L{int} 362 @param executable: the executable to run in a new process. 363 @type executable: L{str} 364 @param args: arguments used to create the new process. 365 @type args: L{list}. 366 @param environment: environment used for the new process. 367 @type environment: L{dict}. 368 @param kwargs: keyword arguments to L{_setupChild} method. 369 """ 370 collectorEnabled = gc.isenabled() 371 gc.disable() 372 try: 373 self.pid = os.fork() 374 except BaseException: 375 # Still in the parent process 376 if collectorEnabled: 377 gc.enable() 378 raise 379 else: 380 if self.pid == 0: 381 # A return value of 0 from fork() indicates that we are now 382 # executing in the child process. 383 384 # Do not put *ANY* code outside the try block. The child 385 # process must either exec or _exit. If it gets outside this 386 # block (due to an exception that is not handled here, but 387 # which might be handled higher up), there will be two copies 388 # of the parent running in parallel, doing all kinds of damage. 389 390 # After each change to this code, review it to make sure there 391 # are no exit paths. 392 393 try: 394 # Stop debugging. If I am, I don't care anymore. 395 sys.settrace(None) 396 self._setupChild(**kwargs) 397 self._execChild(path, uid, gid, executable, args, environment) 398 except BaseException: 399 # If there are errors, try to write something descriptive 400 # to stderr before exiting. 401 402 # The parent's stderr isn't *necessarily* fd 2 anymore, or 403 # even still available; however, even libc assumes that 404 # write(2, err) is a useful thing to attempt. 405 406 try: 407 # On Python 3, print_exc takes a text stream, but 408 # on Python 2 it still takes a byte stream. So on 409 # Python 3 we will wrap up the byte stream returned 410 # by os.fdopen using TextIOWrapper. 411 412 # We hard-code UTF-8 as the encoding here, rather 413 # than looking at something like 414 # getfilesystemencoding() or sys.stderr.encoding, 415 # because we want an encoding that will be able to 416 # encode the full range of code points. We are 417 # (most likely) talking to the parent process on 418 # the other end of this pipe and not the filesystem 419 # or the original sys.stderr, so there's no point 420 # in trying to match the encoding of one of those 421 # objects. 422 423 stderr = io.TextIOWrapper(os.fdopen(2, "wb"), encoding="utf-8") 424 msg = ("Upon execvpe {} {} in environment id {}" "\n:").format( 425 executable, str(args), id(environment) 426 ) 427 stderr.write(msg) 428 traceback.print_exc(file=stderr) 429 stderr.flush() 430 431 for fd in range(3): 432 os.close(fd) 433 except BaseException: 434 # Handle all errors during the error-reporting process 435 # silently to ensure that the child terminates. 436 pass 437 438 # See comment above about making sure that we reach this line 439 # of code. 440 os._exit(1) 441 442 # we are now in parent process 443 if collectorEnabled: 444 gc.enable() 445 self.status = -1 # this records the exit status of the child 446 447 def _setupChild(self, *args, **kwargs): 448 """ 449 Setup the child process. Override in subclasses. 450 """ 451 raise NotImplementedError() 452 453 def _execChild(self, path, uid, gid, executable, args, environment): 454 """ 455 The exec() which is done in the forked child. 456 """ 457 if path: 458 os.chdir(path) 459 if uid is not None or gid is not None: 460 if uid is None: 461 uid = os.geteuid() 462 if gid is None: 463 gid = os.getegid() 464 # set the UID before I actually exec the process 465 os.setuid(0) 466 os.setgid(0) 467 switchUID(uid, gid) 468 os.execvpe(executable, args, environment) 469 470 def __repr__(self) -> str: 471 """ 472 String representation of a process. 473 """ 474 return "<{} pid={} status={}>".format( 475 self.__class__.__name__, 476 self.pid, 477 self.status, 478 ) 479 480 481class _FDDetector: 482 """ 483 This class contains the logic necessary to decide which of the available 484 system techniques should be used to detect the open file descriptors for 485 the current process. The chosen technique gets monkey-patched into the 486 _listOpenFDs method of this class so that the detection only needs to occur 487 once. 488 489 @ivar listdir: The implementation of listdir to use. This gets overwritten 490 by the test cases. 491 @ivar getpid: The implementation of getpid to use, returns the PID of the 492 running process. 493 @ivar openfile: The implementation of open() to use, by default the Python 494 builtin. 495 """ 496 497 # So that we can unit test this 498 listdir = os.listdir 499 getpid = os.getpid 500 openfile = open 501 502 def __init__(self): 503 self._implementations = [ 504 self._procFDImplementation, 505 self._devFDImplementation, 506 self._fallbackFDImplementation, 507 ] 508 509 def _listOpenFDs(self): 510 """ 511 Return an iterable of file descriptors which I{may} be open in this 512 process. 513 514 This will try to return the fewest possible descriptors without missing 515 any. 516 """ 517 self._listOpenFDs = self._getImplementation() 518 return self._listOpenFDs() 519 520 def _getImplementation(self): 521 """ 522 Pick a method which gives correct results for C{_listOpenFDs} in this 523 runtime environment. 524 525 This involves a lot of very platform-specific checks, some of which may 526 be relatively expensive. Therefore the returned method should be saved 527 and re-used, rather than always calling this method to determine what it 528 is. 529 530 See the implementation for the details of how a method is selected. 531 """ 532 for impl in self._implementations: 533 try: 534 before = impl() 535 except BaseException: 536 continue 537 with self.openfile("/dev/null", "r"): 538 after = impl() 539 if before != after: 540 return impl 541 # If no implementation can detect the newly opened file above, then just 542 # return the last one. The last one should therefore always be one 543 # which makes a simple static guess which includes all possible open 544 # file descriptors, but perhaps also many other values which do not 545 # correspond to file descriptors. For example, the scheme implemented 546 # by _fallbackFDImplementation is suitable to be the last entry. 547 return impl 548 549 def _devFDImplementation(self): 550 """ 551 Simple implementation for systems where /dev/fd actually works. 552 See: http://www.freebsd.org/cgi/man.cgi?fdescfs 553 """ 554 dname = "/dev/fd" 555 result = [int(fd) for fd in self.listdir(dname)] 556 return result 557 558 def _procFDImplementation(self): 559 """ 560 Simple implementation for systems where /proc/pid/fd exists (we assume 561 it works). 562 """ 563 dname = "/proc/%d/fd" % (self.getpid(),) 564 return [int(fd) for fd in self.listdir(dname)] 565 566 def _fallbackFDImplementation(self): 567 """ 568 Fallback implementation where either the resource module can inform us 569 about the upper bound of how many FDs to expect, or where we just guess 570 a constant maximum if there is no resource module. 571 572 All possible file descriptors from 0 to that upper bound are returned 573 with no attempt to exclude invalid file descriptor values. 574 """ 575 try: 576 import resource 577 except ImportError: 578 maxfds = 1024 579 else: 580 # OS-X reports 9223372036854775808. That's a lot of fds to close. 581 # OS-X should get the /dev/fd implementation instead, so mostly 582 # this check probably isn't necessary. 583 maxfds = min(1024, resource.getrlimit(resource.RLIMIT_NOFILE)[1]) 584 return range(maxfds) 585 586 587detector = _FDDetector() 588 589 590def _listOpenFDs(): 591 """ 592 Use the global detector object to figure out which FD implementation to 593 use. 594 """ 595 return detector._listOpenFDs() 596 597 598@implementer(IProcessTransport) 599class Process(_BaseProcess): 600 """ 601 An operating-system Process. 602 603 This represents an operating-system process with arbitrary input/output 604 pipes connected to it. Those pipes may represent standard input, 605 standard output, and standard error, or any other file descriptor. 606 607 On UNIX, this is implemented using fork(), exec(), pipe() 608 and fcntl(). These calls may not exist elsewhere so this 609 code is not cross-platform. (also, windows can only select 610 on sockets...) 611 """ 612 613 debug = False 614 debug_child = False 615 616 status = -1 617 pid = None 618 619 processWriterFactory = ProcessWriter 620 processReaderFactory = ProcessReader 621 622 def __init__( 623 self, 624 reactor, 625 executable, 626 args, 627 environment, 628 path, 629 proto, 630 uid=None, 631 gid=None, 632 childFDs=None, 633 ): 634 """ 635 Spawn an operating-system process. 636 637 This is where the hard work of disconnecting all currently open 638 files / forking / executing the new process happens. (This is 639 executed automatically when a Process is instantiated.) 640 641 This will also run the subprocess as a given user ID and group ID, if 642 specified. (Implementation Note: this doesn't support all the arcane 643 nuances of setXXuid on UNIX: it will assume that either your effective 644 or real UID is 0.) 645 """ 646 if not proto: 647 assert "r" not in childFDs.values() 648 assert "w" not in childFDs.values() 649 _BaseProcess.__init__(self, proto) 650 651 self.pipes = {} 652 # keys are childFDs, we can sense them closing 653 # values are ProcessReader/ProcessWriters 654 655 helpers = {} 656 # keys are childFDs 657 # values are parentFDs 658 659 if childFDs is None: 660 childFDs = { 661 0: "w", # we write to the child's stdin 662 1: "r", # we read from their stdout 663 2: "r", # and we read from their stderr 664 } 665 666 debug = self.debug 667 if debug: 668 print("childFDs", childFDs) 669 670 _openedPipes = [] 671 672 def pipe(): 673 r, w = os.pipe() 674 _openedPipes.extend([r, w]) 675 return r, w 676 677 # fdmap.keys() are filenos of pipes that are used by the child. 678 fdmap = {} # maps childFD to parentFD 679 try: 680 for childFD, target in childFDs.items(): 681 if debug: 682 print("[%d]" % childFD, target) 683 if target == "r": 684 # we need a pipe that the parent can read from 685 readFD, writeFD = pipe() 686 if debug: 687 print("readFD=%d, writeFD=%d" % (readFD, writeFD)) 688 fdmap[childFD] = writeFD # child writes to this 689 helpers[childFD] = readFD # parent reads from this 690 elif target == "w": 691 # we need a pipe that the parent can write to 692 readFD, writeFD = pipe() 693 if debug: 694 print("readFD=%d, writeFD=%d" % (readFD, writeFD)) 695 fdmap[childFD] = readFD # child reads from this 696 helpers[childFD] = writeFD # parent writes to this 697 else: 698 assert type(target) == int, f"{target!r} should be an int" 699 fdmap[childFD] = target # parent ignores this 700 if debug: 701 print("fdmap", fdmap) 702 if debug: 703 print("helpers", helpers) 704 # the child only cares about fdmap.values() 705 706 self._fork(path, uid, gid, executable, args, environment, fdmap=fdmap) 707 except BaseException: 708 for pipe in _openedPipes: 709 os.close(pipe) 710 raise 711 712 # we are the parent process: 713 self.proto = proto 714 715 # arrange for the parent-side pipes to be read and written 716 for childFD, parentFD in helpers.items(): 717 os.close(fdmap[childFD]) 718 if childFDs[childFD] == "r": 719 reader = self.processReaderFactory(reactor, self, childFD, parentFD) 720 self.pipes[childFD] = reader 721 722 if childFDs[childFD] == "w": 723 writer = self.processWriterFactory( 724 reactor, self, childFD, parentFD, forceReadHack=True 725 ) 726 self.pipes[childFD] = writer 727 728 try: 729 # the 'transport' is used for some compatibility methods 730 if self.proto is not None: 731 self.proto.makeConnection(self) 732 except BaseException: 733 log.err() 734 735 # The reactor might not be running yet. This might call back into 736 # processEnded synchronously, triggering an application-visible 737 # callback. That's probably not ideal. The replacement API for 738 # spawnProcess should improve upon this situation. 739 registerReapProcessHandler(self.pid, self) 740 741 def _setupChild(self, fdmap): 742 """ 743 fdmap[childFD] = parentFD 744 745 The child wants to end up with 'childFD' attached to what used to be 746 the parent's parentFD. As an example, a bash command run like 747 'command 2>&1' would correspond to an fdmap of {0:0, 1:1, 2:1}. 748 'command >foo.txt' would be {0:0, 1:os.open('foo.txt'), 2:2}. 749 750 This is accomplished in two steps:: 751 752 1. close all file descriptors that aren't values of fdmap. This 753 means 0 .. maxfds (or just the open fds within that range, if 754 the platform supports '/proc/<pid>/fd'). 755 756 2. for each childFD:: 757 758 - if fdmap[childFD] == childFD, the descriptor is already in 759 place. Make sure the CLOEXEC flag is not set, then delete 760 the entry from fdmap. 761 762 - if childFD is in fdmap.values(), then the target descriptor 763 is busy. Use os.dup() to move it elsewhere, update all 764 fdmap[childFD] items that point to it, then close the 765 original. Then fall through to the next case. 766 767 - now fdmap[childFD] is not in fdmap.values(), and is free. 768 Use os.dup2() to move it to the right place, then close the 769 original. 770 """ 771 debug = self.debug_child 772 if debug: 773 errfd = sys.stderr 774 errfd.write("starting _setupChild\n") 775 776 destList = fdmap.values() 777 for fd in _listOpenFDs(): 778 if fd in destList: 779 continue 780 if debug and fd == errfd.fileno(): 781 continue 782 try: 783 os.close(fd) 784 except BaseException: 785 pass 786 787 # at this point, the only fds still open are the ones that need to 788 # be moved to their appropriate positions in the child (the targets 789 # of fdmap, i.e. fdmap.values() ) 790 791 if debug: 792 print("fdmap", fdmap, file=errfd) 793 for child in sorted(fdmap.keys()): 794 target = fdmap[child] 795 if target == child: 796 # fd is already in place 797 if debug: 798 print("%d already in place" % target, file=errfd) 799 fdesc._unsetCloseOnExec(child) 800 else: 801 if child in fdmap.values(): 802 # we can't replace child-fd yet, as some other mapping 803 # still needs the fd it wants to target. We must preserve 804 # that old fd by duping it to a new home. 805 newtarget = os.dup(child) # give it a safe home 806 if debug: 807 print("os.dup(%d) -> %d" % (child, newtarget), file=errfd) 808 os.close(child) # close the original 809 for c, p in list(fdmap.items()): 810 if p == child: 811 fdmap[c] = newtarget # update all pointers 812 # now it should be available 813 if debug: 814 print("os.dup2(%d,%d)" % (target, child), file=errfd) 815 os.dup2(target, child) 816 817 # At this point, the child has everything it needs. We want to close 818 # everything that isn't going to be used by the child, i.e. 819 # everything not in fdmap.keys(). The only remaining fds open are 820 # those in fdmap.values(). 821 822 # Any given fd may appear in fdmap.values() multiple times, so we 823 # need to remove duplicates first. 824 825 old = [] 826 for fd in fdmap.values(): 827 if fd not in old: 828 if fd not in fdmap.keys(): 829 old.append(fd) 830 if debug: 831 print("old", old, file=errfd) 832 for fd in old: 833 os.close(fd) 834 835 self._resetSignalDisposition() 836 837 def writeToChild(self, childFD, data): 838 self.pipes[childFD].write(data) 839 840 def closeChildFD(self, childFD): 841 # for writer pipes, loseConnection tries to write the remaining data 842 # out to the pipe before closing it 843 # if childFD is not in the list of pipes, assume that it is already 844 # closed 845 if childFD in self.pipes: 846 self.pipes[childFD].loseConnection() 847 848 def pauseProducing(self): 849 for p in self.pipes.values(): 850 if isinstance(p, ProcessReader): 851 p.stopReading() 852 853 def resumeProducing(self): 854 for p in self.pipes.values(): 855 if isinstance(p, ProcessReader): 856 p.startReading() 857 858 # compatibility 859 def closeStdin(self): 860 """ 861 Call this to close standard input on this process. 862 """ 863 self.closeChildFD(0) 864 865 def closeStdout(self): 866 self.closeChildFD(1) 867 868 def closeStderr(self): 869 self.closeChildFD(2) 870 871 def loseConnection(self): 872 self.closeStdin() 873 self.closeStderr() 874 self.closeStdout() 875 876 def write(self, data): 877 """ 878 Call this to write to standard input on this process. 879 880 NOTE: This will silently lose data if there is no standard input. 881 """ 882 if 0 in self.pipes: 883 self.pipes[0].write(data) 884 885 def registerProducer(self, producer, streaming): 886 """ 887 Call this to register producer for standard input. 888 889 If there is no standard input producer.stopProducing() will 890 be called immediately. 891 """ 892 if 0 in self.pipes: 893 self.pipes[0].registerProducer(producer, streaming) 894 else: 895 producer.stopProducing() 896 897 def unregisterProducer(self): 898 """ 899 Call this to unregister producer for standard input.""" 900 if 0 in self.pipes: 901 self.pipes[0].unregisterProducer() 902 903 def writeSequence(self, seq): 904 """ 905 Call this to write to standard input on this process. 906 907 NOTE: This will silently lose data if there is no standard input. 908 """ 909 if 0 in self.pipes: 910 self.pipes[0].writeSequence(seq) 911 912 def childDataReceived(self, name, data): 913 self.proto.childDataReceived(name, data) 914 915 def childConnectionLost(self, childFD, reason): 916 # this is called when one of the helpers (ProcessReader or 917 # ProcessWriter) notices their pipe has been closed 918 os.close(self.pipes[childFD].fileno()) 919 del self.pipes[childFD] 920 try: 921 self.proto.childConnectionLost(childFD) 922 except BaseException: 923 log.err() 924 self.maybeCallProcessEnded() 925 926 def maybeCallProcessEnded(self): 927 # we don't call ProcessProtocol.processEnded until: 928 # the child has terminated, AND 929 # all writers have indicated an error status, AND 930 # all readers have indicated EOF 931 # This insures that we've gathered all output from the process. 932 if self.pipes: 933 return 934 if not self.lostProcess: 935 self.reapProcess() 936 return 937 _BaseProcess.maybeCallProcessEnded(self) 938 939 def getHost(self): 940 # ITransport.getHost 941 raise NotImplementedError() 942 943 def getPeer(self): 944 # ITransport.getPeer 945 raise NotImplementedError() 946 947 948@implementer(IProcessTransport) 949class PTYProcess(abstract.FileDescriptor, _BaseProcess): 950 """ 951 An operating-system Process that uses PTY support. 952 """ 953 954 status = -1 955 pid = None 956 957 def __init__( 958 self, 959 reactor, 960 executable, 961 args, 962 environment, 963 path, 964 proto, 965 uid=None, 966 gid=None, 967 usePTY=None, 968 ): 969 """ 970 Spawn an operating-system process. 971 972 This is where the hard work of disconnecting all currently open 973 files / forking / executing the new process happens. (This is 974 executed automatically when a Process is instantiated.) 975 976 This will also run the subprocess as a given user ID and group ID, if 977 specified. (Implementation Note: this doesn't support all the arcane 978 nuances of setXXuid on UNIX: it will assume that either your effective 979 or real UID is 0.) 980 """ 981 if pty is None and not isinstance(usePTY, (tuple, list)): 982 # no pty module and we didn't get a pty to use 983 raise NotImplementedError( 984 "cannot use PTYProcess on platforms without the pty module." 985 ) 986 abstract.FileDescriptor.__init__(self, reactor) 987 _BaseProcess.__init__(self, proto) 988 989 if isinstance(usePTY, (tuple, list)): 990 masterfd, slavefd, _ = usePTY 991 else: 992 masterfd, slavefd = pty.openpty() 993 994 try: 995 self._fork( 996 path, 997 uid, 998 gid, 999 executable, 1000 args, 1001 environment, 1002 masterfd=masterfd, 1003 slavefd=slavefd, 1004 ) 1005 except BaseException: 1006 if not isinstance(usePTY, (tuple, list)): 1007 os.close(masterfd) 1008 os.close(slavefd) 1009 raise 1010 1011 # we are now in parent process: 1012 os.close(slavefd) 1013 fdesc.setNonBlocking(masterfd) 1014 self.fd = masterfd 1015 self.startReading() 1016 self.connected = 1 1017 self.status = -1 1018 try: 1019 self.proto.makeConnection(self) 1020 except BaseException: 1021 log.err() 1022 registerReapProcessHandler(self.pid, self) 1023 1024 def _setupChild(self, masterfd, slavefd): 1025 """ 1026 Set up child process after C{fork()} but before C{exec()}. 1027 1028 This involves: 1029 1030 - closing C{masterfd}, since it is not used in the subprocess 1031 1032 - creating a new session with C{os.setsid} 1033 1034 - changing the controlling terminal of the process (and the new 1035 session) to point at C{slavefd} 1036 1037 - duplicating C{slavefd} to standard input, output, and error 1038 1039 - closing all other open file descriptors (according to 1040 L{_listOpenFDs}) 1041 1042 - re-setting all signal handlers to C{SIG_DFL} 1043 1044 @param masterfd: The master end of a PTY file descriptors opened with 1045 C{openpty}. 1046 @type masterfd: L{int} 1047 1048 @param slavefd: The slave end of a PTY opened with C{openpty}. 1049 @type slavefd: L{int} 1050 """ 1051 os.close(masterfd) 1052 os.setsid() 1053 fcntl.ioctl(slavefd, termios.TIOCSCTTY, "") 1054 1055 for fd in range(3): 1056 if fd != slavefd: 1057 os.close(fd) 1058 1059 os.dup2(slavefd, 0) # stdin 1060 os.dup2(slavefd, 1) # stdout 1061 os.dup2(slavefd, 2) # stderr 1062 1063 for fd in _listOpenFDs(): 1064 if fd > 2: 1065 try: 1066 os.close(fd) 1067 except BaseException: 1068 pass 1069 1070 self._resetSignalDisposition() 1071 1072 def closeStdin(self): 1073 # PTYs do not have stdin/stdout/stderr. They only have in and out, just 1074 # like sockets. You cannot close one without closing off the entire PTY 1075 pass 1076 1077 def closeStdout(self): 1078 pass 1079 1080 def closeStderr(self): 1081 pass 1082 1083 def doRead(self): 1084 """ 1085 Called when my standard output stream is ready for reading. 1086 """ 1087 return fdesc.readFromFD( 1088 self.fd, lambda data: self.proto.childDataReceived(1, data) 1089 ) 1090 1091 def fileno(self): 1092 """ 1093 This returns the file number of standard output on this process. 1094 """ 1095 return self.fd 1096 1097 def maybeCallProcessEnded(self): 1098 # two things must happen before we call the ProcessProtocol's 1099 # processEnded method. 1: the child process must die and be reaped 1100 # (which calls our own processEnded method). 2: the child must close 1101 # their stdin/stdout/stderr fds, causing the pty to close, causing 1102 # our connectionLost method to be called. #2 can also be triggered 1103 # by calling .loseConnection(). 1104 if self.lostProcess == 2: 1105 _BaseProcess.maybeCallProcessEnded(self) 1106 1107 def connectionLost(self, reason): 1108 """ 1109 I call this to clean up when one or all of my connections has died. 1110 """ 1111 abstract.FileDescriptor.connectionLost(self, reason) 1112 os.close(self.fd) 1113 self.lostProcess += 1 1114 self.maybeCallProcessEnded() 1115 1116 def writeSomeData(self, data): 1117 """ 1118 Write some data to the open process. 1119 """ 1120 return fdesc.writeToFD(self.fd, data) 1121 1122 def closeChildFD(self, descriptor): 1123 # IProcessTransport 1124 raise NotImplementedError() 1125 1126 def writeToChild(self, childFD, data): 1127 # IProcessTransport 1128 raise NotImplementedError() 1129