1#!/usr/bin/python3
2
3import subprocess
4import argparse
5import difflib
6import filecmp
7import fnmatch
8import json
9import sys
10import re
11import os
12
# Command-line interface: global options followed by one sub-parser per
# command ('run', 'new', 'list', 'update').
fmtr_class = argparse.ArgumentDefaultsHelpFormatter
parser = argparse.ArgumentParser(prog='nasm-t.py',
                                 formatter_class=fmtr_class)

# Options given before the command name: (flags, dest, default, help).
for opt_flags, opt_dest, opt_default, opt_help in (
        (('-d', '--directory'), 'dir', './travis/test',
         'Directory with tests'),
        (('--nasm',), 'nasm', './nasm',
         'Nasm executable to use'),
        (('--hexdump',), 'hexdump', '/usr/bin/hexdump',
         'Hexdump executable to use'),
):
    parser.add_argument(*opt_flags, dest=opt_dest, default=opt_default,
                        help=opt_help)

# The selected command name ends up in the 'cmd' attribute.
sp = parser.add_subparsers(dest='cmd')

spp = sp.add_parser('run', help='Run test cases')
spp.add_argument('-t', '--test', dest='test', required=False,
                 help='Run the selected test only')

# Options of 'new': (flag, dest, default, required, help).  A default of
# None is what argparse applies anyway when no default is given.
spp = sp.add_parser('new', help='Add a new test case')
for opt_flag, opt_dest, opt_default, opt_required, opt_help in (
        ('--description', 'description', "Description of a test", False,
         'Description of a test'),
        ('--id', 'id', None, True,
         'Test identifier/name'),
        ('--format', 'format', 'bin', False,
         'Output format'),
        ('--source', 'source', None, False,
         'Source file'),
        ('--option', 'option', '-Ox', False,
         'NASM options'),
        ('--ref', 'ref', None, False,
         'Test reference'),
        ('--error', 'error', None, False,
         'Set to "y" if test is supposed to fail'),
        ('--output', 'output', 'y', False,
         'Output (compiled) file name (or "y")'),
        ('--stdout', 'stdout', 'y', False,
         'Filename of stdout file (or "y")'),
        ('--stderr', 'stderr', 'y', False,
         'Filename of stderr file (or "y")'),
):
    spp.add_argument(opt_flag, dest=opt_dest, default=opt_default,
                     required=opt_required, help=opt_help)

# 'list' takes no options of its own.
sp.add_parser('list', help='List test cases')

spp = sp.add_parser('update', help='Update test cases with new compiler')
spp.add_argument('-t', '--test', dest='test', required=False,
                 help='Update the selected test only')
90
# NASM output format name -> file extension used for the generated file.
# Formats are listed in runs that share an extension; the resulting key
# order is: bin, elf*, ith, srec, obj/win*/coff, macho*, aout*, as86, rdf.
map_fmt_ext = {
    fmt_name: fmt_ext
    for fmt_ext, fmt_names in (
        ('.bin', ('bin',)),
        ('.o', ('elf', 'elf64', 'elf32', 'elfx32')),
        ('.ith', ('ith',)),
        ('.srec', ('srec',)),
        ('.obj', ('obj', 'win32', 'win64', 'coff')),
        ('.o', ('macho', 'macho32', 'macho64')),
        ('.out', ('aout', 'aoutb')),
        ('.o', ('as86',)),
        ('.rdf', ('rdf',)),
    )
    for fmt_name in fmt_names
}
111
# Parse the command line.  A command is mandatory: without one, show the
# usage text and exit with status 1.
args = parser.parse_args()

if args.cmd is None:
    parser.print_help()
    sys.exit(1)
117
def read_stdfile(path):
    """Return the text of a stored stdout/stderr reference file.

    The file is read as bytes, decoded as UTF-8, and leading/trailing
    newlines are stripped.

    Returns None when the file cannot be opened, read or decoded, so that
    callers can report "can't read" instead of crashing with a traceback.
    """
    try:
        with open(path, "rb") as f:
            raw = f.read()
        return raw.decode("utf-8").strip("\n")
    except (OSError, UnicodeDecodeError):
        return None
123
#
# Check if descriptor has mandatory fields
def is_valid_desc(desc):
    """Return True when *desc* is a usable test descriptor.

    A descriptor must be a dict with a 'description' entry that is not the
    empty string.  Anything that is not a dict (None, or a stray string or
    number found in a JSON list) is rejected rather than raising.
    """
    if not isinstance(desc, dict):
        return False
    # A missing key and an empty description are both invalid.
    return desc.get('description', "") != ""
134
#
# Expand ref/id in descriptors array
def expand_templates(desc_array):
    """Resolve 'ref' entries against the 'id' entries of the same list.

    Every descriptor carrying an 'id' can serve as a template.  A
    descriptor whose 'ref' names a known template is replaced, in place,
    by a copy of that template overlaid with the descriptor's own keys;
    the 'id' key is then dropped from the result.  Descriptors whose
    'ref' is unknown are left untouched.

    Returns the same list object that was passed in.
    """
    templates = {entry['id']: entry for entry in desc_array if 'id' in entry}

    for pos, entry in enumerate(desc_array):
        if 'ref' not in entry or entry['ref'] not in templates:
            continue
        # Template first, then the descriptor's own values on top.
        merged = dict(templates[entry['ref']])
        merged.update(entry)
        del merged['id']
        desc_array[pos] = merged

    return desc_array
151
def prepare_desc(desc, basedir, name, path):
    """Validate *desc* and fill in the derived fields used by the runner.

    desc    -- descriptor loaded from JSON; modified in place
    basedir -- directory holding the JSON file
    name    -- file name of the JSON file
    path    -- path of the JSON file

    Returns False (leaving *desc* untouched) when is_valid_desc() rejects
    the descriptor, True otherwise.
    """
    if not is_valid_desc(desc):
        return False

    # Private fields; the test name is the JSON path minus its last five
    # characters (the ".json" suffix).
    desc.update({
        '_base-dir': basedir,
        '_json-file': name,
        '_json-path': path,
        '_test-name': basedir + os.sep + name[:-5],
    })

    # A descriptor without targets is never updated.
    if 'target' not in desc:
        desc['target'] = []
        desc['update'] = 'false'

    # Exit code nasm is expected to finish with.
    desc['_wait'] = 1 if desc.get('error') == 'expected' else 0

    # Output targets get a default reference file name unless one is given.
    for target in desc['target']:
        if 'output' in target and 'match' not in target:
            target['match'] = target['output'] + ".t"

    return True
179
def read_json(path):
    """Load the JSON document stored in *path*.

    The file is read as bytes, decoded as UTF-8, stripped of surrounding
    newlines and parsed.

    Returns the parsed value, or None when the file cannot be read or does
    not contain valid UTF-8 encoded JSON.
    """
    try:
        with open(path, "rb") as f:
            text = f.read().decode("utf-8").strip("\n")
        return json.loads(text)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and
        # UnicodeDecodeError.  Anything else (KeyboardInterrupt, ...) is
        # deliberately allowed to propagate.
        return None
193
def read_desc(basedir, name):
    """Load the descriptors stored in JSON file *name* under *basedir*.

    The file may hold a single descriptor (a JSON object) or a list of
    descriptors; a list first has its ref/id templates expanded.  Each
    descriptor is passed through prepare_desc() and kept only when that
    succeeds.

    Returns a (possibly empty) list of prepared descriptors.
    """
    path = basedir + os.sep + name
    loaded = read_json(path)

    if isinstance(loaded, dict):
        candidates = [loaded]
    elif isinstance(loaded, list):
        expand_templates(loaded)
        candidates = loaded
    else:
        # Unreadable file or unsupported top-level JSON value.
        candidates = []

    return [entry for entry in candidates
            if prepare_desc(entry, basedir, name, path)]
207
def collect_test_desc_from_file(path):
    """Load the descriptors of one test given by *path*.

    *path* may be written with or without the '.json' suffix.

    Returns the list produced by read_desc() (empty when nothing usable
    was found).
    """
    if not fnmatch.fnmatch(path, '*.json'):
        path += '.json'
    # A bare file name has an empty dirname.  read_desc() builds paths as
    # basedir + os.sep + name, so an empty basedir would point at the
    # filesystem root; use the current directory instead.
    basedir = os.path.dirname(path) or os.curdir
    filename = os.path.basename(path)
    return read_desc(basedir, filename)
214
def collect_test_desc_from_dir(basedir):
    """Recursively collect the descriptors of all '*.json' files in *basedir*.

    Sub-directories are descended into.  Returns the descriptors sorted by
    their '_test-name'; the list is empty when *basedir* is not a
    directory.
    """
    found = []
    if not os.path.isdir(basedir):
        return found

    for entry in os.listdir(basedir):
        full = basedir + os.sep + entry
        if os.path.isdir(full):
            found += collect_test_desc_from_dir(full)
            continue
        if not fnmatch.fnmatch(entry, '*.json'):
            continue
        descs = read_desc(basedir, entry)
        if descs is not None:
            found += descs

    found.sort(key=lambda d: d['_test-name'])
    return found
228
# 'list' command: print a two-column table (name, description) of every
# test found under the tests directory.
if args.cmd == 'list':
    fmt_entry = '%-32s %s'
    desc_array = collect_test_desc_from_dir(args.dir)
    rows = [('Name', 'Description')]
    rows += [(d['_test-name'], d['description']) for d in desc_array]
    for row in rows:
        print(fmt_entry % row)
235
def test_abort(test, message):
    """Report *message* for *test*, print the ABORT banner and exit.

    The process terminates with status 1 through sys.exit().
    """
    detail = "\t%s: %s" % (test, message)
    banner = "=== Test %s ABORT ===" % test
    print(detail)
    print(banner)
    sys.exit(1)
    # Only reached if sys.exit() does not raise.
    return False
241
def test_fail(test, message):
    """Report *message* for *test*, print the FAIL banner, return False."""
    detail = "\t%s: %s" % (test, message)
    banner = "=== Test %s FAIL ===" % test
    print(detail)
    print(banner)
    return False
246
def test_skip(test, message):
    """Report why *test* is skipped, print the SKIP banner, return True."""
    detail = "\t%s: %s" % (test, message)
    banner = "=== Test %s SKIP ===" % test
    print(detail)
    print(banner)
    return True
251
def test_over(test):
    """Print the ERROR OVER banner for *test* and return True."""
    banner = "=== Test %s ERROR OVER ===" % test
    print(banner)
    return True
255
def test_pass(test):
    """Print the PASS banner for *test* and return True."""
    banner = "=== Test %s PASS ===" % test
    print(banner)
    return True
259
def test_updated(test):
    """Print the UPDATED banner for *test* and return True."""
    banner = "=== Test %s UPDATED ===" % test
    print(banner)
    return True
263
def run_hexdump(path):
    """Run the configured hexdump tool with "-C" on *path*.

    Returns the finished Popen object on success; its 'stdout' attribute
    is an in-memory binary stream holding the complete dump, so callers
    can still read() and close() it.  Returns None when the tool cannot be
    started or exits with a non-zero status.
    """
    import io

    try:
        p = subprocess.Popen([args.hexdump, "-C", path],
                             stdout = subprocess.PIPE,
                             close_fds = True)
    except OSError:
        # Hexdump executable missing or not runnable.
        return None

    # communicate() drains the pipe while waiting.  Calling wait() first,
    # with nobody reading, blocks forever once the dump is larger than
    # the pipe buffer.
    dump, _ = p.communicate()
    if p.returncode != 0:
        return None

    # The real pipe is closed by communicate(); hand the captured bytes
    # back through the same attribute the caller reads from.
    p.stdout = io.BytesIO(dump)
    return p
271
def show_std(stdname, data):
    """Print *data* line by line, tab-indented, framed by a header naming
    the stream (*stdname*) and a closing "---" line."""
    rows = ["\t--- %s" % stdname]
    rows += ["\t%s" % line for line in data.split("\n")]
    rows.append("\t---")
    for row in rows:
        print(row)
277
def cmp_std(from_name, from_data, match_name, match_data):
    """Compare two texts; on mismatch print both and a unified diff.

    from_name / match_name -- labels used in the printed headers and diff
    from_data / match_data -- the texts to compare

    Returns True when the texts are equal (nothing is printed), False
    otherwise.
    """
    if from_data == match_data:
        return True

    from_lines = from_data.split("\n")
    print("\t--- %s" % from_name)
    for line in from_lines:
        print("\t%s" % line)

    match_lines = match_data.split("\n")
    print("\t--- %s" % match_name)
    for line in match_lines:
        print("\t%s" % line)

    delta = difflib.unified_diff(from_lines, match_lines,
                                 fromfile = from_name, tofile = match_name)
    for line in delta:
        print("\t%s" % line.strip("\n"))
    print("\t---")
    return False
294
def show_diff(test, patha, pathb):
    """Print hexdumps of two files followed by their unified diff.

    test         -- test name, used when reporting a failure
    patha, pathb -- the files to dump and compare

    Returns True once the diff is printed; when either dump cannot be
    produced the result of test_fail() is returned instead.
    """
    pa = run_hexdump(patha)
    pb = run_hexdump(pathb)
    if pa is None or pb is None:
        return test_fail(test, "Can't create dumps")

    lines_a = pa.stdout.read().decode("utf-8").strip("\n").split("\n")
    lines_b = pb.stdout.read().decode("utf-8").strip("\n").split("\n")

    for path, lines in ((patha, lines_a), (pathb, lines_b)):
        print("\t--- hexdump %s" % path)
        for line in lines:
            print("\t%s" % line)

    pa.stdout.close()
    pb.stdout.close()

    delta = difflib.unified_diff(lines_a, lines_b,
                                 fromfile = patha, tofile = pathb)
    for line in delta:
        print("\t%s" % line.strip("\n"))
    print("\t---")
    return True
317
def prepare_run_opts(desc):
    """Build the nasm argument list (without the executable) for *desc*.

    The arguments are, in order: "-f <format>", the descriptor-wide
    options, the per-target options / output files, and finally the
    source file.  Paths are prefixed with the descriptor's '_base-dir'.

    Option strings are split on any run of whitespace, so repeated or
    trailing blanks never produce empty arguments.
    """
    base = desc['_base-dir'] + os.sep if '_base-dir' in desc else None
    opts = []

    if 'format' in desc:
        opts += ['-f', desc['format']]
    if 'option' in desc:
        opts += desc['option'].split()

    for t in desc['target']:
        extra = t['option'].split() if 'option' in t else None
        if 'output' in t:
            # A target's own option replaces the default "-o" in front of
            # the output file name.
            lead = extra if extra is not None else ['-o']
            opts += lead + [base + t['output']]
        if ('stdout' in t or 'stderr' in t) and extra is not None:
            opts += extra

    if 'source' in desc:
        opts += [base + desc['source']]
    return opts
337
def exec_nasm(desc):
    """Run nasm for the test described by *desc*.

    The child environment is a copy of the current one with NASMENV set
    to '--reproducible'.  Entries of the descriptor's optional 'environ'
    list are applied on top: "NAME=value" sets a variable (the value may
    itself contain '='), a bare "NAME" removes it.

    Returns (pnasm, stdout, stderr) -- the finished process object and
    both output streams decoded as UTF-8 with surrounding newlines
    stripped.  Returns (None, None, None), after reporting the failure,
    when nasm cannot be started or exits with a code other than
    desc['_wait'].
    """
    # At most this many bytes of each stream are kept.
    max_capture = 4194304

    print("\tProcessing %s" % (desc['_test-name']))
    opts = [args.nasm] + prepare_run_opts(desc)

    nasm_env = os.environ.copy()
    nasm_env['NASMENV'] = '--reproducible'

    for entry in desc.get('environ') or []:
        var, sep, value = entry.partition('=')
        if sep:
            nasm_env[var] = value
        else:
            # The environment mapping given to Popen must hold strings
            # only, so "unset" means dropping the key.
            nasm_env.pop(var, None)

    print("\tExecuting %s" % (" ".join(opts)))
    try:
        pnasm = subprocess.Popen(opts,
                                 stdout = subprocess.PIPE,
                                 stderr = subprocess.PIPE,
                                 close_fds = True,
                                 env = nasm_env)
    except OSError:
        test_fail(desc['_test-name'], "Unable to execute test")
        return None, None, None

    # communicate() reads both pipes concurrently and reaps the child.
    # Reading one pipe to the end before the other can block forever if
    # the child fills the pipe that is not being read.
    raw_stdout, raw_stderr = pnasm.communicate()
    stdout = raw_stdout[:max_capture].decode("utf-8").strip("\n")
    stderr = raw_stderr[:max_capture].decode("utf-8").strip("\n")

    wait_rc = pnasm.returncode

    if desc['_wait'] != wait_rc:
        if stdout != "":
            show_std("stdout", stdout)
        if stderr != "":
            show_std("stderr", stderr)
        test_fail(desc['_test-name'],
                  "Unexpected ret code: " + str(wait_rc))
        return None, None, None
    return pnasm, stdout, stderr
381
def test_run(desc):
    """Run one test and compare its results with the stored references.

    For every target of *desc*:
      * 'output' -- the produced file must be byte-identical to the
        'match' reference (not checked when nasm is expected to fail);
      * 'stdout' / 'stderr' -- the captured stream must equal the content
        of the named reference file.
    Only the first of these keys present in a target is checked.  Any
    stream output not consumed by a matching target makes the test fail.

    Returns True when the test passed or was skipped, False otherwise.
    """
    name = desc['_test-name']
    print("=== Running %s ===" % (name))

    if 'disable' in desc:
        return test_skip(name, desc["disable"])

    pnasm, stdout, stderr = exec_nasm(desc)
    if pnasm is None:
        return False

    base = desc['_base-dir'] + os.sep
    captured = {'stdout': stdout, 'stderr': stderr}

    for t in desc['target']:
        if 'output' in t:
            if desc['_wait'] == 1:
                continue
            output = base + t['output']
            match = base + t['match']
            print("\tComparing %s %s" % (output, match))
            try:
                # shallow=False forces a content comparison; the default
                # trusts matching size/mtime signatures.
                same = filecmp.cmp(match, output, shallow=False)
            except OSError as exc:
                return test_fail(name, "Can't compare %s and %s: %s"
                                 % (match, output, exc))
            if not same:
                show_diff(name, match, output)
                return test_fail(name, match + " and " + output + " files are different")
            continue

        for stream in ('stdout', 'stderr'):
            if stream not in t:
                continue
            print("\tComparing %s" % stream)
            match = base + t[stream]
            # An unreadable reference file is a test failure, not a crash.
            try:
                match_data = read_stdfile(match)
            except (OSError, UnicodeDecodeError):
                match_data = None
            if match_data is None:
                return test_fail(name, "Can't read " + match)
            if not cmp_std(match, match_data, stream, captured[stream]):
                return test_fail(name, "%s mismatch" % stream.capitalize())
            # Mark the stream as consumed.
            captured[stream] = ""
            break

    if captured['stdout'] != "":
        show_std("stdout", captured['stdout'])
        return test_fail(name, "Stdout is not empty")

    if captured['stderr'] != "":
        show_std("stderr", captured['stderr'])
        return test_fail(name, "Stderr is not empty")

    return test_pass(name)
432
#
# Compile sources and generate new targets
def test_update(desc):
    """Re-run nasm for *desc* and store its results as the new references.

    Output files are moved onto their 'match' reference; captured stdout
    and stderr are written to the files named by the targets.  Tests
    marked with update == 'false' or carrying 'disable' are skipped.

    Returns True when the references were updated or the test was
    skipped, False when nasm did not finish as expected.
    """
    name = desc['_test-name']
    print("=== Updating %s ===" % (name))

    if 'update' in desc and desc['update'] == 'false':
        return test_skip(name, "No output provided")
    if 'disable' in desc:
        return test_skip(name, desc["disable"])

    pnasm, stdout, stderr = exec_nasm(desc)
    if pnasm is None:
        return False

    base = desc['_base-dir'] + os.sep
    for t in desc['target']:
        if 'output' in t:
            output = base + t['output']
            match = base + t['match']
            if desc['_wait'] == 1 and not os.path.exists(output):
                # nasm was expected to fail and left no output file, so
                # there is nothing to move.
                print("\tNo %s produced, keeping %s" % (output, match))
            else:
                print("\tMoving %s to %s" % (output, match))
                # os.replace() overwrites an existing reference on every
                # platform; os.rename() refuses to on Windows.
                os.replace(output, match)
        for stream, data in (('stdout', stdout), ('stderr', stderr)):
            if stream in t:
                match = base + t[stream]
                print("\tMoving %s to %s" % (stream, match))
                with open(match, "wb") as f:
                    f.write(data.encode("utf-8"))

    return test_updated(name)
467
#
# Create a new empty test case
if args.cmd == 'new':
    #
    # If no source provided create one
    # from (ID which is required)
    if not args.source:
        args.source = args.id + ".asm"

    #
    # Emulate "touch" on source file
    path_asm = args.dir + os.sep + args.source
    print("\tCreating %s" % (path_asm))
    open(path_asm, 'a').close()

    #
    # Fill the test descriptor
    #
    # The layout is written by hand, but every value goes through
    # json.dumps() so that quotes, backslashes and control characters in
    # user-supplied text cannot produce an invalid descriptor.
    path_json = args.dir + os.sep + args.id + ".json"
    print("\tFilling descriptor %s" % (path_json))

    def _json_str(value):
        # ensure_ascii=False keeps non-ASCII text as-is (file is UTF-8).
        return json.dumps(value, ensure_ascii=False)

    fields = []
    if args.description:
        fields.append("\t\t\"description\": {}".format(_json_str(args.description)))
    fields.append("\t\t\"id\": {}".format(_json_str(args.id)))
    if args.format:
        fields.append("\t\t\"format\": {}".format(_json_str(args.format)))
    fields.append("\t\t\"source\": {}".format(_json_str(args.source)))
    if args.option:
        fields.append("\t\t\"option\": {}".format(_json_str(args.option)))
    if args.ref:
        fields.append("\t\t\"ref\": {}".format(_json_str(args.ref)))
    if args.error == 'y':
        # 'expected' is the value prepare_desc() checks for when deciding
        # that nasm should exit with an error.
        fields.append("\t\t\"error\": \"expected\"")

    targets = []
    if args.output:
        # "y" asks for a name derived from the id and the output format.
        if args.output == 'y' and args.format in map_fmt_ext:
            args.output = args.id + map_fmt_ext[args.format]
        targets.append("\t\t\t{{ \"output\": {} }}".format(_json_str(args.output)))
    if args.stdout:
        if args.stdout == 'y':
            args.stdout = args.id + '.stdout'
        targets.append("\t\t\t{{ \"stdout\": {} }}".format(_json_str(args.stdout)))
    if args.stderr:
        if args.stderr == 'y':
            args.stderr = args.id + '.stderr'
        targets.append("\t\t\t{{ \"stderr\": {} }}".format(_json_str(args.stderr)))

    text = "[\n\t{\n" + ",\n".join(fields)
    if targets:
        text += ",\n\t\t\"target\": [\n" + ",\n".join(targets) + "\n\t\t]"
    text += "\n\t}\n]\n"

    with open(path_json, 'wb') as f:
        f.write(text.encode("utf-8"))
526
# 'run' command: execute either the single test given with -t or every
# test under the tests directory.  A failing test aborts the run unless
# its descriptor carries "error": "over".
if args.cmd == 'run':
    if args.test is None:
        desc_array = collect_test_desc_from_dir(args.dir)
    else:
        desc_array = collect_test_desc_from_file(args.test)
        if not desc_array:
            test_abort(args.test, "Can't obtain test descriptors")

    for desc in desc_array:
        if test_run(desc) != False:
            continue
        if desc.get('error') == 'over':
            test_over(desc['_test-name'])
        else:
            test_abort(desc['_test-name'], "Error detected")
542
# 'update' command: regenerate the references of either the single test
# given with -t or every test under the tests directory.  A failing
# update aborts the run unless the descriptor carries "error": "over".
if args.cmd == 'update':
    if args.test is None:
        desc_array = collect_test_desc_from_dir(args.dir)
    else:
        desc_array = collect_test_desc_from_file(args.test)
        if not desc_array:
            test_abort(args.test, "Can't obtain a test descriptors")

    for desc in desc_array:
        if test_update(desc) != False:
            continue
        if desc.get('error') == 'over':
            test_over(desc['_test-name'])
        else:
            test_abort(desc['_test-name'], "Error detected")
558