# -*- coding: utf-8 -*-
import signal


class Alarm(Exception):
    """Raised by the SIGALRM handler when a timed call runs out of time."""
    pass


class BasicTest:
    """Helpers for tests that talk to a child process over its pipes."""

    def alarmHandler(self, signum, frame):
        """Signal handler for SIGALRM: turn the signal into an ``Alarm``."""
        raise Alarm

    def withTimeout(self, seconds, cmd, *args, **kwds):
        """Call ``cmd(*args, **kwds)``, aborting it after ``seconds``.

        ``seconds`` is handed to ``signal.alarm``.  If the alarm fires while
        ``cmd`` is still running, ``Alarm`` is raised out of this method.
        On platforms without ``SIGALRM`` the call runs with no timeout.

        Returns whatever ``cmd`` returns; any exception raised by ``cmd``
        propagates unchanged.
        """
        # Windows doesn't have SIGALRM
        if not hasattr(signal, 'SIGALRM'):
            return cmd(*args, **kwds)

        signal.signal(signal.SIGALRM, self.alarmHandler)
        signal.alarm(seconds)
        try:
            return cmd(*args, **kwds)
        finally:
            # Always disarm, even when cmd raised, so that a stale alarm
            # cannot go off later in unrelated code.
            signal.alarm(0)

    def communicateFlush(self, string, process):
        """Send ``string`` to ``process`` and read its NUL-terminated reply.

        When ``string`` is non-empty it is written to ``process.stdin`` as
        UTF-8 followed by a NUL byte, and stdin is flushed.  The reply is
        then read from ``process.stdout`` one byte at a time, each read
        limited to two seconds, until a NUL byte, end of stream or a
        timeout.

        Returns the bytes read before the terminator, decoded as UTF-8 with
        ``\\r\\n`` normalised to ``\\n``.
        """
        if string:
            process.stdin.write(string.encode('utf-8'))
            process.stdin.write(b'\0')
            process.stdin.flush()

        output = []
        while True:
            try:
                char = self.withTimeout(2, process.stdout.read, 1)
            except Alarm:
                break  # send what we got up till now
            # An empty read means end of stream; NUL marks end of the reply.
            if not char or char == b'\0':
                break
            output.append(char)

        return b"".join(output).decode('utf-8').replace('\r\n', '\n')