1# -*- coding: utf-8 -*- 2from __future__ import division 3import os 4import sys 5import socket 6import signal 7import functools 8import atexit 9import tempfile 10from subprocess import Popen, PIPE, STDOUT 11from threading import Thread 12try: 13 from Queue import Queue, Empty 14except ImportError: 15 from queue import Queue, Empty 16from time import sleep 17try: 18 import simplejson as json 19except ImportError: 20 import json 21from .exceptions import CommandError, TimeoutWaitingFor 22 23ON_POSIX = 'posix' in sys.builtin_module_names 24 25# Directory relative to basetest module location 26CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 27 28# Location of binary files (usually the src/ folder) 29BIN_PREFIX = os.path.abspath( 30 os.path.join(CURRENT_DIR, "..", "..", "src") 31) 32 33# Default location of test certificates 34DEFAULT_CERT_PATH = os.path.abspath( 35 os.path.join(CURRENT_DIR, "..", "test_certs") 36) 37 38# Default location of test extensions 39DEFAULT_EXTENSION_PATH = os.path.abspath( 40 os.path.join(CURRENT_DIR, "..", "test_extensions") 41) 42 43 44# Environment flags to control skipping of shared tests 45SHARED_SKIP = os.environ.get("SHARED_SKIP", False) 46# Environment flags to control use of PATH or in-tree binaries 47SHARED_USE_PATH = os.environ.get("SHARED_USE_PATH", False) 48 49UUID_REGEXP = ("[0-9A-Fa-f]{8}-" + ("[0-9A-Fa-f]{4}-" * 3) + "[0-9A-Fa-f]{12}") 50 51 52def shared_binary_location(cmd="shared"): 53 """ ../src/ is used by default. 54 """ 55 return os.path.join(BIN_PREFIX, cmd) 56 return binary_location(cmd, SHARED_USE_PATH) 57 58 59def binary_location(cmd, USE_PATH=False): 60 """ ../src/ is used by default. 61 """ 62 return os.path.join(BIN_PREFIX, cmd) 63 64 65def wait_condition(cond, timeout=1, sleeptime=.01): 66 """Wait for condition to return anything other than None 67 """ 68 # NOTE Increasing sleeptime can dramatically increase testsuite runtime 69 # It also reduces CPU load significantly 70 if timeout is None: 71 timeout = 1 72 73 if timeout < sleeptime: 74 print("Warning, timeout cannot be smaller than", sleeptime) 75 timeout = sleeptime 76 77 # Max number of attempts until giving up 78 tries = int(timeout / sleeptime) 79 80 for i in range(tries): 81 val = cond() 82 83 if val is not None: 84 break 85 86 sleep(sleeptime) 87 88 return val 89 90 91def wait_process(pid, timeout=None): 92 """Wait for process to finish 93 """ 94 def process(): 95 try: 96 os.kill(pid, 0) 97 except OSError: 98 # Process is dead 99 return True 100 else: 101 # Process is still ticking 102 return None 103 104 return wait_condition(process, timeout) 105 106 107def _queue_output(arguments, pidq, outputq): 108 """Read/Write output/input of given process. 109 This function is meant to be executed in a thread as it may block 110 """ 111 kwargs = arguments["process"] 112 input = arguments["input"] 113 114 try: 115 proc = Popen(**kwargs) 116 except OSError as e: 117 # pid None is read by the main thread as a crash of the process 118 pidq.put(None) 119 120 outputq.put(( 121 "", 122 ("Unexpected exception caught during execution: '{0}' . ".format(e)), 123 255)) # false exitcode 124 125 return 126 127 # Put the PID in the queue for main process to know. 128 pidq.put(proc.pid) 129 130 # Send input and wait for finish 131 out, err = proc.communicate(input) 132 133 if sys.version_info > (3,): 134 out, err = out.decode('utf-8'), err.decode('utf-8') 135 136 # Give the output back to the caller 137 outputq.put((out, err, proc.returncode)) 138 139 140def _retrieve_output(thread, timeout, queue, thread_error): 141 """Fetch output from binary subprocess queues 142 """ 143 # Try to join the thread on failure abort 144 thread.join(timeout) 145 if thread.isAlive(): 146 # Join should have killed the thread. This is unexpected 147 raise TimeoutWaitingFor(thread_error + ". Unexpected error") 148 149 # Thread died so we should have output 150 try: 151 # data = (stdout, stderr, exitcode) 152 data = queue.get(timeout=timeout) 153 except Empty: 154 data = TimeoutWaitingFor("streams from program") 155 156 return data 157 158 159def _get_output(arguments, timeout=None): 160 """Collect output from the subprocess without blocking the main process if 161 subprocess hangs. 162 """ 163 # NOTE Increase this value if tests fail with None being received as 164 # stdout/stderr instead of the expected content 165 output_timeout = 0.1 # seconds 166 167 pidq = Queue() 168 outputq = Queue() 169 170 t = Thread(target=_queue_output, args=(arguments, pidq, outputq)) 171 t.daemon = True 172 t.start() 173 174 try: 175 pid = pidq.get(timeout=timeout) 176 except Empty: 177 pid = None 178 179 # Process crashed or timed out for some reason 180 if pid is None: 181 return _retrieve_output(t, output_timeout, outputq, 182 "Program to start") 183 184 # Wait for process to finish (normal execution) 185 state = wait_process(pid, timeout) 186 187 if state: 188 # Process finished 189 return _retrieve_output(t, output_timeout, outputq, 190 "Program thread to join") 191 192 # If we reach this point we assume the process got stuck or timed out 193 for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL): 194 # Start with lower signals and escalate if process ignores them 195 try: 196 os.kill(pid, signal.SIGABRT) 197 except OSError as e: 198 # 3 means the process finished/died between last check and now 199 if e.errno != 3: 200 raise 201 202 # Wait for process to finish (should die/exit after signal) 203 state = wait_process(pid, timeout) 204 205 if state: 206 # Process finished 207 return _retrieve_output(t, output_timeout, outputq, 208 "Program to die") 209 210 # This should never happen but in case something goes really bad 211 raise OSError("Program stopped responding and couldn't be killed") 212 213 214def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE, 215 merge_streams=False, env=os.environ, timeout=None): 216 "Run a subprocess and wait for it to finish" 217 218 if input is None: 219 stdin = None 220 else: 221 stdin = PIPE 222 223 if merge_streams: 224 stderr = STDOUT 225 else: 226 stderr = PIPE 227 228 arguments = { 229 "process": { 230 "args": cmd, 231 "stdin": stdin, 232 "stdout": stdout, 233 "stderr": stderr, 234 "bufsize": 1, 235 "close_fds": ON_POSIX, 236 "env": env, 237 }, 238 "input": input, 239 } 240 out, err, exit = _get_output(arguments, timeout) 241 242 if merge_streams: 243 if exit != 0: 244 raise CommandError(cmd, exit, out) 245 else: 246 return exit, out 247 else: 248 if exit != 0: 249 raise CommandError(cmd, exit, out, err) 250 else: 251 return exit, out, err 252 253 254def run_cmd_wait_nofail(*args, **kwargs): 255 "Same as run_cmd_wait but silence the exception if it happens" 256 try: 257 return run_cmd_wait(*args, **kwargs) 258 except CommandError as e: 259 return e.code, e.out, e.err 260 261 262def memoize(obj): 263 """Keep an in-memory cache of function results given it's inputs 264 """ 265 cache = obj.cache = {} 266 267 @functools.wraps(obj) 268 def memoizer(*args, **kwargs): 269 key = str(args) + str(kwargs) 270 if key not in cache: 271 cache[key] = obj(*args, **kwargs) 272 return cache[key] 273 return memoizer 274 275 276try: 277 from shutil import which 278 which = memoize(which) 279except ImportError: 280 # NOTE: This is shutil.which backported from python-3.3.3 281 @memoize 282 def which(cmd, mode=os.F_OK | os.X_OK, path=None): 283 """Given a command, mode, and a PATH string, return the path which 284 conforms to the given mode on the PATH, or None if there is no such 285 file. 286 287 `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result 288 of os.environ.get("PATH"), or can be overridden with a custom search 289 path. 290 291 """ 292 # Check that a given file can be accessed with the correct mode. 293 # Additionally check that `file` is not a directory, as on Windows 294 # directories pass the os.access check. 295 def _access_check(fn, mode): 296 return (os.path.exists(fn) and os.access(fn, mode) and 297 not os.path.isdir(fn)) 298 299 # If we're given a path with a directory part, look it up directly 300 # rather than referring to PATH directories. This includes checking 301 # relative to the current directory, e.g. ./script 302 if os.path.dirname(cmd): 303 if _access_check(cmd, mode): 304 return cmd 305 return None 306 307 if path is None: 308 path = os.environ.get("PATH", os.defpath) 309 if not path: 310 return None 311 path = path.split(os.pathsep) 312 313 if sys.platform == "win32": 314 # The current directory takes precedence on Windows. 315 if os.curdir not in path: 316 path.insert(0, os.curdir) 317 318 # PATHEXT is necessary to check on Windows. 319 pathext = os.environ.get("PATHEXT", "").split(os.pathsep) 320 # See if the given file matches any of the expected path 321 # extensions. This will allow us to short circuit when given 322 # "python.exe". If it does match, only test that one, otherwise we 323 # have to try others. 324 if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 325 files = [cmd] 326 else: 327 files = [cmd + ext for ext in pathext] 328 else: 329 # On other platforms you don't have things like PATHEXT to tell you 330 # what file suffixes are executable, so just pass on cmd as-is. 331 files = [cmd] 332 333 seen = set() 334 for dir in path: 335 normdir = os.path.normcase(dir) 336 if normdir not in seen: 337 seen.add(normdir) 338 for thefile in files: 339 name = os.path.join(dir, thefile) 340 if _access_check(name, mode): 341 return name 342 return None 343 344 345def parse_datafile(file): 346 """Parse .data files, treating files as JSON 347 """ 348 data = [] 349 with open(file) as fh: 350 for line in fh: 351 line = line.rstrip("\n") 352 353 # Turn [] strings into {} to be treated properly as JSON hashes 354 if line.startswith('[') and line.endswith(']'): 355 line = '{' + line[1:-1] + '}' 356 357 if line.startswith("{"): 358 data.append(json.loads(line)) 359 else: 360 data.append(line) 361 return data 362 363 364def mkstemp(data): 365 """ 366 Create a temporary file that is removed at process exit 367 """ 368 def rmtemp(name): 369 try: 370 os.remove(name) 371 except OSError: 372 pass 373 374 f = tempfile.NamedTemporaryFile(delete=False) 375 f.write(data) 376 f.close() 377 378 # Ensure removal at end of python session 379 atexit.register(rmtemp, f.name) 380 381 return f.name 382 383 384def mkstemp_exec(data): 385 """Create a temporary executable file that is removed at process exit 386 """ 387 name = mkstemp(data) 388 os.chmod(name, 0o755) 389 390 return name 391 392# vim: ai sts=4 et sw=4 393