1from __future__ import absolute_import 2 3__all__ = ['create_subprocess_exec', 'create_subprocess_shell'] 4 5import subprocess 6 7from . import events 8from . import protocols 9from . import streams 10from . import tasks 11from .coroutines import coroutine, From, Return 12from .py33_exceptions import BrokenPipeError, ConnectionResetError 13from .log import logger 14 15 16PIPE = subprocess.PIPE 17STDOUT = subprocess.STDOUT 18if hasattr(subprocess, 'DEVNULL'): 19 DEVNULL = subprocess.DEVNULL 20 21 22class SubprocessStreamProtocol(streams.FlowControlMixin, 23 protocols.SubprocessProtocol): 24 """Like StreamReaderProtocol, but for a subprocess.""" 25 26 def __init__(self, limit, loop): 27 super(SubprocessStreamProtocol, self).__init__(loop=loop) 28 self._limit = limit 29 self.stdin = self.stdout = self.stderr = None 30 self._transport = None 31 32 def __repr__(self): 33 info = [self.__class__.__name__] 34 if self.stdin is not None: 35 info.append('stdin=%r' % self.stdin) 36 if self.stdout is not None: 37 info.append('stdout=%r' % self.stdout) 38 if self.stderr is not None: 39 info.append('stderr=%r' % self.stderr) 40 return '<%s>' % ' '.join(info) 41 42 def connection_made(self, transport): 43 self._transport = transport 44 45 stdout_transport = transport.get_pipe_transport(1) 46 if stdout_transport is not None: 47 self.stdout = streams.StreamReader(limit=self._limit, 48 loop=self._loop) 49 self.stdout.set_transport(stdout_transport) 50 51 stderr_transport = transport.get_pipe_transport(2) 52 if stderr_transport is not None: 53 self.stderr = streams.StreamReader(limit=self._limit, 54 loop=self._loop) 55 self.stderr.set_transport(stderr_transport) 56 57 stdin_transport = transport.get_pipe_transport(0) 58 if stdin_transport is not None: 59 self.stdin = streams.StreamWriter(stdin_transport, 60 protocol=self, 61 reader=None, 62 loop=self._loop) 63 64 def pipe_data_received(self, fd, data): 65 if fd == 1: 66 reader = self.stdout 67 elif fd == 2: 68 reader = self.stderr 69 else: 70 reader = None 71 if reader is not None: 72 reader.feed_data(data) 73 74 def pipe_connection_lost(self, fd, exc): 75 if fd == 0: 76 pipe = self.stdin 77 if pipe is not None: 78 pipe.close() 79 self.connection_lost(exc) 80 return 81 if fd == 1: 82 reader = self.stdout 83 elif fd == 2: 84 reader = self.stderr 85 else: 86 reader = None 87 if reader != None: 88 if exc is None: 89 reader.feed_eof() 90 else: 91 reader.set_exception(exc) 92 93 def process_exited(self): 94 self._transport.close() 95 self._transport = None 96 97 98class Process: 99 def __init__(self, transport, protocol, loop): 100 self._transport = transport 101 self._protocol = protocol 102 self._loop = loop 103 self.stdin = protocol.stdin 104 self.stdout = protocol.stdout 105 self.stderr = protocol.stderr 106 self.pid = transport.get_pid() 107 108 def __repr__(self): 109 return '<%s %s>' % (self.__class__.__name__, self.pid) 110 111 @property 112 def returncode(self): 113 return self._transport.get_returncode() 114 115 @coroutine 116 def wait(self): 117 """Wait until the process exit and return the process return code. 118 119 This method is a coroutine.""" 120 return_code = yield From(self._transport._wait()) 121 raise Return(return_code) 122 123 def send_signal(self, signal): 124 self._transport.send_signal(signal) 125 126 def terminate(self): 127 self._transport.terminate() 128 129 def kill(self): 130 self._transport.kill() 131 132 @coroutine 133 def _feed_stdin(self, input): 134 debug = self._loop.get_debug() 135 self.stdin.write(input) 136 if debug: 137 logger.debug('%r communicate: feed stdin (%s bytes)', 138 self, len(input)) 139 try: 140 yield From(self.stdin.drain()) 141 except (BrokenPipeError, ConnectionResetError) as exc: 142 # communicate() ignores BrokenPipeError and ConnectionResetError 143 if debug: 144 logger.debug('%r communicate: stdin got %r', self, exc) 145 146 if debug: 147 logger.debug('%r communicate: close stdin', self) 148 self.stdin.close() 149 150 @coroutine 151 def _noop(self): 152 return None 153 154 @coroutine 155 def _read_stream(self, fd): 156 transport = self._transport.get_pipe_transport(fd) 157 if fd == 2: 158 stream = self.stderr 159 else: 160 assert fd == 1 161 stream = self.stdout 162 if self._loop.get_debug(): 163 name = 'stdout' if fd == 1 else 'stderr' 164 logger.debug('%r communicate: read %s', self, name) 165 output = yield From(stream.read()) 166 if self._loop.get_debug(): 167 name = 'stdout' if fd == 1 else 'stderr' 168 logger.debug('%r communicate: close %s', self, name) 169 transport.close() 170 raise Return(output) 171 172 @coroutine 173 def communicate(self, input=None): 174 if input: 175 stdin = self._feed_stdin(input) 176 else: 177 stdin = self._noop() 178 if self.stdout is not None: 179 stdout = self._read_stream(1) 180 else: 181 stdout = self._noop() 182 if self.stderr is not None: 183 stderr = self._read_stream(2) 184 else: 185 stderr = self._noop() 186 stdin, stdout, stderr = yield From(tasks.gather(stdin, stdout, stderr, 187 loop=self._loop)) 188 yield From(self.wait()) 189 raise Return(stdout, stderr) 190 191 192@coroutine 193def create_subprocess_shell(cmd, **kwds): 194 stdin = kwds.pop('stdin', None) 195 stdout = kwds.pop('stdout', None) 196 stderr = kwds.pop('stderr', None) 197 loop = kwds.pop('loop', None) 198 limit = kwds.pop('limit', streams._DEFAULT_LIMIT) 199 if loop is None: 200 loop = events.get_event_loop() 201 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 202 loop=loop) 203 transport, protocol = yield From(loop.subprocess_shell( 204 protocol_factory, 205 cmd, stdin=stdin, stdout=stdout, 206 stderr=stderr, **kwds)) 207 raise Return(Process(transport, protocol, loop)) 208 209@coroutine 210def create_subprocess_exec(program, *args, **kwds): 211 stdin = kwds.pop('stdin', None) 212 stdout = kwds.pop('stdout', None) 213 stderr = kwds.pop('stderr', None) 214 loop = kwds.pop('loop', None) 215 limit = kwds.pop('limit', streams._DEFAULT_LIMIT) 216 if loop is None: 217 loop = events.get_event_loop() 218 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit, 219 loop=loop) 220 transport, protocol = yield From(loop.subprocess_exec( 221 protocol_factory, 222 program, *args, 223 stdin=stdin, stdout=stdout, 224 stderr=stderr, **kwds)) 225 raise Return(Process(transport, protocol, loop)) 226