1import pipes 2from functools import wraps, partial 3from io import TextIOWrapper 4from itertools import chain 5from subprocess import STDOUT, PIPE 6from tempfile import TemporaryFile 7from testfixtures.compat import basestring, PY3, zip_longest, reduce, PY2 8from testfixtures.utils import extend_docstring 9 10from .mock import Mock, call 11 12 13def shell_join(command): 14 if not isinstance(command, basestring): 15 command = " ".join(pipes.quote(part) for part in command) 16 return command 17 18 19class PopenBehaviour(object): 20 """ 21 An object representing the behaviour of a :class:`MockPopen` when 22 simulating a particular command. 23 """ 24 25 def __init__(self, stdout=b'', stderr=b'', returncode=0, pid=1234, 26 poll_count=3): 27 self.stdout = stdout 28 self.stderr = stderr 29 self.returncode = returncode 30 self.pid = pid 31 self.poll_count = poll_count 32 33 34def record(func): 35 @wraps(func) 36 def recorder(self, *args, **kw): 37 self._record((func.__name__,), *args, **kw) 38 return func(self, *args, **kw) 39 return recorder 40 41 42class MockPopenInstance(object): 43 """ 44 A mock process as returned by :class:`MockPopen`. 45 """ 46 47 #: A :class:`~unittest.mock.Mock` representing the pipe into this process. 48 #: This is only set if ``stdin=PIPE`` is passed the constructor. 49 #: The mock records writes and closes in :attr:`MockPopen.all_calls`. 50 stdin = None 51 52 #: A file representing standard output from this process. 53 stdout = None 54 55 #: A file representing error output from this process. 56 stderr = None 57 58 def __init__(self, mock_class, root_call, 59 args, bufsize=0, executable=None, 60 stdin=None, stdout=None, stderr=None, 61 preexec_fn=None, close_fds=False, shell=False, cwd=None, 62 env=None, universal_newlines=False, 63 startupinfo=None, creationflags=0, restore_signals=True, 64 start_new_session=False, pass_fds=(), 65 encoding=None, errors=None, text=None): 66 self.mock = Mock() 67 self.class_instance_mock = mock_class.mock.Popen_instance 68 #: A :func:`unittest.mock.call` representing the call made to instantiate 69 #: this mock process. 70 self.root_call = root_call 71 #: The calls made on this mock process, represented using 72 #: :func:`~unittest.mock.call` instances. 73 self.calls = [] 74 self.all_calls = mock_class.all_calls 75 76 cmd = shell_join(args) 77 78 behaviour = mock_class.commands.get(cmd, mock_class.default_behaviour) 79 if behaviour is None: 80 raise KeyError('Nothing specified for command %r' % cmd) 81 82 if callable(behaviour): 83 behaviour = behaviour(command=cmd, stdin=stdin) 84 85 self.behaviour = behaviour 86 87 stdout_value = behaviour.stdout 88 stderr_value = behaviour.stderr 89 90 if stderr == STDOUT: 91 line_iterator = chain.from_iterable(zip_longest( 92 stdout_value.splitlines(True), 93 stderr_value.splitlines(True) 94 )) 95 stdout_value = b''.join(l for l in line_iterator if l) 96 stderr_value = None 97 98 self.poll_count = behaviour.poll_count 99 for name, option, mock_value in ( 100 ('stdout', stdout, stdout_value), 101 ('stderr', stderr, stderr_value) 102 ): 103 value = None 104 if option is PIPE: 105 value = TemporaryFile() 106 value.write(mock_value) 107 value.flush() 108 value.seek(0) 109 if PY3 and (universal_newlines or text or encoding): 110 value = TextIOWrapper(value, encoding=encoding, errors=errors) 111 setattr(self, name, value) 112 113 if stdin == PIPE: 114 self.stdin = Mock() 115 for method in 'write', 'close': 116 record_writes = partial(self._record, ('stdin', method)) 117 getattr(self.stdin, method).side_effect = record_writes 118 119 self.pid = behaviour.pid 120 #: The return code of this mock process. 121 self.returncode = None 122 if PY3: 123 self.args = args 124 125 def _record(self, names, *args, **kw): 126 for mock in self.class_instance_mock, self.mock: 127 reduce(getattr, names, mock)(*args, **kw) 128 for base_call, store in ( 129 (call, self.calls), 130 (self.root_call, self.all_calls) 131 ): 132 store.append(reduce(getattr, names, base_call)(*args, **kw)) 133 134 if PY3: 135 def __enter__(self): 136 return self 137 138 def __exit__(self, exc_type, exc_val, exc_tb): 139 self.wait() 140 for stream in self.stdout, self.stderr: 141 if stream: 142 stream.close() 143 144 @record 145 def wait(self, timeout=None): 146 "Simulate calls to :meth:`subprocess.Popen.wait`" 147 self.returncode = self.behaviour.returncode 148 return self.returncode 149 150 @record 151 def communicate(self, input=None, timeout=None): 152 "Simulate calls to :meth:`subprocess.Popen.communicate`" 153 self.returncode = self.behaviour.returncode 154 return (self.stdout and self.stdout.read(), 155 self.stderr and self.stderr.read()) 156 else: 157 @record 158 def wait(self): 159 "Simulate calls to :meth:`subprocess.Popen.wait`" 160 self.returncode = self.behaviour.returncode 161 return self.returncode 162 163 @record 164 def communicate(self, input=None): 165 "Simulate calls to :meth:`subprocess.Popen.communicate`" 166 self.returncode = self.behaviour.returncode 167 return (self.stdout and self.stdout.read(), 168 self.stderr and self.stderr.read()) 169 170 @record 171 def poll(self): 172 "Simulate calls to :meth:`subprocess.Popen.poll`" 173 while self.poll_count and self.returncode is None: 174 self.poll_count -= 1 175 return None 176 # This call to wait() is NOT how poll() behaves in reality. 177 # poll() NEVER sets the returncode. 178 # The returncode is *only* ever set by process completion. 179 # The following is an artifact of the fixture's implementation. 180 self.returncode = self.behaviour.returncode 181 return self.returncode 182 183 @record 184 def send_signal(self, signal): 185 "Simulate calls to :meth:`subprocess.Popen.send_signal`" 186 pass 187 188 @record 189 def terminate(self): 190 "Simulate calls to :meth:`subprocess.Popen.terminate`" 191 pass 192 193 @record 194 def kill(self): 195 "Simulate calls to :meth:`subprocess.Popen.kill`" 196 pass 197 198 199class MockPopen(object): 200 """ 201 A specialised mock for testing use of :class:`subprocess.Popen`. 202 An instance of this class can be used in place of the 203 :class:`subprocess.Popen` and is often inserted where it's needed using 204 :func:`unittest.mock.patch` or a :class:`~testfixtures.Replacer`. 205 """ 206 207 default_behaviour = None 208 209 def __init__(self): 210 self.commands = {} 211 self.mock = Mock() 212 #: All calls made using this mock and the objects it returns, represented using 213 #: :func:`~unittest.mock.call` instances. 214 self.all_calls = [] 215 216 def _resolve_behaviour(self, stdout, stderr, returncode, 217 pid, poll_count, behaviour): 218 if behaviour is None: 219 return PopenBehaviour( 220 stdout, stderr, returncode, pid, poll_count 221 ) 222 else: 223 return behaviour 224 225 def set_command(self, command, stdout=b'', stderr=b'', returncode=0, 226 pid=1234, poll_count=3, behaviour=None): 227 """ 228 Set the behaviour of this mock when it is used to simulate the 229 specified command. 230 231 :param command: A string representing the command to be simulated. 232 """ 233 self.commands[shell_join(command)] = self._resolve_behaviour( 234 stdout, stderr, returncode, pid, poll_count, behaviour 235 ) 236 237 def set_default(self, stdout=b'', stderr=b'', returncode=0, 238 pid=1234, poll_count=3, behaviour=None): 239 """ 240 Set the behaviour of this mock when it is used to simulate commands 241 that have no explicit behavior specified using 242 :meth:`~MockPopen.set_command` or :meth:`~MockPopen.set_callable`. 243 """ 244 self.default_behaviour = self._resolve_behaviour( 245 stdout, stderr, returncode, pid, poll_count, behaviour 246 ) 247 248 def __call__(self, *args, **kw): 249 self.mock.Popen(*args, **kw) 250 root_call = call.Popen(*args, **kw) 251 self.all_calls.append(root_call) 252 return MockPopenInstance(self, root_call, *args, **kw) 253 254 255set_command_params = """ 256:param stdout: 257 A string representing the simulated content written by the process 258 to the stdout pipe. 259:param stderr: 260 A string representing the simulated content written by the process 261 to the stderr pipe. 262:param returncode: 263 An integer representing the return code of the simulated process. 264:param pid: 265 An integer representing the process identifier of the simulated 266 process. This is useful if you have code the prints out the pids 267 of running processes. 268:param poll_count: 269 Specifies the number of times :meth:`MockPopen.poll` can be 270 called before :attr:`MockPopen.returncode` is set and returned 271 by :meth:`MockPopen.poll`. 272 273If supplied, ``behaviour`` must be either a :class:`PopenBehaviour` 274instance or a callable that takes the ``command`` string representing 275the command to be simulated and the ``stdin`` for that command and 276returns a :class:`PopenBehaviour` instance. 277""" 278 279 280# add the param docs, so we only have one copy of them! 281extend_docstring(set_command_params, 282 [MockPopen.set_command, MockPopen.set_default]) 283