1################################################################################ 2""" 3 4Modification of http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440554 5 6""" 7 8#################################### IMPORTS ################################### 9 10import os 11import platform 12import subprocess 13import errno 14import time 15import sys 16import unittest 17import tempfile 18 19 20def geterror(): 21 return sys.exc_info()[1] 22 23 24if sys.version_info >= (3,): 25 null_byte = "\x00".encode("ascii") 26else: 27 null_byte = "\x00" 28 29if platform.system() == "Windows": 30 if sys.version_info >= (3,): 31 # Test date should be in ascii. 32 def encode(s): 33 return s.encode("ascii") 34 35 def decode(b): 36 return b.decode("ascii") 37 38 else: 39 # Strings only; do nothing 40 def encode(s): 41 return s 42 43 def decode(b): 44 return b 45 46 try: 47 import ctypes 48 from ctypes.wintypes import DWORD 49 50 kernel32 = ctypes.windll.kernel32 51 TerminateProcess = ctypes.windll.kernel32.TerminateProcess 52 53 def WriteFile(handle, data, ol=None): 54 c_written = DWORD() 55 success = ctypes.windll.kernel32.WriteFile( 56 handle, 57 ctypes.create_string_buffer(encode(data)), 58 len(data), 59 ctypes.byref(c_written), 60 ol, 61 ) 62 return ctypes.windll.kernel32.GetLastError(), c_written.value 63 64 def ReadFile(handle, desired_bytes, ol=None): 65 c_read = DWORD() 66 buffer = ctypes.create_string_buffer(desired_bytes + 1) 67 success = ctypes.windll.kernel32.ReadFile( 68 handle, buffer, desired_bytes, ctypes.byref(c_read), ol 69 ) 70 buffer[c_read.value] = null_byte 71 return ctypes.windll.kernel32.GetLastError(), decode(buffer.value) 72 73 def PeekNamedPipe(handle, desired_bytes): 74 c_avail = DWORD() 75 c_message = DWORD() 76 if desired_bytes > 0: 77 c_read = DWORD() 78 buffer = ctypes.create_string_buffer(desired_bytes + 1) 79 success = ctypes.windll.kernel32.PeekNamedPipe( 80 handle, 81 buffer, 82 desired_bytes, 83 ctypes.byref(c_read), 84 ctypes.byref(c_avail), 85 ctypes.byref(c_message), 86 ) 87 buffer[c_read.value] = null_byte 88 return decode(buffer.value), c_avail.value, c_message.value 89 else: 90 success = ctypes.windll.kernel32.PeekNamedPipe( 91 handle, 92 None, 93 desired_bytes, 94 None, 95 ctypes.byref(c_avail), 96 ctypes.byref(c_message), 97 ) 98 return "", c_avail.value, c_message.value 99 100 except ImportError: 101 from win32file import ReadFile, WriteFile 102 from win32pipe import PeekNamedPipe 103 from win32api import TerminateProcess 104 import msvcrt 105 106else: 107 from signal import SIGINT, SIGTERM, SIGKILL 108 import select 109 import fcntl 110 111################################### CONSTANTS ################################## 112 113PIPE = subprocess.PIPE 114 115################################################################################ 116 117 118class Popen(subprocess.Popen): 119 def recv(self, maxsize=None): 120 return self._recv("stdout", maxsize) 121 122 def recv_err(self, maxsize=None): 123 return self._recv("stderr", maxsize) 124 125 def send_recv(self, input="", maxsize=None): 126 return self.send(input), self.recv(maxsize), self.recv_err(maxsize) 127 128 def read_async(self, wait=0.1, e=1, tr=5, stderr=0): 129 if tr < 1: 130 tr = 1 131 x = time.time() + wait 132 y = [] 133 r = "" 134 pr = self.recv 135 if stderr: 136 pr = self.recv_err 137 while time.time() < x or r: 138 r = pr() 139 if r is None: 140 if e: 141 raise Exception("Other end disconnected!") 142 else: 143 break 144 elif r: 145 y.append(r) 146 else: 147 time.sleep(max((x - time.time()) / tr, 0)) 148 return "".join(y) 149 150 def send_all(self, data): 151 while len(data): 152 sent = self.send(data) 153 if sent is None: 154 raise Exception("Other end disconnected!") 155 data = buffer(data, sent) 156 157 def get_conn_maxsize(self, which, maxsize): 158 if maxsize is None: 159 maxsize = 1024 160 elif maxsize < 1: 161 maxsize = 1 162 return getattr(self, which), maxsize 163 164 def _close(self, which): 165 getattr(self, which).close() 166 setattr(self, which, None) 167 168 if platform.system() == "Windows": 169 170 def kill(self): 171 # Recipes 172 # http://me.in-berlin.de/doc/python/faq/windows.html#how-do-i-emulate-os-kill-in-windows 173 # http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462 174 175 """kill function for Win32""" 176 TerminateProcess(int(self._handle), 0) # returns None 177 178 def send(self, input): 179 if not self.stdin: 180 return None 181 182 try: 183 x = msvcrt.get_osfhandle(self.stdin.fileno()) 184 (errCode, written) = WriteFile(x, input) 185 except ValueError: 186 return self._close("stdin") 187 except (subprocess.pywintypes.error, Exception): 188 if geterror()[0] in (109, errno.ESHUTDOWN): 189 return self._close("stdin") 190 raise 191 192 return written 193 194 def _recv(self, which, maxsize): 195 conn, maxsize = self.get_conn_maxsize(which, maxsize) 196 if conn is None: 197 return None 198 199 try: 200 x = msvcrt.get_osfhandle(conn.fileno()) 201 (read, nAvail, nMessage) = PeekNamedPipe(x, 0) 202 if maxsize < nAvail: 203 nAvail = maxsize 204 if nAvail > 0: 205 (errCode, read) = ReadFile(x, nAvail, None) 206 except ValueError: 207 return self._close(which) 208 except (subprocess.pywintypes.error, Exception): 209 if geterror()[0] in (109, errno.ESHUTDOWN): 210 return self._close(which) 211 raise 212 213 if self.universal_newlines: 214 # Translate newlines. For Python 3.x assume read is text. 215 # If bytes then another solution is needed. 216 read = read.replace("\r\n", "\n").replace("\r", "\n") 217 return read 218 219 else: 220 221 def kill(self): 222 for i, sig in enumerate([SIGTERM, SIGKILL] * 2): 223 if i % 2 == 0: 224 os.kill(self.pid, sig) 225 time.sleep((i * (i % 2) / 5.0) + 0.01) 226 227 killed_pid, stat = os.waitpid(self.pid, os.WNOHANG) 228 if killed_pid != 0: 229 return 230 231 def send(self, input): 232 if not self.stdin: 233 return None 234 235 if not select.select([], [self.stdin], [], 0)[1]: 236 return 0 237 238 try: 239 written = os.write(self.stdin.fileno(), input) 240 except OSError: 241 if geterror()[0] == errno.EPIPE: # broken pipe 242 return self._close("stdin") 243 raise 244 245 return written 246 247 def _recv(self, which, maxsize): 248 conn, maxsize = self.get_conn_maxsize(which, maxsize) 249 if conn is None: 250 return None 251 252 if not select.select([conn], [], [], 0)[0]: 253 return "" 254 255 r = conn.read(maxsize) 256 if not r: 257 return self._close(which) 258 259 if self.universal_newlines: 260 r = r.replace("\r\n", "\n").replace("\r", "\n") 261 return r 262 263 264################################################################################ 265 266 267def proc_in_time_or_kill(cmd, time_out, wd=None, env=None): 268 proc = Popen( 269 cmd, 270 cwd=wd, 271 env=env, 272 stdin=subprocess.PIPE, 273 stdout=subprocess.PIPE, 274 stderr=subprocess.STDOUT, 275 universal_newlines=1, 276 ) 277 278 ret_code = None 279 response = [] 280 281 t = time.time() 282 while ret_code is None and ((time.time() - t) < time_out): 283 ret_code = proc.poll() 284 response += [proc.read_async(wait=0.1, e=0)] 285 286 if ret_code is None: 287 ret_code = '"Process timed out (time_out = %s secs) ' % time_out 288 try: 289 proc.kill() 290 ret_code += 'and was successfully terminated"' 291 except Exception: 292 ret_code += 'and termination failed (exception: %s)"' % (geterror(),) 293 294 return ret_code, "".join(response) 295 296 297################################################################################ 298 299 300class AsyncTest(unittest.TestCase): 301 def test_proc_in_time_or_kill(self): 302 ret_code, response = proc_in_time_or_kill( 303 [sys.executable, "-c", "while 1: pass"], time_out=1 304 ) 305 306 self.assertIn("rocess timed out", ret_code) 307 self.assertIn("successfully terminated", ret_code) 308 309 310################################################################################ 311 312if __name__ == "__main__": 313 unittest.main() 314