1import py
2import pytest
3
4from xdist.workermanage import NodeManager
5from xdist.scheduler import (
6    EachScheduling,
7    LoadScheduling,
8    LoadScopeScheduling,
9    LoadFileScheduling,
10)
11
12
13from six.moves.queue import Empty, Queue
14
15
class Interrupted(KeyboardInterrupt):
    """Raised to abort the distributed test loop immediately.

    Subclasses KeyboardInterrupt so it propagates like a user interrupt.
    """

    pass
18
19
class DSession(object):
    """A pytest plugin which runs a distributed test session

    At the beginning of the test session this creates a NodeManager
    instance which creates and starts all nodes.  Nodes then emit
    events processed in the pytest_runtestloop hook using the worker_*
    methods.

    Once a node is started it will automatically start running the
    pytest mainloop with some custom hooks.  This means a node
    automatically starts collecting tests.  Once tests are collected
    it will wait for instructions.
    """

    def __init__(self, config):
        """Set up the bookkeeping for a distributed session.

        :param config: the pytest config object; its ``debug`` and
            ``maxfail`` options and the terminal reporter plugin are read here.
        """
        self.config = config
        self.log = py.log.Producer("dsession")
        if not config.option.debug:
            # silence the "dsession" log producer unless the debug option is set
            py.log.setconsumer(self.log._keywords, None)
        self.nodemanager = None
        self.sched = None
        self.shuttingdown = False
        # Falsy, or a reason string once the run must be aborted.  Initialised
        # here so the attribute always exists, even before pytest_runtestloop.
        self.shouldstop = False
        self.countfailures = 0
        self.maxfail = config.getvalue("maxfail")
        self.queue = Queue()
        self._session = None
        # longrepr values of collection failures already reported, used to
        # avoid reporting the same error once per worker
        self._failed_collection_errors = {}
        self._active_nodes = set()
        self._failed_nodes_count = 0
        self._max_worker_restart = get_default_max_worker_restart(self.config)
        # summary message to print at the end of the session
        self._summary_report = None
        self.terminal = config.pluginmanager.getplugin("terminalreporter")
        if self.terminal:
            self.trdist = TerminalDistReporter(config)
            config.pluginmanager.register(self.trdist, "terminaldistreporter")

    @property
    def session_finished(self):
        """Return True if the distributed session has finished

        This means all nodes have executed all test items.  This is
        used by pytest_runtestloop to break out of its loop.
        """
        return bool(self.shuttingdown and not self._active_nodes)

    def report_line(self, line):
        """Write *line* to the terminal if a reporter exists and verbosity >= 0."""
        if self.terminal and self.config.option.verbose >= 0:
            self.terminal.write_line(line)

    @pytest.mark.trylast
    def pytest_sessionstart(self, session):
        """Creates and starts the nodes.

        The nodes are setup to put their events onto self.queue.  As
        soon as nodes start they will emit the worker_workerready event.
        """
        self.nodemanager = NodeManager(self.config)
        nodes = self.nodemanager.setup_nodes(putevent=self.queue.put)
        self._active_nodes.update(nodes)
        self._session = session

    def pytest_sessionfinish(self, session):
        """Shutdown all nodes."""
        nm = getattr(self, "nodemanager", None)  # if not fully initialized
        if nm is not None:
            nm.teardown_nodes()
        self._session = None

    def pytest_collection(self):
        """Skip collection in the master process; workers collect instead."""
        # prohibit collection of test items in master process
        return True

    @pytest.mark.trylast
    def pytest_xdist_make_scheduler(self, config, log):
        """Return the scheduler instance matching the ``dist`` option.

        Raises KeyError for a ``dist`` value not in the mapping below.
        """
        dist = config.getvalue("dist")
        schedulers = {
            "each": EachScheduling,
            "load": LoadScheduling,
            "loadscope": LoadScopeScheduling,
            "loadfile": LoadFileScheduling,
        }
        return schedulers[dist](config, log)

    def pytest_runtestloop(self):
        """Drive the session by processing worker events until all are done.

        Raises Interrupted, after triggering shutdown, when ``shouldstop``
        has been set, e.g. by maxfail or a worker keyboard interrupt.
        """
        self.sched = self.config.hook.pytest_xdist_make_scheduler(
            config=self.config, log=self.log
        )
        assert self.sched is not None

        self.shouldstop = False
        while not self.session_finished:
            self.loop_once()
            if self.shouldstop:
                self.triggershutdown()
                raise Interrupted(str(self.shouldstop))
        return True

    def loop_once(self):
        """Process one callback from one of the workers."""
        while True:
            if not self._active_nodes:
                # If everything has died stop looping
                self.triggershutdown()
                raise RuntimeError("Unexpectedly no active workers available")
            try:
                # short timeout so the "no active workers" check above is
                # re-evaluated periodically instead of blocking forever
                eventcall = self.queue.get(timeout=2.0)
                break
            except Empty:
                continue
        callname, kwargs = eventcall
        assert callname, kwargs
        # dispatch "<event>" to the matching worker_<event> method below
        method = "worker_" + callname
        call = getattr(self, method)
        self.log("calling method", method, kwargs)
        call(**kwargs)
        if self.sched.tests_finished:
            self.triggershutdown()

    #
    # callbacks for processing events from workers
    #

    def worker_workerready(self, node, workerinfo):
        """Emitted when a node first starts up.

        This adds the node to the scheduler, nodes continue with
        collection without any further input.
        """
        node.workerinfo = workerinfo
        node.workerinfo["id"] = node.gateway.id
        node.workerinfo["spec"] = node.gateway.spec

        # TODO: (#234 task) needs this for pytest. Remove when refactor in pytest repo
        node.slaveinfo = node.workerinfo

        self.config.hook.pytest_testnodeready(node=node)
        if self.shuttingdown:
            node.shutdown()
        else:
            self.sched.add_node(node)

    def worker_workerfinished(self, node):
        """Emitted when node executes its pytest_sessionfinish hook.

        Removes the node from the scheduler.

        The node might not be in the scheduler if it had not emitted
        workerready before shutdown was triggered.
        """
        self.config.hook.pytest_testnodedown(node=node, error=None)
        if node.workeroutput["exitstatus"] == 2:  # keyboard-interrupt
            self.shouldstop = "{} received keyboard-interrupt".format(node)
            self.worker_errordown(node, "keyboard-interrupt")
            return
        if node in self.sched.nodes:
            crashitem = self.sched.remove_node(node)
            assert not crashitem, (crashitem, node)
        self._active_nodes.remove(node)

    def worker_errordown(self, node, error):
        """Emitted by the WorkerController when a node dies.

        Reports any test that was running on the node as failed.  Then it
        either replaces the node or, once the restart limit is exceeded,
        records a summary message and triggers shutdown.
        """
        self.config.hook.pytest_testnodedown(node=node, error=error)
        try:
            crashitem = self.sched.remove_node(node)
        except KeyError:
            # node was never added to the scheduler
            pass
        else:
            if crashitem:
                self.handle_crashitem(crashitem, node)

        self._failed_nodes_count += 1
        maximum_reached = (
            self._max_worker_restart is not None
            and self._failed_nodes_count > self._max_worker_restart
        )
        if maximum_reached:
            if self._max_worker_restart == 0:
                msg = "worker {} crashed and worker restarting disabled".format(
                    node.gateway.id
                )
            else:
                msg = "maximum crashed workers reached: %d" % self._max_worker_restart
            self._summary_report = msg
            self.report_line("\n" + msg)
            self.triggershutdown()
        else:
            self.report_line("\nreplacing crashed worker %s" % node.gateway.id)
            self._clone_node(node)
        self._active_nodes.remove(node)

    def pytest_terminal_summary(self, terminalreporter):
        """Print the stored xdist summary message, if any, at session end."""
        if self.config.option.verbose >= 0 and self._summary_report:
            terminalreporter.write_sep("=", "xdist: {}".format(self._summary_report))

    def worker_collectionfinish(self, node, ids):
        """worker has finished test collection.

        This adds the collection for this node to the scheduler.  If
        the scheduler indicates collection is finished (i.e. all
        initial nodes have submitted their collections), then tells the
        scheduler to schedule the collected items.  When initiating
        scheduling the first time it logs which scheduler is in use.
        """
        if self.shuttingdown:
            return
        self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids)
        # tell session which items were effectively collected otherwise
        # the master node will finish the session with EXIT_NOTESTSCOLLECTED
        self._session.testscollected = len(ids)
        self.sched.add_node_collection(node, ids)
        if self.terminal:
            self.trdist.setstatus(node.gateway.spec, "[%d]" % (len(ids)))
        if self.sched.collection_is_completed:
            if self.terminal and not self.sched.has_pending:
                self.trdist.ensure_show_status()
                self.terminal.write_line("")
                if self.config.option.verbose > 0:
                    self.terminal.write_line(
                        "scheduling tests via %s" % (self.sched.__class__.__name__)
                    )
            self.sched.schedule()

    def worker_logstart(self, node, nodeid, location):
        """Emitted when a node calls the pytest_runtest_logstart hook."""
        self.config.hook.pytest_runtest_logstart(nodeid=nodeid, location=location)

    def worker_logfinish(self, node, nodeid, location):
        """Emitted when a node calls the pytest_runtest_logfinish hook."""
        self.config.hook.pytest_runtest_logfinish(nodeid=nodeid, location=location)

    def worker_testreport(self, node, rep):
        """Emitted when a node calls the pytest_runtest_logreport hook."""
        rep.node = node
        self.config.hook.pytest_runtest_logreport(report=rep)
        self._handlefailures(rep)

    def worker_runtest_protocol_complete(self, node, item_index, duration):
        """
        Emitted when a node fires the 'runtest_protocol_complete' event,
        signalling that a test has completed the runtestprotocol and should be
        removed from the pending list in the scheduler.
        """
        self.sched.mark_test_complete(node, item_index, duration)

    def worker_collectreport(self, node, rep):
        """Emitted when a node calls the pytest_collectreport hook.

        Because we only need the report when there's a failure/skip, as optimization
        we only expect to receive failed/skipped reports from workers (#330).
        """
        assert not rep.passed
        self._failed_worker_collectreport(node, rep)

    def worker_logwarning(self, message, code, nodeid, fslocation):
        """Emitted when a node calls the pytest_logwarning hook."""
        kwargs = dict(message=message, code=code, nodeid=nodeid, fslocation=fslocation)
        self.config.hook.pytest_logwarning.call_historic(kwargs=kwargs)

    def worker_warning_captured(self, warning_message, when, item):
        """Emitted when a node calls the pytest_warning_captured hook."""
        kwargs = dict(warning_message=warning_message, when=when, item=item)
        self.config.hook.pytest_warning_captured.call_historic(kwargs=kwargs)

    def _clone_node(self, node):
        """Return new node based on an existing one.

        This is normally for when a node dies, this will copy the spec
        of the existing node and create a new one with a new id.  The
        new node will have been setup so it will start calling the
        "worker_*" hooks and do work soon.
        """
        spec = node.gateway.spec
        # reset the id so allocate_id assigns a fresh one to the replacement
        spec.id = None
        self.nodemanager.group.allocate_id(spec)
        node = self.nodemanager.setup_node(spec, self.queue.put)
        self._active_nodes.add(node)
        return node

    def _failed_worker_collectreport(self, node, rep):
        """Forward a failed/skipped collect report once per distinct longrepr."""
        # Check we haven't already seen this report (from
        # another worker).
        if rep.longrepr not in self._failed_collection_errors:
            self._failed_collection_errors[rep.longrepr] = True
            self.config.hook.pytest_collectreport(report=rep)
            self._handlefailures(rep)

    def _handlefailures(self, rep):
        """Count failed reports and request a stop once maxfail is reached."""
        if rep.failed:
            self.countfailures += 1
            if self.maxfail and self.countfailures >= self.maxfail:
                self.shouldstop = "stopping after %d failures" % (self.countfailures)

    def triggershutdown(self):
        """Mark the session as shutting down and shut down all scheduled nodes."""
        self.log("triggering shutdown")
        self.shuttingdown = True
        for node in self.sched.nodes:
            node.shutdown()

    def handle_crashitem(self, nodeid, worker):
        """Report *nodeid* as failed because *worker* crashed while running it."""
        # XXX get more reporting info by recording pytest_runtest_logstart?
        # XXX count no of failures and retry N times
        runner = self.config.pluginmanager.getplugin("runner")
        fspath = nodeid.split("::")[0]
        msg = "worker {!r} crashed while running {!r}".format(worker.gateway.id, nodeid)
        rep = runner.TestReport(
            nodeid, (fspath, None, fspath), (), "failed", msg, "???"
        )
        rep.node = worker
        self.config.hook.pytest_runtest_logreport(report=rep)
330
331
class TerminalDistReporter(object):
    """Show per-worker status for a distributed run on the terminal.

    One short status string is kept per node spec id.  They are rendered
    as a single ``"<id> <status> / <id> <status> ..."`` line.  On a tty the
    line is redrawn in place via ``rewrite``.  Otherwise it is written out
    when ``ensure_show_status`` is called.
    """

    def __init__(self, config):
        self.config = config
        self.tr = config.pluginmanager.getplugin("terminalreporter")
        # spec id -> status text (e.g. "I", "C", "ok", "[N]" as set in this file)
        self._status = {}
        # Specs whose status is displayed; replaced by pytest_xdist_setupnodes.
        # Defaulting to an empty list keeps getstatus() usable before that hook.
        self._specs = []
        # length of the last in-place line, used to blank out leftovers
        self._lastlen = 0
        # fall back to ``hasmarkup`` when the reporter has no ``isatty`` attribute
        self._isatty = getattr(self.tr, "isatty", self.tr.hasmarkup)

    def write_line(self, msg):
        """Write *msg* as a full line through the terminal reporter."""
        self.tr.write_line(msg)

    def ensure_show_status(self):
        """Emit the status line when not on a tty (a tty is updated in place)."""
        if not self._isatty:
            self.write_line(self.getstatus())

    def setstatus(self, spec, status, show=True):
        """Record *status* for ``spec.id``; redraw the line if *show* and on a tty."""
        self._status[spec.id] = status
        if show and self._isatty:
            self.rewrite(self.getstatus())

    def getstatus(self):
        """Return the combined status line for all known specs.

        With negative verbosity a fixed placeholder message is returned.
        """
        if self.config.option.verbose >= 0:
            parts = [
                "{} {}".format(spec.id, self._status[spec.id]) for spec in self._specs
            ]
            return " / ".join(parts)
        else:
            return "bringing up nodes..."

    def rewrite(self, line, newline=False):
        """Redraw the current terminal line with *line*.

        The line is padded with spaces so that any leftover characters from
        a longer previous line are overwritten.  With *newline* a trailing
        newline is added and the tracked length resets to 0.
        """
        pline = line + " " * max(self._lastlen - len(line), 0)
        if newline:
            self._lastlen = 0
            pline += "\n"
        else:
            self._lastlen = len(line)
        self.tr.rewrite(pline, bold=True)

    def pytest_xdist_setupnodes(self, specs):
        """Register *specs* and show them all with the initial "I" status."""
        self._specs = specs
        for spec in specs:
            self.setstatus(spec, "I", show=False)
        # Draw the combined line once.  The redraw must not rely on the
        # leaked loop variable, which is unbound when *specs* is empty.
        if self._isatty:
            self.rewrite(self.getstatus())
        self.ensure_show_status()

    def pytest_xdist_newgateway(self, gateway):
        """Mark the gateway's spec "C"; with verbose > 0 also print its platform info."""
        if self.config.option.verbose > 0:
            rinfo = gateway._rinfo()
            version = "%s.%s.%s" % rinfo.version_info[:3]
            self.rewrite(
                "[%s] %s Python %s cwd: %s"
                % (gateway.id, rinfo.platform, version, rinfo.cwd),
                newline=True,
            )
        self.setstatus(gateway.spec, "C")

    def pytest_testnodeready(self, node):
        """Mark the node "ok"; with verbose > 0 also print its Python version."""
        if self.config.option.verbose > 0:
            d = node.workerinfo
            infoline = "[{}] Python {}".format(
                d["id"], d["version"].replace("\n", " -- ")
            )
            self.rewrite(infoline, newline=True)
        self.setstatus(node.gateway.spec, "ok")

    def pytest_testnodedown(self, node, error):
        """Write a "node down" line only when the node went down with an error."""
        if not error:
            return
        self.write_line("[{}] node down: {}".format(node.gateway.id, error))
401
402
def get_default_max_worker_restart(config):
    """Return the effective --max-worker-restart limit.

    An explicitly given option value wins and is coerced to ``int``.
    Otherwise, when ``numprocesses`` is truthy, the limit defaults to four
    restarts per process.  That keeps crashing collections from restarting
    workers endlessly (#226).  Returns ``None`` when neither applies.
    """
    explicit = config.option.maxworkerrestart
    if explicit is not None:
        return int(explicit)
    procs = config.option.numprocesses
    if not procs:
        return None
    # no explicit limit: scale a reasonable default with the worker count (#226)
    return procs * 4
415