"""
Stopwatch plugin for 'nose'.

Records how long each test takes, keeps the timings in a pickle file
between runs, and (with --faster-than) deselects tests whose recorded
run time exceeds the given number of seconds.
"""
import sys
err = sys.stderr
try:
    from cPickle import dump, load
except ImportError:
    from pickle import dump, load

import time
import logging
import os
from nose.plugins.base import Plugin

log = logging.getLogger(__name__)


class Stopwatch(Plugin):
    """
    nose plugin that times tests and can skip the slow ones.

    Attributes:
        dontrun: dict whose keys are the names of tests that must not
            be selected (filled in by begin()).
        times: dict mapping a test id to its most recently recorded
            run time in seconds.
    """

    def __init__(self):
        Plugin.__init__(self)
        self.dontrun = {}
        self.times = {}

    def add_options(self, parser, env=os.environ):
        """
        Register the --faster-than and --stopwatch-file command line
        options on `parser`, after the base Plugin's own options.
        """
        Plugin.add_options(self, parser, env)
        parser.add_option("--faster-than",
                          action="store",
                          type="float",
                          dest="faster_than",
                          default=None,
                          help="Run only tests that are faster than FASTER_THAN seconds.")
        parser.add_option("--stopwatch-file",
                          action="store",
                          dest="stopwatch_file",
                          default=".nose-stopwatch-times",
                          help="Store test timing results in this file.")

    def configure(self, options, config):
        """
        Set up logging to stderr and read the stopwatch settings
        (timing file path and time cutoff) from the parsed `options`.
        """
        ### configure logging
        logger = logging.getLogger(__name__)
        logger.propagate = 0

        # configure() may be called more than once in a process; attach
        # the stderr handler only once so that messages are not repeated.
        already_attached = any(
            isinstance(handler, logging.StreamHandler)
            and getattr(handler, 'stream', None) is err
            for handler in logger.handlers)
        if not already_attached:
            logger.addHandler(logging.StreamHandler(err))
        logger.setLevel(logging.WARNING)

        Plugin.configure(self, options, config)

        ### configure stopwatch stuff: file containing times, and
        ### time cutoff.

        self.stopwatch_file = os.path.abspath(options.stopwatch_file)
        self.faster_than = options.faster_than

        log.info('stopwatch module: using "%s" for stopwatch times',
                 self.stopwatch_file)
        log.info('selecting tests that run faster than %s seconds',
                 self.faster_than)

    def begin(self):
        """
        Run before any of the tests run.  Loads the stopwatch file, and
        calculates which tests should NOT be run.

        A missing, unreadable or empty stopwatch file is treated as
        "no recorded times".
        """
        # NOTE: the stopwatch file is a pickle; unpickling executes
        # whatever the file describes, so only point --stopwatch-file at
        # files you trust.
        try:
            with open(self.stopwatch_file, 'rb') as fp:
                self.times = load(fp)
        except (IOError, EOFError):
            self.times = {}

        # figure out which tests should NOT be run.

        faster_than = self.faster_than
        if faster_than is not None:
            for name, runtime in self.times.items():
                if runtime > faster_than:
                    self.dontrun[name] = 1

    def finalize(self, result):
        """
        Save the recorded times, OR dump them into /tmp if the file
        open fails.
        """
        try:
            fp = open(self.stopwatch_file, 'wb')
        except (IOError, OSError):
            t = int(time.time())
            filename = '/tmp/nose-stopwatch-%s.pickle' % (t,)
            # pickle writes bytes, so the fallback file has to be opened
            # in binary mode just like the primary one.
            fp = open(filename, 'wb')
            log.warning('WARNING: stopwatch cannot write to "%s"',
                        self.stopwatch_file)
            log.warning('WARNING: stopwatch is using "%s" to save times',
                        filename)

        # Close the file even if pickling fails part-way through.
        try:
            dump(self.times, fp)
        finally:
            fp.close()

    def wantMethod(self, method):
        """
        Do we want to run this method?  See _should_run.
        """
        # Use the class that asked for the unbound method; when the
        # method has no 'im_class' attribute, fall back to the class of
        # the object it is bound to.
        if hasattr(method, 'im_class'):
            owner = method.im_class
        else:
            owner = method.__self__.__class__

        fullname = '%s.%s.%s' % (owner.__module__,
                                 owner.__name__,
                                 method.__name__)

        return self._should_run(fullname)

    def wantFunction(self, func):
        """
        Do we want to run this function?  See _should_run.
        """
        fullname = '%s.%s' % (func.__module__,
                              func.__name__)

        return self._should_run(fullname)

    ### note, no wantClass, because classes only *contain* tests, but
    ### aren't tests themselves.

    def _should_run(self, name):
        """
        If we have this test listed as "don't run" because of explicit
        time constraints, don't run it.  Otherwise, indicate no preference.

        Returns False for a test listed in self.dontrun, None otherwise.
        """
        if name in self.dontrun:
            return False

        return None

    def startTest(self, test):
        """
        startTest: start timing.
        """
        self._started = time.time()

    def stopTest(self, test):
        """
        stopTest: stop timing, and save the run time (in seconds) under
        the id of the wrapped test.
        """
        runtime = time.time() - self._started
        self.times[test.test.id()] = runtime