1#!/usr/bin/python3
2
3import subprocess
4import argparse
5import difflib
6import filecmp
7import fnmatch
8import json
9import sys
10import re
11import os
12
# Command line interface: global options followed by one sub-command
fmtr_class = argparse.ArgumentDefaultsHelpFormatter
parser = argparse.ArgumentParser(prog='nasm-t.py', formatter_class=fmtr_class)

parser.add_argument('-d', '--directory', dest='dir',
                    default='./travis/test',
                    help='Directory with tests')
parser.add_argument('--nasm', dest='nasm',
                    default='./nasm',
                    help='Nasm executable to use')
parser.add_argument('--hexdump', dest='hexdump',
                    default='/usr/bin/hexdump',
                    help='Hexdump executable to use')

sp = parser.add_subparsers(dest='cmd')

# Sub-commands: (name, help text, help text of the optional -t/--test
# selector, or None when the sub-command does not accept one)
subcommands = (
    ('run', 'Run test cases', 'Run the selected test only'),
    ('list', 'List test cases', None),
    ('update', 'Update test cases with new compiler',
     'Update the selected test only'),
)
for cmd, cmd_help, test_help in subcommands:
    spp = sp.add_parser(cmd, help=cmd_help)
    if test_help is not None:
        spp.add_argument('-t', '--test', dest='test', help=test_help,
                         required=False)

args = parser.parse_args()

# A sub-command is mandatory: print the usage and fail when none is given
if args.cmd is None:
    parser.print_help()
    sys.exit(1)
52
def read_stdfile(path):
    """Read a reference stdout/stderr file as text.

    The file content is decoded as UTF-8 and newlines at both ends are
    stripped.

    Returns the resulting string, or None when the file can not be opened,
    read or decoded. Callers compare the result against None to report an
    unreadable reference file.
    """
    try:
        with open(path, "rb") as f:
            return f.read().decode("utf-8").strip("\n")
    except (OSError, UnicodeDecodeError):
        return None
58
#
# A descriptor is usable only with a non-empty 'description'
def is_valid_desc(desc):
    """Return True when desc exists and has a non-empty 'description'."""
    if desc is None:
        return False
    return 'description' in desc and desc['description'] != ""
69
#
# Resolve 'ref' entries against the entries carrying a matching 'id'
def expand_templates(desc_array):
    """Expand template references of a descriptor list in place.

    Every entry with an 'id' acts as a template. An entry whose 'ref' names
    a known template is replaced by a copy of that template overlaid with
    the entry's own fields, with the 'id' key removed. Entries without
    'ref', or with an unknown 'ref', are left untouched.

    Returns the same (mutated) list.
    """
    templates = {entry['id']: entry for entry in desc_array if 'id' in entry}
    for idx, entry in enumerate(desc_array):
        if 'ref' not in entry or entry['ref'] not in templates:
            continue
        # Template fields first, the entry's own fields win on conflict
        merged = templates[entry['ref']].copy()
        merged.update(entry.copy())
        del merged['id']
        desc_array[idx] = merged
    return desc_array
86
def prepare_desc(desc, basedir, name, path):
    """Validate a descriptor and fill in the derived fields, in place.

    basedir is the directory of the JSON file, name its file name and path
    the two joined. Returns False (descriptor untouched) when the
    descriptor is not valid, True otherwise.
    """
    if not is_valid_desc(desc):
        return False

    # Private bookkeeping fields; the test name is the JSON path with the
    # last five characters (the ".json" suffix) cut off
    desc.update({
        '_base-dir': basedir,
        '_json-file': name,
        '_json-path': path,
        '_test-name': basedir + os.sep + name[:-5],
    })

    # Without targets there is nothing to regenerate: never update
    if 'target' not in desc:
        desc['target'] = []
        desc['update'] = 'false'

    # Exit code nasm is expected to finish with
    desc['_wait'] = 1 if desc.get('error') == 'expected' else 0

    # Targets with an output file but no explicit reference file get the
    # default reference name "<output>.t"
    for target in desc['target']:
        if 'output' in target and 'match' not in target:
            target['match'] = target['output'] + ".t"
    return True
114
def read_json(path):
    """Load a JSON document from path.

    The file is read as bytes, decoded as UTF-8 and parsed.

    Returns the parsed value, or None when the file can not be read
    (OSError) or does not hold valid UTF-8 encoded JSON (ValueError, which
    covers both decoding and parsing errors). Other exceptions, such as
    KeyboardInterrupt, are deliberately not swallowed.
    """
    try:
        with open(path, "rb") as f:
            return json.loads(f.read().decode("utf-8").strip("\n"))
    except (OSError, ValueError):
        return None
128
def read_desc(basedir, name):
    """Load the test descriptors stored in the JSON file basedir/name.

    The file may hold a single descriptor (an object) or a list of them;
    a list has its id/ref templates expanded first. Returns the list of
    descriptors accepted by prepare_desc(), which is empty when the file
    is unreadable or holds neither an object nor a list.
    """
    path = basedir + os.sep + name
    loaded = read_json(path)

    if isinstance(loaded, dict):
        candidates = [loaded]
    elif isinstance(loaded, list):
        expand_templates(loaded)
        candidates = loaded
    else:
        candidates = []

    return [candidate for candidate in candidates
            if prepare_desc(candidate, basedir, name, path) is True]
142
def collect_test_desc_from_file(path):
    """Load the descriptors of a single test.

    path names the test's JSON file; the ".json" suffix is appended when it
    is missing. Returns the list produced by read_desc(), which is empty
    when no valid descriptor could be loaded.
    """
    if not fnmatch.fnmatch(path, '*.json'):
        path += '.json'
    # A bare file name has an empty dirname. The base directory is later
    # glued to file names with os.sep, so an empty one would point at the
    # filesystem root; use the current directory instead.
    basedir = os.path.dirname(path) or os.curdir
    filename = os.path.basename(path)
    return read_desc(basedir, filename)
149
def collect_test_desc_from_dir(basedir):
    """Recursively collect the descriptors of all *.json files in basedir.

    Returns the descriptors sorted by their '_test-name'; the list is empty
    when basedir is not a directory.
    """
    if not os.path.isdir(basedir):
        return []

    collected = []
    for entry in os.listdir(basedir):
        entry_path = basedir + os.sep + entry
        if os.path.isdir(entry_path):
            collected.extend(collect_test_desc_from_dir(entry_path))
        elif fnmatch.fnmatch(entry, '*.json'):
            found = read_desc(basedir, entry)
            if found is not None:
                collected += found
    collected.sort(key=lambda x: x['_test-name'])
    return collected
163
# 'list' sub-command: print a name/description table of all tests found
if args.cmd == 'list':
    fmt_entry = '%-32s %s'
    desc_array = collect_test_desc_from_dir(args.dir)
    rows = [('Name', 'Description')]
    rows += [(d['_test-name'], d['description']) for d in desc_array]
    for row in rows:
        print(fmt_entry % row)
170
171def test_abort(test, message):
172    print("\t%s: %s" % (test, message))
173    print("=== Test %s ABORT ===" % (test))
174    sys.exit(1)
175    return False
176
177def test_fail(test, message):
178    print("\t%s: %s" % (test, message))
179    print("=== Test %s FAIL ===" % (test))
180    return False
181
182def test_skip(test, message):
183    print("\t%s: %s" % (test, message))
184    print("=== Test %s SKIP ===" % (test))
185    return True
186
187def test_over(test):
188    print("=== Test %s ERROR OVER ===" % (test))
189    return True
190
191def test_pass(test):
192    print("=== Test %s PASS ===" % (test))
193    return True
194
195def test_updated(test):
196    print("=== Test %s UPDATED ===" % (test))
197    return True
198
def run_hexdump(path):
    """Run "hexdump -C" on path and collect its output.

    The executable is taken from args.hexdump.

    Returns the finished Popen object whose stdout attribute is a readable
    (and closable) byte stream holding the complete dump, or None when
    hexdump could not be started or exited with a non-zero status.
    """
    import io

    try:
        p = subprocess.Popen([args.hexdump, "-C", path],
                             stdout=subprocess.PIPE,
                             close_fds=True)
    except OSError:
        # Executable missing or not runnable: report "no dump" to the caller
        return None

    # communicate() drains the pipe while waiting. A plain wait() blocks
    # forever as soon as the dump outgrows the OS pipe buffer.
    dump, _ = p.communicate()
    if p.returncode != 0:
        return None

    # communicate() closed the pipe; callers still read the dump from
    # p.stdout and close it afterwards, so hand the collected bytes back
    # through the same attribute.
    p.stdout = io.BytesIO(dump)
    return p
206
def show_std(stdname, data):
    """Print data line by line, tab-indented, under a "--- stdname" header."""
    lines = ["\t--- %s" % (stdname)]
    lines.extend("\t%s" % row for row in data.split("\n"))
    lines.append("\t---")
    print("\n".join(lines))
212
def cmp_std(from_name, from_data, match_name, match_data):
    """Compare two texts and print both plus a unified diff on mismatch.

    Returns True when from_data equals match_data (nothing is printed),
    False otherwise.
    """
    if from_data == match_data:
        return True

    from_lines = from_data.split("\n")
    match_lines = match_data.split("\n")

    # Dump both texts in full, each under its own header
    for title, rows in ((from_name, from_lines), (match_name, match_lines)):
        print("\t--- %s" % (title))
        for row in rows:
            print("\t%s" % row)

    # Followed by the unified diff between them
    delta = difflib.unified_diff(from_lines, match_lines,
                                 fromfile=from_name, tofile=match_name)
    for row in delta:
        print("\t%s" % row.strip("\n"))
    print("\t---")
    return False
229
def show_diff(test, patha, pathb):
    """Print the hexdumps of two files followed by their unified diff.

    Returns True after printing, or the result of test_fail() when one of
    the dumps could not be produced.
    """
    dumps = [run_hexdump(patha), run_hexdump(pathb)]
    if any(dump is None for dump in dumps):
        return test_fail(test, "Can't create dumps")

    texts = [dump.stdout.read().decode("utf-8").strip("\n") for dump in dumps]

    # Both dumps in full, each under its own header
    for path, text in zip((patha, pathb), texts):
        print("\t--- hexdump %s" % (path))
        for row in text.split("\n"):
            print("\t%s" % row)
    for dump in dumps:
        dump.stdout.close()

    # Followed by the unified diff between them
    delta = difflib.unified_diff(texts[0].split("\n"), texts[1].split("\n"),
                                 fromfile=patha, tofile=pathb)
    for row in delta:
        print("\t%s" % row.strip("\n"))
    print("\t---")
    return True
252
def prepare_run_opts(desc):
    """Build the nasm command line arguments for a prepared descriptor.

    The order is: "-f <format>", the descriptor-wide options, the
    per-target options/outputs and finally the source file. File names are
    prefixed with the descriptor's base directory.
    """
    def in_base(filename):
        return desc['_base-dir'] + os.sep + filename

    opts = []
    if 'format' in desc:
        opts.extend(['-f', desc['format']])
    if 'option' in desc:
        opts.extend(desc['option'].split(" "))

    for target in desc['target']:
        has_option = 'option' in target
        if 'output' in target:
            # The target's own option introduces the output file; plain
            # "-o" is used when the target has none
            lead = target['option'].split(" ") if has_option else ['-o']
            opts.extend(lead + [in_base(target['output'])])
        if has_option and ('stdout' in target or 'stderr' in target):
            opts.extend(target['option'].split(" "))

    if 'source' in desc:
        opts.append(in_base(desc['source']))
    return opts
272
def exec_nasm(desc):
    """Run nasm for a prepared descriptor and capture its output.

    The command line comes from args.nasm and prepare_run_opts(). The child
    environment is a copy of os.environ with NASM_TEST_RUN=y plus the
    entries of the descriptor's optional 'environ' list.

    Returns (process, stdout, stderr) with both outputs decoded as UTF-8
    and stripped of newlines at both ends. Returns (None, None, None),
    after reporting the failure, when nasm can not be started or exits
    with a status other than desc['_wait'].
    """
    print("\tProcessing %s" % (desc['_test-name']))
    opts = [args.nasm] + prepare_run_opts(desc)

    nasm_env = os.environ.copy()
    nasm_env['NASM_TEST_RUN'] = 'y'

    # 'environ' entries are "NAME=value" strings. Only the first '=' is
    # split on, so values may contain '='. An entry without '=' removes the
    # variable; a None value would make Popen raise TypeError.
    for entry in desc.get('environ') or []:
        name, sep, value = entry.partition('=')
        if sep:
            nasm_env[name] = value
        else:
            nasm_env.pop(name, None)

    print("\tExecuting %s" % (" ".join(opts)))
    try:
        pnasm = subprocess.Popen(opts,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 close_fds=True,
                                 env=nasm_env)
    except OSError:
        test_fail(desc['_test-name'], "Unable to execute test")
        return None, None, None

    # communicate() drains both pipes while waiting. Reading one pipe to
    # EOF before the other can deadlock once the other pipe fills up.
    raw_stdout, raw_stderr = pnasm.communicate()

    # At most 4 MiB of each stream is taken into account
    limit = 4194304
    stderr = raw_stderr[:limit].decode("utf-8").strip("\n")
    stdout = raw_stdout[:limit].decode("utf-8").strip("\n")

    wait_rc = pnasm.returncode

    if desc['_wait'] != wait_rc:
        if stdout != "":
            show_std("stdout", stdout)
        if stderr != "":
            show_std("stderr", stderr)
        test_fail(desc['_test-name'],
                  "Unexpected ret code: " + str(wait_rc))
        return None, None, None
    return pnasm, stdout, stderr
316
def test_run(desc):
    """Run one test and compare its results with the reference files.

    For every target either the generated output file or the captured
    stdout/stderr is compared against its reference. A captured stream that
    matched its reference counts as consumed; any stream left non-empty at
    the end fails the test.

    Returns True when the test passed, False otherwise.
    """
    name = desc['_test-name']
    print("=== Running %s ===" % (name))

    pnasm, stdout, stderr = exec_nasm(desc)
    if pnasm is None:
        return False

    captured = {'stdout': stdout, 'stderr': stderr}

    for t in desc['target']:
        if 'output' in t:
            # Output files are not checked when nasm is expected to fail
            if desc['_wait'] == 1:
                continue
            output = desc['_base-dir'] + os.sep + t['output']
            match = desc['_base-dir'] + os.sep + t['match']
            print("\tComparing %s %s" % (output, match))
            try:
                # shallow=False compares the content. The default mode
                # trusts equal size and mtime and could miss a difference.
                same = filecmp.cmp(match, output, shallow=False)
            except OSError as err:
                return test_fail(name, "Can't compare %s and %s: %s"
                                 % (match, output, err))
            if not same:
                show_diff(name, match, output)
                return test_fail(name, match + " and " + output +
                                 " files are different")
            continue

        # A target names at most one stream to check; stdout has priority
        stream = next((s for s in ('stdout', 'stderr') if s in t), None)
        if stream is None:
            continue
        print("\tComparing %s" % stream)
        match = desc['_base-dir'] + os.sep + t[stream]
        match_data = read_stdfile(match)
        if match_data is None:
            return test_fail(name, "Can't read " + match)
        if not cmp_std(match, match_data, stream, captured[stream]):
            return test_fail(name, "%s mismatch" % stream.capitalize())
        # Matched: the stream is consumed
        captured[stream] = ""

    # Whatever was not consumed by a target is unexpected output
    for stream in ('stdout', 'stderr'):
        if captured[stream] != "":
            show_std(stream, captured[stream])
            return test_fail(name, "%s is not empty" % stream.capitalize())

    return test_pass(name)
364
#
# Run nasm on the sources and store its results as the new references
def test_update(desc):
    """Regenerate the reference files of one test.

    Descriptors whose 'update' field is 'false' are skipped. Otherwise nasm
    is run, every generated output file is moved over its reference file
    and the captured stdout/stderr is written to the reference files named
    by the targets.

    Returns False when running nasm failed, True otherwise.
    """
    name = desc['_test-name']
    print("=== Updating %s ===" % (name))

    if desc.get('update') == 'false':
        return test_skip(name, "No output provided")

    pnasm, stdout, stderr = exec_nasm(desc)
    if pnasm is None:
        return False

    base = desc['_base-dir'] + os.sep
    captured = (('stdout', stdout), ('stderr', stderr))

    for target in desc['target']:
        if 'output' in target:
            output = base + target['output']
            match = base + target['match']
            print("\tMoving %s to %s" % (output, match))
            os.rename(output, match)
        for stream, text in captured:
            if stream not in target:
                continue
            match = base + target[stream]
            print("\tMoving %s to %s" % (stream, match))
            with open(match, "wb") as ref_file:
                ref_file.write(text.encode("utf-8"))

    return test_updated(name)
397
# 'run' sub-command: run the selected test, or every test under args.dir
if args.cmd == 'run':
    if args.test is None:
        desc_array = collect_test_desc_from_dir(args.dir)
    else:
        desc_array = collect_test_desc_from_file(args.test)
        if not desc_array:
            test_abort(args.test, "Can't obtain test descriptors")

    for desc in desc_array:
        if test_run(desc):
            continue
        # A failure is tolerated only for tests marked "error": "over"
        if desc.get('error') == 'over':
            test_over(desc['_test-name'])
        else:
            test_abort(desc['_test-name'], "Error detected")
413
# 'update' sub-command: regenerate the reference files of the selected
# test, or of every test under args.dir
if args.cmd == 'update':
    if args.test is None:
        desc_array = collect_test_desc_from_dir(args.dir)
    else:
        desc_array = collect_test_desc_from_file(args.test)
        if not desc_array:
            test_abort(args.test, "Can't obtain a test descriptors")

    for desc in desc_array:
        if test_update(desc):
            continue
        # A failure is tolerated only for tests marked "error": "over"
        if desc.get('error') == 'over':
            test_over(desc['_test-name'])
        else:
            test_abort(desc['_test-name'], "Error detected")
429