1import py 2import pytest 3 4from xdist.workermanage import NodeManager 5from xdist.scheduler import ( 6 EachScheduling, 7 LoadScheduling, 8 LoadScopeScheduling, 9 LoadFileScheduling, 10) 11 12 13from six.moves.queue import Empty, Queue 14 15 16class Interrupted(KeyboardInterrupt): 17 """ signals an immediate interruption. """ 18 19 20class DSession(object): 21 """A pytest plugin which runs a distributed test session 22 23 At the beginning of the test session this creates a NodeManager 24 instance which creates and starts all nodes. Nodes then emit 25 events processed in the pytest_runtestloop hook using the worker_* 26 methods. 27 28 Once a node is started it will automatically start running the 29 pytest mainloop with some custom hooks. This means a node 30 automatically starts collecting tests. Once tests are collected 31 it will wait for instructions. 32 """ 33 34 def __init__(self, config): 35 self.config = config 36 self.log = py.log.Producer("dsession") 37 if not config.option.debug: 38 py.log.setconsumer(self.log._keywords, None) 39 self.nodemanager = None 40 self.sched = None 41 self.shuttingdown = False 42 self.countfailures = 0 43 self.maxfail = config.getvalue("maxfail") 44 self.queue = Queue() 45 self._session = None 46 self._failed_collection_errors = {} 47 self._active_nodes = set() 48 self._failed_nodes_count = 0 49 self._max_worker_restart = get_default_max_worker_restart(self.config) 50 # summary message to print at the end of the session 51 self._summary_report = None 52 self.terminal = config.pluginmanager.getplugin("terminalreporter") 53 if self.terminal: 54 self.trdist = TerminalDistReporter(config) 55 config.pluginmanager.register(self.trdist, "terminaldistreporter") 56 57 @property 58 def session_finished(self): 59 """Return True if the distributed session has finished 60 61 This means all nodes have executed all test items. This is 62 used by pytest_runtestloop to break out of its loop. 63 """ 64 return bool(self.shuttingdown and not self._active_nodes) 65 66 def report_line(self, line): 67 if self.terminal and self.config.option.verbose >= 0: 68 self.terminal.write_line(line) 69 70 @pytest.mark.trylast 71 def pytest_sessionstart(self, session): 72 """Creates and starts the nodes. 73 74 The nodes are setup to put their events onto self.queue. As 75 soon as nodes start they will emit the worker_workerready event. 76 """ 77 self.nodemanager = NodeManager(self.config) 78 nodes = self.nodemanager.setup_nodes(putevent=self.queue.put) 79 self._active_nodes.update(nodes) 80 self._session = session 81 82 def pytest_sessionfinish(self, session): 83 """Shutdown all nodes.""" 84 nm = getattr(self, "nodemanager", None) # if not fully initialized 85 if nm is not None: 86 nm.teardown_nodes() 87 self._session = None 88 89 def pytest_collection(self): 90 # prohibit collection of test items in master process 91 return True 92 93 @pytest.mark.trylast 94 def pytest_xdist_make_scheduler(self, config, log): 95 dist = config.getvalue("dist") 96 schedulers = { 97 "each": EachScheduling, 98 "load": LoadScheduling, 99 "loadscope": LoadScopeScheduling, 100 "loadfile": LoadFileScheduling, 101 } 102 return schedulers[dist](config, log) 103 104 def pytest_runtestloop(self): 105 self.sched = self.config.hook.pytest_xdist_make_scheduler( 106 config=self.config, log=self.log 107 ) 108 assert self.sched is not None 109 110 self.shouldstop = False 111 while not self.session_finished: 112 self.loop_once() 113 if self.shouldstop: 114 self.triggershutdown() 115 raise Interrupted(str(self.shouldstop)) 116 return True 117 118 def loop_once(self): 119 """Process one callback from one of the workers.""" 120 while 1: 121 if not self._active_nodes: 122 # If everything has died stop looping 123 self.triggershutdown() 124 raise RuntimeError("Unexpectedly no active workers available") 125 try: 126 eventcall = self.queue.get(timeout=2.0) 127 break 128 except Empty: 129 continue 130 callname, kwargs = eventcall 131 assert callname, kwargs 132 method = "worker_" + callname 133 call = getattr(self, method) 134 self.log("calling method", method, kwargs) 135 call(**kwargs) 136 if self.sched.tests_finished: 137 self.triggershutdown() 138 139 # 140 # callbacks for processing events from workers 141 # 142 143 def worker_workerready(self, node, workerinfo): 144 """Emitted when a node first starts up. 145 146 This adds the node to the scheduler, nodes continue with 147 collection without any further input. 148 """ 149 node.workerinfo = workerinfo 150 node.workerinfo["id"] = node.gateway.id 151 node.workerinfo["spec"] = node.gateway.spec 152 153 # TODO: (#234 task) needs this for pytest. Remove when refactor in pytest repo 154 node.slaveinfo = node.workerinfo 155 156 self.config.hook.pytest_testnodeready(node=node) 157 if self.shuttingdown: 158 node.shutdown() 159 else: 160 self.sched.add_node(node) 161 162 def worker_workerfinished(self, node): 163 """Emitted when node executes its pytest_sessionfinish hook. 164 165 Removes the node from the scheduler. 166 167 The node might not be in the scheduler if it had not emitted 168 workerready before shutdown was triggered. 169 """ 170 self.config.hook.pytest_testnodedown(node=node, error=None) 171 if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt 172 self.shouldstop = "{} received keyboard-interrupt".format(node) 173 self.worker_errordown(node, "keyboard-interrupt") 174 return 175 if node in self.sched.nodes: 176 crashitem = self.sched.remove_node(node) 177 assert not crashitem, (crashitem, node) 178 self._active_nodes.remove(node) 179 180 def worker_errordown(self, node, error): 181 """Emitted by the WorkerController when a node dies.""" 182 self.config.hook.pytest_testnodedown(node=node, error=error) 183 try: 184 crashitem = self.sched.remove_node(node) 185 except KeyError: 186 pass 187 else: 188 if crashitem: 189 self.handle_crashitem(crashitem, node) 190 191 self._failed_nodes_count += 1 192 maximum_reached = ( 193 self._max_worker_restart is not None 194 and self._failed_nodes_count > self._max_worker_restart 195 ) 196 if maximum_reached: 197 if self._max_worker_restart == 0: 198 msg = "worker {} crashed and worker restarting disabled".format( 199 node.gateway.id 200 ) 201 else: 202 msg = "maximum crashed workers reached: %d" % self._max_worker_restart 203 self._summary_report = msg 204 self.report_line("\n" + msg) 205 self.triggershutdown() 206 else: 207 self.report_line("\nreplacing crashed worker %s" % node.gateway.id) 208 self._clone_node(node) 209 self._active_nodes.remove(node) 210 211 def pytest_terminal_summary(self, terminalreporter): 212 if self.config.option.verbose >= 0 and self._summary_report: 213 terminalreporter.write_sep("=", "xdist: {}".format(self._summary_report)) 214 215 def worker_collectionfinish(self, node, ids): 216 """worker has finished test collection. 217 218 This adds the collection for this node to the scheduler. If 219 the scheduler indicates collection is finished (i.e. all 220 initial nodes have submitted their collections), then tells the 221 scheduler to schedule the collected items. When initiating 222 scheduling the first time it logs which scheduler is in use. 223 """ 224 if self.shuttingdown: 225 return 226 self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids) 227 # tell session which items were effectively collected otherwise 228 # the master node will finish the session with EXIT_NOTESTSCOLLECTED 229 self._session.testscollected = len(ids) 230 self.sched.add_node_collection(node, ids) 231 if self.terminal: 232 self.trdist.setstatus(node.gateway.spec, "[%d]" % (len(ids))) 233 if self.sched.collection_is_completed: 234 if self.terminal and not self.sched.has_pending: 235 self.trdist.ensure_show_status() 236 self.terminal.write_line("") 237 if self.config.option.verbose > 0: 238 self.terminal.write_line( 239 "scheduling tests via %s" % (self.sched.__class__.__name__) 240 ) 241 self.sched.schedule() 242 243 def worker_logstart(self, node, nodeid, location): 244 """Emitted when a node calls the pytest_runtest_logstart hook.""" 245 self.config.hook.pytest_runtest_logstart(nodeid=nodeid, location=location) 246 247 def worker_logfinish(self, node, nodeid, location): 248 """Emitted when a node calls the pytest_runtest_logfinish hook.""" 249 self.config.hook.pytest_runtest_logfinish(nodeid=nodeid, location=location) 250 251 def worker_testreport(self, node, rep): 252 """Emitted when a node calls the pytest_runtest_logreport hook.""" 253 rep.node = node 254 self.config.hook.pytest_runtest_logreport(report=rep) 255 self._handlefailures(rep) 256 257 def worker_runtest_protocol_complete(self, node, item_index, duration): 258 """ 259 Emitted when a node fires the 'runtest_protocol_complete' event, 260 signalling that a test has completed the runtestprotocol and should be 261 removed from the pending list in the scheduler. 262 """ 263 self.sched.mark_test_complete(node, item_index, duration) 264 265 def worker_collectreport(self, node, rep): 266 """Emitted when a node calls the pytest_collectreport hook. 267 268 Because we only need the report when there's a failure/skip, as optimization 269 we only expect to receive failed/skipped reports from workers (#330). 270 """ 271 assert not rep.passed 272 self._failed_worker_collectreport(node, rep) 273 274 def worker_logwarning(self, message, code, nodeid, fslocation): 275 """Emitted when a node calls the pytest_logwarning hook.""" 276 kwargs = dict(message=message, code=code, nodeid=nodeid, fslocation=fslocation) 277 self.config.hook.pytest_logwarning.call_historic(kwargs=kwargs) 278 279 def worker_warning_captured(self, warning_message, when, item): 280 """Emitted when a node calls the pytest_logwarning hook.""" 281 kwargs = dict(warning_message=warning_message, when=when, item=item) 282 self.config.hook.pytest_warning_captured.call_historic(kwargs=kwargs) 283 284 def _clone_node(self, node): 285 """Return new node based on an existing one. 286 287 This is normally for when a node dies, this will copy the spec 288 of the existing node and create a new one with a new id. The 289 new node will have been setup so it will start calling the 290 "worker_*" hooks and do work soon. 291 """ 292 spec = node.gateway.spec 293 spec.id = None 294 self.nodemanager.group.allocate_id(spec) 295 node = self.nodemanager.setup_node(spec, self.queue.put) 296 self._active_nodes.add(node) 297 return node 298 299 def _failed_worker_collectreport(self, node, rep): 300 # Check we haven't already seen this report (from 301 # another worker). 302 if rep.longrepr not in self._failed_collection_errors: 303 self._failed_collection_errors[rep.longrepr] = True 304 self.config.hook.pytest_collectreport(report=rep) 305 self._handlefailures(rep) 306 307 def _handlefailures(self, rep): 308 if rep.failed: 309 self.countfailures += 1 310 if self.maxfail and self.countfailures >= self.maxfail: 311 self.shouldstop = "stopping after %d failures" % (self.countfailures) 312 313 def triggershutdown(self): 314 self.log("triggering shutdown") 315 self.shuttingdown = True 316 for node in self.sched.nodes: 317 node.shutdown() 318 319 def handle_crashitem(self, nodeid, worker): 320 # XXX get more reporting info by recording pytest_runtest_logstart? 321 # XXX count no of failures and retry N times 322 runner = self.config.pluginmanager.getplugin("runner") 323 fspath = nodeid.split("::")[0] 324 msg = "worker {!r} crashed while running {!r}".format(worker.gateway.id, nodeid) 325 rep = runner.TestReport( 326 nodeid, (fspath, None, fspath), (), "failed", msg, "???" 327 ) 328 rep.node = worker 329 self.config.hook.pytest_runtest_logreport(report=rep) 330 331 332class TerminalDistReporter(object): 333 def __init__(self, config): 334 self.config = config 335 self.tr = config.pluginmanager.getplugin("terminalreporter") 336 self._status = {} 337 self._lastlen = 0 338 self._isatty = getattr(self.tr, "isatty", self.tr.hasmarkup) 339 340 def write_line(self, msg): 341 self.tr.write_line(msg) 342 343 def ensure_show_status(self): 344 if not self._isatty: 345 self.write_line(self.getstatus()) 346 347 def setstatus(self, spec, status, show=True): 348 self._status[spec.id] = status 349 if show and self._isatty: 350 self.rewrite(self.getstatus()) 351 352 def getstatus(self): 353 if self.config.option.verbose >= 0: 354 parts = [ 355 "{} {}".format(spec.id, self._status[spec.id]) for spec in self._specs 356 ] 357 return " / ".join(parts) 358 else: 359 return "bringing up nodes..." 360 361 def rewrite(self, line, newline=False): 362 pline = line + " " * max(self._lastlen - len(line), 0) 363 if newline: 364 self._lastlen = 0 365 pline += "\n" 366 else: 367 self._lastlen = len(line) 368 self.tr.rewrite(pline, bold=True) 369 370 def pytest_xdist_setupnodes(self, specs): 371 self._specs = specs 372 for spec in specs: 373 self.setstatus(spec, "I", show=False) 374 self.setstatus(spec, "I", show=True) 375 self.ensure_show_status() 376 377 def pytest_xdist_newgateway(self, gateway): 378 if self.config.option.verbose > 0: 379 rinfo = gateway._rinfo() 380 version = "%s.%s.%s" % rinfo.version_info[:3] 381 self.rewrite( 382 "[%s] %s Python %s cwd: %s" 383 % (gateway.id, rinfo.platform, version, rinfo.cwd), 384 newline=True, 385 ) 386 self.setstatus(gateway.spec, "C") 387 388 def pytest_testnodeready(self, node): 389 if self.config.option.verbose > 0: 390 d = node.workerinfo 391 infoline = "[{}] Python {}".format( 392 d["id"], d["version"].replace("\n", " -- ") 393 ) 394 self.rewrite(infoline, newline=True) 395 self.setstatus(node.gateway.spec, "ok") 396 397 def pytest_testnodedown(self, node, error): 398 if not error: 399 return 400 self.write_line("[{}] node down: {}".format(node.gateway.id, error)) 401 402 403def get_default_max_worker_restart(config): 404 """gets the default value of --max-worker-restart option if it is not provided. 405 406 Use a reasonable default to avoid workers from restarting endlessly due to crashing collections (#226). 407 """ 408 result = config.option.maxworkerrestart 409 if result is not None: 410 result = int(result) 411 elif config.option.numprocesses: 412 # if --max-worker-restart was not provided, use a reasonable default (#226) 413 result = config.option.numprocesses * 4 414 return result 415