1#!/usr/local/bin/python3.8 2# 3# Copyright 2012 Ettus Research LLC 4# Copyright 2018 Ettus Research, a National Instruments Company 5# 6# SPDX-License-Identifier: GPL-3.0-or-later 7# 8 9import subprocess, time 10from optparse import OptionParser 11from string import split 12import sys 13import os 14 15try: 16 from gnuradio.eng_option import eng_option 17except: 18 eng_option = None 19 20def launch_test(args="", rate=None, spb=None, spp=0, prefix="", suffix="", extra=[], verbose=False, title=None): 21 real = os.path.realpath(__file__) 22 basedir = os.path.dirname(real) 23 responder = [ 24 os.path.join(basedir, "responder") 25 ] 26 27 if args is not None and len(args) > 0: 28 responder += ["--args=" + args] 29 if rate is not None and rate > 0: 30 responder += ["--rate=%f" % (rate)] 31 if spb is not None and spb > 0: 32 responder += ["--spb=%d" % (spb)] 33 if spp is not None and spp > 0: 34 responder += ["--spp=%d" % (spp)] 35 if prefix is not None and len(prefix) > 0: 36 responder += ["--stats-file-prefix=" + prefix] 37 if suffix is not None and len(suffix) > 0: 38 responder += ["--stats-file-suffix=" + suffix] 39 if extra is not None: 40 responder += extra 41 if title is not None and len(title) > 0: 42 responder += ["--title=\"" + title + "\""] 43 if verbose: 44 print "==> Executing:", " ".join(responder) 45 try: 46 responder += ["--log-file"] # This will produce another output file with logs 47 responder += ["--combine-eob"] 48 p = subprocess.Popen(responder) 49 res = p.wait() # make sure subprocess finishes 50 except KeyboardInterrupt: 51 res = p.wait() # even in CTRL+C case wait till subprocess finishes 52 print "==> Caught CTRL+C" 53 return None 54 55 return res 56 57# These return codes should match the C++ return codes 58class ReturnCode: 59 RETCODE_OK = 0 60 RETCODE_BAD_ARGS = -1 61 RETCODE_RUNTIME_ERROR = -2 62 RETCODE_UNKNOWN_EXCEPTION = -3 63 RETCODE_RECEIVE_TIMEOUT = -4 64 RETCODE_RECEIVE_FAILED = -5 65 RETCODE_MANUAL_ABORT = -6 66 RETCODE_BAD_PACKET = -7 67 RETCODE_OVERFLOW = -8 68 69 70def get_initialized_OptionParser(): 71 def_rates = ".25 1 4 8 25" 72 usage = "%prog: [options] -- [extra arguments]" 73 opt_kwds = {} 74 if eng_option: opt_kwds['option_class'] = eng_option 75 parser = OptionParser(usage=usage, **opt_kwds) 76 77 parser.add_option("", "--rates", type="string", help="sample rates (Msps) [default: %default]", default=def_rates) 78 parser.add_option("", "--spbs", type="string", help="samples per block [default: %default]", 79 default="32 64 256 1024") 80 parser.add_option("", "--spps", type="string", help="samples per packet (0: driver default) [default: %default]", 81 default="0 64 128 256 512") 82 parser.add_option("", "--args", type="string", help="UHD device arguments [default: %default]", default=None) 83 parser.add_option("", "--prefix", type="string", help="Stats filename prefix [default: %default]", default=None) 84 parser.add_option("", "--suffix", type="string", help="Stats filename suffix [default: %default]", default=None) 85 parser.add_option("", "--pause", action="store_true", help="pause between tests [default=%default]", default=False) 86 parser.add_option("", "--interactive", action="store_true", help="enable prompts within test [default=%default]", 87 default=False) 88 parser.add_option("", "--wait", type="float", help="time to wait between tests (seconds) [default=%default]", 89 default=0.0) 90 parser.add_option("", "--abort", action="store_true", help="abort on error [default=%default]", default=False) 91 parser.add_option("", "--verbose", action="store_true", help="be verbose [default=%default]", default=False) 92 parser.add_option("", "--title", type="string", help="test title [default: %default]", default=None) 93 94 return parser 95 96 97def set_gen_prefix(prefix, save_dir): 98 if not save_dir[-1] == "/": 99 save_dir = save_dir + "/" 100 101 if prefix == None: 102 if os.path.exists(save_dir) is not True: 103 os.makedirs(save_dir) 104 prefix = save_dir 105 return prefix 106 107 108def get_extra_args(options, args): 109 extra_args = { 110 "adjust-simulation-rate": None, 111 "time-mul": "1e6", 112 "test-success": 5, 113 "simulate": 1000, 114 "iterations": 1000, 115 "delay-min": "50e-6", 116 "delay-max": "5e-3", 117 "delay-step": "50e-6", 118 } 119 120 if options.interactive is not True: 121 extra_args["batch-mode"] = None 122 if options.pause is True: 123 extra_args["pause"] = None 124 125 for arg in args: 126 if len(arg) > 2 and arg[0:2] == "--": 127 arg = arg[2:] 128 idx = arg.find('=') 129 if idx == -1: 130 extra_args[arg] = None 131 else: 132 extra_args[arg[0:idx]] = arg[idx + 1:] 133 134 def _format_arg(d, k): 135 a = "--" + str(k) 136 if d[k] is not None: 137 a += "=" + str(d[k]) 138 return a 139 140 extra = map(lambda x: _format_arg(extra_args, x), extra_args) 141 142 print "\n".join(map(lambda x: str(x) + " = " + str(extra_args[x]), extra_args.keys())) 143 144 return extra 145 146 147def wait_for_keyboard(): 148 try: 149 print "\nPress ENTER to start..." 150 raw_input() 151 return ReturnCode.RETCODE_OK 152 except KeyboardInterrupt: 153 print "Aborted" 154 return ReturnCode.RETCODE_MANUAL_ABORT 155 156 157def main(): 158 parser = get_initialized_OptionParser() 159 (options, args) = parser.parse_args() 160 161 save_dir = "results" 162 options.prefix = set_gen_prefix(options.prefix, save_dir) 163 extra = get_extra_args(options, args) 164 165 rates = map(lambda x: float(x) * 1e6, split(options.rates)) 166 spbs = map(int, split(options.spbs)) 167 spps = map(int, split(options.spps)) 168 total = len(rates) * len(spbs) * len(spps) 169 170 title = options.title or "" 171 if len(title) >= 2 and title[0] == "\"" and title[-1] == "\"": 172 title = title[1:-1] 173 174 count = 0 175 results = {} 176 177 try: 178 for rate in rates: 179 results_rates = results[rate] = {} 180 for spb in spbs: 181 results_spbs = results_rates[spb] = {} 182 for spp in spps: 183 if count > 0: 184 if options.pause: 185 print "Press ENTER to begin next test..." 186 raw_input() 187 elif options.wait > 0: 188 time.sleep(options.wait) 189 title = "Test #%d of %d (%d%% complete, %d to go)" % ( 190 count + 1, total, int(100 * count / total), total - count - 1) 191 res = launch_test(options.args, rate, spb, spp, options.prefix, options.suffix, extra, 192 options.verbose, title) 193 sys.stdout.flush() 194 count += 1 195 # Break out of loop. Exception thrown if Ctrl + C was pressed. 196 if res is None: 197 raise Exception 198 results_spbs[spp] = res 199 if res < 0 and (res == ReturnCode.RETCODE_MANUAL_ABORT or options.abort): 200 raise Exception 201 except: 202 pass 203 204 for rate in results.keys(): 205 results_rates = results[rate] 206 for spb in results_rates.keys(): 207 results_spbs = results_rates[spb] 208 for spp in results_spbs.keys(): 209 res = results_spbs[spp] 210 print res, ":", rate, spb, spp 211 print "Tests finished" 212 return 0 213 214 215if __name__ == '__main__': 216 main() 217