1#! /usr/bin/env python
2
3#    cfvtest.py - initialization and utility stuff for cfv testing
4#    Copyright (C) 2000-2005  Matthew Mueller <donut AT dakotacom DOT net>
5#
6#    This program is free software; you can redistribute it and/or modify
7#    it under the terms of the GNU General Public License as published by
8#    the Free Software Foundation; either version 2 of the License, or
9#    (at your option) any later version.
10#
11#    This program is distributed in the hope that it will be useful,
12#    but WITHOUT ANY WARRANTY; without even the implied warranty of
13#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14#    GNU General Public License for more details.
15#
16#    You should have received a copy of the GNU General Public License
17#    along with this program; if not, write to the Free Software
18#    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
20from __future__ import print_function
21
22from builtins import map
23from builtins import object
24
25import fnmatch
26import imp
27import importlib
28import os
29import shlex
30import sys
31import traceback
32import unittest
33from doctest import DocTestSuite
34from glob import glob
35from unittest import TestCase  # noqa: F401
36from unittest import main
37
38
# Space-separated "KEY=VALUE" pairs recorded by setenv(), most recent first.
cfvenv = ''

# Absolute path of the cfv script under test; assigned by setcfv().
cfvfn = None
# Version string parsed from `cfv --version`, and whether that output mentions
# mmap; both are refreshed by get_version_flags().
ver_cfv = None
ver_mmap = None
# The active runner (runcfv_py or runcfv_exe); selected by setcfv().
runcfv = None
# Directory containing this file, falling back to the current directory.
testpath = os.path.dirname(__file__) or os.curdir
datapath = os.path.join(testpath, 'testdata')
46
47
class NullFile(object):
    """Minimal write-only file look-alike that discards everything it is given."""

    def isatty(self):
        """Report that this sink is not a terminal."""
        return 0

    def write(self, s):
        """Discard the string *s*."""

    def writelines(self, l):
        """Discard the sequence of lines *l*."""

    def flush(self):
        """Do nothing; no data is ever buffered."""

    def close(self):
        """Do nothing; the object holds no resources."""


# Shared instance; runcfv_py substitutes it for "/dev/null" redirections.
nullfile = NullFile()
66
67
def expand_cmdline(cmd):
    """Split the command string *cmd* into an argument list, expanding wildcards.

    The string is tokenised with shell-like quoting rules.  Any token that
    contains ``*``, ``?`` or ``[`` is replaced by the paths it matches on disk
    (by nothing at all when there is no match); other tokens are kept as-is.
    """
    wildcard_chars = '*?['
    expanded = []
    for token in shlex.split(cmd):
        if any(ch in token for ch in wildcard_chars):
            expanded += glob(token)
        else:
            expanded.append(token)
    return expanded
76
77
def runcfv_exe(cmd, stdin=None, stdout=None, stderr=None, need_reload=0):
    """Run cfv as a separate process and return ``(exit_status, output)``.

    cmd: command-line string; it is split and glob-expanded by expand_cmdline()
        and appended to the cfv script path in ``cfvfn``.
    stdin: optional path of a file to feed to the process as standard input.
    stdout, stderr: optional paths of files to redirect the respective stream
        to.  When *stdout* is not given, standard error is merged into the
        captured standard output unless *stderr* names a file of its own.
    need_reload: accepted so the signature matches runcfv_py(); not used here.

    The returned output is whichever stream was captured through a pipe, with
    one trailing newline removed.  It is None when nothing was captured.
    """
    import subprocess

    def open_output(fn):
        # Callers spell the null device as '/dev/null'; where that path does not
        # exist, fall back to the platform's own null device.  os.devnull is a
        # path that open() accepts, whereas subprocess.DEVNULL is a negative
        # sentinel integer that is only meaningful to Popen itself.
        if fn == '/dev/null' and not os.path.exists(fn):
            fn = os.devnull
        return open(fn, 'wt')

    # Files opened here; the child gets its own copies of the descriptors, so
    # they are closed again as soon as the process has been started (or has
    # failed to start).
    opened = []
    p_stdin = p_stdout = p_stderr = subprocess.PIPE
    try:
        if stdin:
            p_stdin = open(stdin, 'rt')
            opened.append(p_stdin)
        if stdout:
            p_stdout = open_output(stdout)
            opened.append(p_stdout)
        else:
            p_stderr = subprocess.STDOUT
        if stderr:
            p_stderr = open_output(stderr)
            opened.append(p_stderr)
        argv = [cfvfn] + expand_cmdline(cmd)
        proc = subprocess.Popen(argv, stdin=p_stdin, stdout=p_stdout, stderr=p_stderr, universal_newlines=True)
    finally:
        for f in opened:
            f.close()
    obuf, ebuf = proc.communicate()
    # At most one of the two streams is ever connected to a pipe.
    if ebuf or obuf is None:
        assert not obuf
        o = ebuf
    else:
        o = obuf
    s = proc.returncode
    if o and o[-1:] == '\n':
        o = o[:-1]
    return s, o
111
112
113# TODO: make the runcfv_* functions (optionally?) take args as a list instead of a string
def runcfv_py(cmd, stdin=None, stdout=None, stderr=None, need_reload=0):
    """Run cfv inside the current interpreter and return ``(exit_status, output)``.

    cmd: command-line string; it is split and glob-expanded by expand_cmdline()
        and installed as ``sys.argv`` after the cfv script path.
    stdin: optional path of a file to install as ``sys.stdin``; an empty stream
        is used otherwise.
    stdout, stderr: optional paths of files to redirect the respective stream
        to ("/dev/null" selects the in-memory ``nullfile`` sink).  A stream
        without a redirection is captured and returned as *output*.
    need_reload: when true, ``cfv.hash`` is reloaded as well before running.

    exit_status is the code passed to ``sys.exit`` by cfv, the string
    ``'no exit?'`` if cfv returned without exiting, or 1 if it raised any other
    exception (whose traceback is then appended to the captured output).
    ``sys.stdin``/``stdout``/``stderr``/``argv`` and the working directory are
    restored before returning.
    """
    from io import BytesIO, TextIOWrapper
    obuf = BytesIO()
    obuftext = TextIOWrapper(obuf)
    saved = sys.stdin, sys.stdout, sys.stderr, sys.argv
    cwd = os.getcwd()

    def open_output(file):
        if file:
            if file == "/dev/null":
                return nullfile
            return open(file, 'wt')
        else:
            return obuftext

    try:
        if stdin:
            sys.stdin = open(stdin, 'rt')
        else:
            sys.stdin = TextIOWrapper(BytesIO())
        sys.stdout = open_output(stdout)
        sys.stderr = open_output(stderr)
        sys.argv = [cfvfn] + expand_cmdline(cmd)
        # TODO: make this work with cfv 1.x as well so that we can benchmark compare them in internal mode.
        import cfv.cftypes
        importlib.reload(cfv.cftypes)  # XXX
        import cfv.common
        importlib.reload(cfv.common)  # XXX: hack until I can get all the global state storage factored out.
        if need_reload:
            import cfv.hash
            importlib.reload(cfv.hash)  # XXX: hack for environment variable changing
        cfv_ns = {
            '__name__': '__main__',
            '__file__': cfvfn,
            '__doc__': None,
            '__package__': None,
        }
        try:
            exec(cfv_compiled, cfv_ns)
            s = 'no exit?'
        except SystemExit as e:
            s = e.code
        except KeyboardInterrupt:
            raise
        except Exception:
            traceback.print_exc(file=obuftext)
            s = 1
        finally:
            # Close the redirection files however cfv finished, so their
            # contents are flushed to disk and no handles are left open.  The
            # shared capture buffer (used when no path was given) must stay
            # open because it is read below.
            if stdin:
                sys.stdin.close()
            if stdout:
                sys.stdout.close()
            if stderr:
                sys.stderr.close()
    finally:
        sys.stdin, sys.stdout, sys.stderr, sys.argv = saved
        os.chdir(cwd)
    obuftext.flush()
    # NOTE(review): obuftext encodes with the interpreter's default text
    # encoding while decode() assumes UTF-8 - confirm the tests only run where
    # the two agree.
    o = obuf.getvalue().decode()
    if o and o[-1:] == '\n':
        o = o[:-1]
    return s, o
176
177
def get_version_flags():
    """Refresh ``ver_cfv`` and ``ver_mmap`` from the output of ``cfv --version``.

    ``ver_cfv`` becomes the remainder of the line that follows the first
    ``'cfv '`` in the output (None when that marker is absent); ``ver_mmap``
    records whether the output mentions ``mmap`` anywhere.
    """
    global ver_cfv, ver_mmap
    _status, output = runcfv("--version", need_reload=1)
    marker = 'cfv '
    marker_pos = output.find(marker)
    if marker_pos < 0:
        ver_cfv = None
    else:
        ver_cfv = output[marker_pos + len(marker):].splitlines()[0]
    ver_mmap = 'mmap' in output
186
187
def setcfv(fn=None, internal=None):
    """Select the cfv script to test and how it is run.

    fn: path of the cfv script; defaults to the ``cfv`` file next to this
        module.  It must be an existing file.
    internal: when not None, chooses the runner stored in ``runcfv`` -
        runcfv_py (in-process) if true, runcfv_exe (subprocess) if false.
        None leaves the current runner untouched.

    Stores the absolute script path in ``cfvfn`` and its compiled code in
    ``cfv_compiled``, executes the script once as module ``cfvwrapper`` and then
    refreshes the version flags.
    """
    global cfvfn, cfv_compiled, runcfv
    # importlib replaces the deprecated imp module, which is gone in Python 3.12.
    import importlib.machinery
    import importlib.util

    if internal is not None:
        runcfv = runcfv_py if internal else runcfv_exe

    if fn is None:
        fn = os.path.join(testpath, 'cfv')

    assert os.path.isfile(fn), fn
    cfvfn = os.path.abspath(fn)
    with open(cfvfn, 'rt') as f:
        cfv_compiled = compile(f.read(), cfvfn, 'exec')

    # This is so that the sys.path modification of the wrapper (if it has one) will be executed..
    # An explicit SourceFileLoader is required because the script has no ".py" suffix.
    loader = importlib.machinery.SourceFileLoader('cfvwrapper', cfvfn)
    spec = importlib.util.spec_from_file_location('cfvwrapper', cfvfn, loader=loader)
    wrapper = importlib.util.module_from_spec(spec)
    sys.modules['cfvwrapper'] = wrapper
    try:
        loader.exec_module(wrapper)
    except BaseException:
        # Do not leave a half-initialised module registered.
        sys.modules.pop('cfvwrapper', None)
        raise

    get_version_flags()
208
209
def setenv(k, v):
    """Set environment variable *k* to *v* for subsequent cfv runs.

    The assignment is also prepended to the module-level ``cfvenv`` string, and
    the version flags are refreshed afterwards.
    """
    global cfvenv
    assignment = "%s=%s" % (k, v)
    cfvenv = "%s %s" % (assignment, cfvenv)
    os.environ[k] = v
    get_version_flags()
215
216
def my_import(name):
    """Import the dotted module *name* and return the innermost module.

    ``__import__`` hands back the top-level package, so the remaining
    components of the dotted path are resolved one attribute at a time.
    """
    module = __import__(name)
    for attribute in name.split('.')[1:]:
        module = getattr(module, attribute)
    return module
223
224
def rfind(root, match):
    """Yield files below *root* whose base name matches the pattern *match*.

    The tree is walked recursively and each hit is yielded as a path relative
    to *root*.  *match* is an fnmatch-style pattern applied to the file name
    only, not to the directory part.
    """
    # Guarantee a trailing separator so that stripping the prefix leaves a
    # clean relative path.
    root = os.path.join(root, '')
    for dirpath, _dirnames, filenames in os.walk(root):
        relative_dir = dirpath.replace(root, '', 1)
        for filename in fnmatch.filter(filenames, match):
            yield os.path.join(relative_dir, filename)
232
233
def all_unittests_suite():
    """Build one suite holding every unit test and doctest of the project.

    Unit tests are collected from each ``test_*.py`` module found below
    ``testpath``.  Doctests are collected from every ``*.py`` module found in
    the directory of the ``cfv`` package, plus the ``benchmark`` module.

    Returns the combined ``unittest.TestSuite``.
    """
    modules_to_test = [os.path.splitext(f)[0].replace(os.sep, '.') for f in rfind(testpath, 'test_*.py')]
    assert modules_to_test
    alltests = unittest.TestSuite()
    for name in modules_to_test:
        # unittest.findTestCases() is deprecated and removed in Python 3.13;
        # a fresh default TestLoader is what it used internally.
        alltests.addTest(unittest.TestLoader().loadTestsFromModule(my_import(name)))

    import cfv.common
    libdir = os.path.split(cfv.common.__file__)[0]
    modules_to_doctest = ['cfv.' + os.path.splitext(f)[0].replace(os.sep, '.') for f in rfind(libdir, '*.py')]
    # TODO: better way to add files in test/ dir to doctest suite?
    modules_to_doctest.append('benchmark')
    assert 'cfv.common' in modules_to_doctest
    for name in modules_to_doctest:
        module = my_import(name)
        assert module.__name__ == name, (module, name)
        try:
            suite = DocTestSuite(module)
        except ValueError as e:
            # A module without docstrings is expected and skipped silently;
            # any other problem is reported but does not abort collection.
            if len(e.args) != 2 or e.args[1] != 'has no docstrings':
                print(e)
        else:
            alltests.addTest(suite)

    return alltests
259
260
if __name__ == '__main__':
    # initialize with default options: the cfv script next to this file,
    # run in-process through runcfv_py (internal=1)
    setcfv(internal=1)

    # hand over to unittest.main, which resolves and runs all_unittests_suite
    main(defaultTest='all_unittests_suite')
266