1# -*- coding: utf-8 -*-
2from __future__ import division
3import os
4import sys
5import socket
6import signal
7import functools
8import atexit
9import tempfile
10from subprocess import Popen, PIPE, STDOUT
11from threading import Thread
12try:
13    from Queue import Queue, Empty
14except ImportError:
15    from queue import Queue, Empty
16from time import sleep
17try:
18    import simplejson as json
19except ImportError:
20    import json
21from .exceptions import CommandError, TimeoutWaitingFor
22
# True when 'posix' is one of the interpreter's built-in modules
ON_POSIX = 'posix' in sys.builtin_module_names

# Absolute path of the directory that contains this module
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

# Where the binaries live: the src/ folder two levels above this module
BIN_PREFIX = os.path.abspath(os.path.join(CURRENT_DIR, "..", "..", "src"))

# Default directory holding the test certificates
DEFAULT_CERT_PATH = os.path.abspath(
    os.path.join(CURRENT_DIR, "..", "test_certs"))

# Default directory holding the test extensions
DEFAULT_EXTENSION_PATH = os.path.abspath(
    os.path.join(CURRENT_DIR, "..", "test_extensions"))


# Environment flag controlling whether shared tests are skipped.
# NOTE: any value set in the environment is kept as the raw string.
SHARED_SKIP = os.getenv("SHARED_SKIP", False)
# Environment flag selecting PATH binaries instead of in-tree ones
SHARED_USE_PATH = os.getenv("SHARED_USE_PATH", False)

# Pattern text for a UUID: hexadecimal groups of 8-4-4-4-12 characters
UUID_REGEXP = "-".join(
    ["[0-9A-Fa-f]{8}"] + ["[0-9A-Fa-f]{4}"] * 3 + ["[0-9A-Fa-f]{12}"]
)
50
51
def shared_binary_location(cmd="shared"):
    """Return the in-tree location of the given binary.

    The result is always ``BIN_PREFIX`` joined with *cmd* (../src/ relative
    to the test suite). SHARED_USE_PATH is not consulted here: the statement
    that used to forward it sat after an unconditional ``return`` and could
    never run, so it has been removed.
    """
    return os.path.join(BIN_PREFIX, cmd)
57
58
def binary_location(cmd, USE_PATH=False):
    """Return the location of *cmd* inside the in-tree binary folder.

    The binary is always resolved under BIN_PREFIX (../src/); the USE_PATH
    argument is accepted but has no effect on the result.
    """
    in_tree_binary = os.path.join(BIN_PREFIX, cmd)
    return in_tree_binary
63
64
def wait_condition(cond, timeout=1, sleeptime=.01):
    """Poll cond() until it yields something other than None.

    cond is called up to int(timeout / sleeptime) times, sleeping sleeptime
    seconds after every attempt that returned None. The first non-None
    value is returned; if every attempt returned None, None is returned.
    A timeout of None is treated as 1 second.
    """
    # NOTE A larger sleeptime makes the testsuite noticeably slower, while
    # a smaller one increases CPU load
    if timeout is None:
        timeout = 1

    if timeout < sleeptime:
        print("Warning, timeout cannot be smaller than", sleeptime)
        timeout = sleeptime

    # Number of polls still allowed before giving up
    attempts_left = int(timeout / sleeptime)

    while attempts_left > 0:
        attempts_left -= 1
        result = cond()

        if result is not None:
            return result

        sleep(sleeptime)

    return result
89
90
def wait_process(pid, timeout=None):
    """Wait until the process with the given pid is gone.

    Returns True as soon as sending signal 0 to pid raises OSError, or None
    if the process can still be signalled when the wait gives up.
    """
    def probe():
        # Signal 0 delivers nothing; it only checks the pid can be signalled
        try:
            os.kill(pid, 0)
        except OSError:
            # Signalling failed: the process is considered dead
            return True
        # Still running, keep polling
        return None

    return wait_condition(probe, timeout)
105
106
107def _queue_output(arguments, pidq, outputq):
108    """Read/Write output/input of given process.
109    This function is meant to be executed in a thread as it may block
110    """
111    kwargs = arguments["process"]
112    input = arguments["input"]
113
114    try:
115        proc = Popen(**kwargs)
116    except OSError as e:
117        # pid None is read by the main thread as a crash of the process
118        pidq.put(None)
119
120        outputq.put((
121            "",
122            ("Unexpected exception caught during execution: '{0}' . ".format(e)),
123            255))  # false exitcode
124
125        return
126
127    # Put the PID in the queue for main process to know.
128    pidq.put(proc.pid)
129
130    # Send input and wait for finish
131    out, err = proc.communicate(input)
132
133    if sys.version_info > (3,):
134        out, err = out.decode('utf-8'), err.decode('utf-8')
135
136    # Give the output back to the caller
137    outputq.put((out, err, proc.returncode))
138
139
140def _retrieve_output(thread, timeout, queue, thread_error):
141    """Fetch output from binary subprocess queues
142    """
143    # Try to join the thread on failure abort
144    thread.join(timeout)
145    if thread.isAlive():
146        # Join should have killed the thread. This is unexpected
147        raise TimeoutWaitingFor(thread_error + ". Unexpected error")
148
149    # Thread died so we should have output
150    try:
151        # data = (stdout, stderr, exitcode)
152        data = queue.get(timeout=timeout)
153    except Empty:
154        data = TimeoutWaitingFor("streams from program")
155
156    return data
157
158
def _get_output(arguments, timeout=None):
    """Collect output from the subprocess without blocking the main process if
    subprocess hangs.

    arguments is the dict handed to _queue_output, which runs the process
    in a daemon thread. timeout (seconds) bounds the wait for the pid and
    every wait for the process to finish.

    If the process does not finish in time it is sent SIGABRT, then SIGTERM
    and finally SIGKILL, waiting up to timeout after each signal.

    Returns whatever _retrieve_output returns, normally the tuple
    (stdout, stderr, exitcode).

    Raises OSError if the process survives all three signals, or if
    signalling fails for a reason other than the process being gone.
    """
    # Imported locally as the module does not import errno at the top
    import errno

    # NOTE Increase this value if tests fail with None being received as
    # stdout/stderr instead of the expected content
    output_timeout = 0.1  # seconds

    pidq = Queue()
    outputq = Queue()

    t = Thread(target=_queue_output, args=(arguments, pidq, outputq))
    t.daemon = True
    t.start()

    try:
        pid = pidq.get(timeout=timeout)
    except Empty:
        pid = None

    # Process crashed or timed out for some reason
    if pid is None:
        return _retrieve_output(t, output_timeout, outputq,
                                "Program to start")

    # Wait for process to finish (normal execution)
    state = wait_process(pid, timeout)

    if state:
        # Process finished
        return _retrieve_output(t, output_timeout, outputq,
                                "Program thread to join")

    # If we reach this point we assume the process got stuck or timed out
    for sig in (signal.SIGABRT, signal.SIGTERM, signal.SIGKILL):
        # Start with lower signals and escalate if process ignores them
        try:
            os.kill(pid, sig)
        except OSError as e:
            # ESRCH means the process finished/died between last check
            # and now
            if e.errno != errno.ESRCH:
                raise

        # Wait for process to finish (should die/exit after signal)
        state = wait_process(pid, timeout)

        if state:
            # Process finished
            return _retrieve_output(t, output_timeout, outputq,
                                    "Program to die")

    # This should never happen but in case something goes really bad
    raise OSError("Program stopped responding and couldn't be killed")
212
213
def run_cmd_wait(cmd, input=None, stdout=PIPE, stderr=PIPE,
                 merge_streams=False, env=os.environ, timeout=None):
    """Run a subprocess and wait for it to finish

    input, when not None, is sent to the process through a stdin pipe.
    The stderr argument is not used as given: stderr is redirected to
    stdout when merge_streams is true and captured in its own pipe
    otherwise.

    Returns (exitcode, stdout) when merge_streams is true and
    (exitcode, stdout, stderr) otherwise.

    Raises CommandError when the exit code is not 0.
    """
    # Only open a pipe to stdin when there is something to send
    stdin = PIPE if input is not None else None
    stderr = STDOUT if merge_streams else PIPE

    popen_kwargs = {
        "args": cmd,
        "stdin": stdin,
        "stdout": stdout,
        "stderr": stderr,
        "bufsize": 1,
        "close_fds": ON_POSIX,
        "env": env,
    }
    request = {"process": popen_kwargs, "input": input}

    out, err, returncode = _get_output(request, timeout)
    failed = returncode != 0

    if merge_streams:
        if failed:
            raise CommandError(cmd, returncode, out)
        return returncode, out

    if failed:
        raise CommandError(cmd, returncode, out, err)
    return returncode, out, err
252
253
def run_cmd_wait_nofail(*args, **kwargs):
    """Same as run_cmd_wait but silence the exception if it happens

    When run_cmd_wait raises CommandError, the code, out and err attributes
    of that exception are returned as a tuple instead.
    """
    try:
        result = run_cmd_wait(*args, **kwargs)
    except CommandError as failure:
        result = (failure.code, failure.out, failure.err)
    return result
260
261
def memoize(obj):
    """Decorator keeping an in-memory cache of a function's results.

    Results are keyed by the string form of the positional arguments
    followed by the string form of the keyword arguments. The cache dict is
    exposed as the ``cache`` attribute of the decorated function.
    """
    results = {}
    obj.cache = results

    @functools.wraps(obj)
    def memoizer(*args, **kwargs):
        signature = str(args) + str(kwargs)
        if signature in results:
            return results[signature]
        results[signature] = obj(*args, **kwargs)
        return results[signature]

    return memoizer
274
275
try:
    from shutil import which
except ImportError:
    # NOTE: Fallback modelled on shutil.which from python-3.3.3
    @memoize
    def which(cmd, mode=os.F_OK | os.X_OK, path=None):
        """Locate the file a command name refers to.

        Returns the path matching `mode` found on the search path, or None
        when there is no such file.

        `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the value
        of the PATH environment variable (os.defpath when it is unset) and
        may be replaced by a custom search path string.

        """
        def _is_usable(candidate):
            # The file must exist, satisfy `mode` and not be a directory
            # (directories pass the os.access check on Windows).
            if not os.path.exists(candidate):
                return False
            if not os.access(candidate, mode):
                return False
            return not os.path.isdir(candidate)

        # A command with a directory part (including ./script) is checked
        # directly and never searched for on the path.
        if os.path.dirname(cmd):
            return cmd if _is_usable(cmd) else None

        search_path = path
        if search_path is None:
            search_path = os.environ.get("PATH", os.defpath)
        if not search_path:
            return None
        directories = search_path.split(os.pathsep)

        # Outside Windows there is no PATHEXT, so cmd is tried as given.
        candidates = [cmd]
        if sys.platform == "win32":
            # The current directory takes precedence on Windows.
            if os.curdir not in directories:
                directories.insert(0, os.curdir)

            # On Windows PATHEXT lists the suffixes to try. When cmd
            # already ends with one of them (e.g. "python.exe") only cmd
            # itself is tried, otherwise every suffix is appended in turn.
            extensions = os.environ.get("PATHEXT", "").split(os.pathsep)
            if not any(cmd.lower().endswith(ext.lower())
                       for ext in extensions):
                candidates = [cmd + ext for ext in extensions]

        visited = set()
        for directory in directories:
            normalized = os.path.normcase(directory)
            if normalized in visited:
                # Same directory listed twice, no need to look again
                continue
            visited.add(normalized)
            for filename in candidates:
                full_name = os.path.join(directory, filename)
                if _is_usable(full_name):
                    return full_name
        return None
else:
    # shutil provides which(): just add the result cache on top of it
    which = memoize(which)
343
344
def parse_datafile(file):
    """Parse a .data file into a list with one entry per line.

    Lines wrapped in [] have the brackets replaced by {}. Every line that
    then starts with "{" is decoded as JSON; any other line is kept as a
    string without its trailing newline.
    """
    def convert(raw):
        text = raw.rstrip("\n")

        # [] wrapped lines become {} so they are decoded as JSON hashes
        if text.startswith('[') and text.endswith(']'):
            text = '{' + text[1:-1] + '}'

        return json.loads(text) if text.startswith("{") else text

    with open(file) as handle:
        return [convert(raw) for raw in handle]
362
363
def mkstemp(data):
    """
    Create a temporary file that is removed at process exit

    data is the content written to the file. Text is encoded as UTF-8
    first, since the temporary file is opened in binary mode; bytes are
    written unchanged.

    Returns the name of the file.
    """
    def rmtemp(name):
        # Best effort: the file may already have been removed
        try:
            os.remove(name)
        except OSError:
            pass

    # type(u"") is unicode on Python 2 and str on Python 3
    if isinstance(data, type(u"")):
        data = data.encode("utf-8")

    f = tempfile.NamedTemporaryFile(delete=False)

    # Ensure removal at end of python session. Registered before writing
    # so the file is cleaned up even when the write fails.
    atexit.register(rmtemp, f.name)

    try:
        f.write(data)
    finally:
        f.close()

    return f.name
382
383
def mkstemp_exec(data):
    """Write data to a temporary file, mark it executable and return its
    name. The file is created through mkstemp.
    """
    # rwx for the owner, r-x for group and others
    executable_mode = 0o755

    path = mkstemp(data)
    os.chmod(path, executable_mode)

    return path
391
392# vim: ai sts=4 et sw=4
393