# coding=utf-8
"""
Runs tox from the current directory.
Any runner is supported, but well-known runners (pytest, unittest and nose) are switched to our internal
runners to provide better support.
"""
import os
import sys

import pluggy
from tox import config as tox_config
from tox import exception
from tox.session import Session

from tcmessages import TeamcityServiceMessages

teamcity = TeamcityServiceMessages()

hookimpl = pluggy.HookimplMarker("tox")
helpers_dir = str(os.path.split(__file__)[0])
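# Command-line arguments are regular tox arguments; they are forwarded verbatim
# to tox.config.parseconfig() below (e.g. "-e py27,py37").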


class JbToxHook(object):
    """
    Hook to report test start and test end for each tox environment.
    """

    def __init__(self, config):
        self.offsets = dict()
        self.current_env = None
        self.config = config

    @hookimpl
    def tox_runtest_pre(self, venv):
        """
        Launched before each env runs its test commands.
        It means the previous env (if any) has just finished and a new one is about to start.
        :param venv: current virtual env
        """
        self.current_env = venv
        name = venv.name
        node_id = self.offsets[name]
        teamcity.testStarted(name, location="tox_env://" + str(name), parentNodeId="0", nodeId=node_id)
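        # Each env is reported as a single test node: its nodeId is the per-env offset
        # assigned below, and parentNodeId "0" is the root of the test matrix.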

    @hookimpl
    def tox_runtest_post(self, venv):
        """
        Finishes the currently running env, reporting its state.
        """
        if not self.current_env:
            return
        name = venv.name
        node_id = self.offsets[name]
        status = self.current_env.status
        if isinstance(status, exception.InterpreterNotFound):
            if self.config.option.skip_missing_interpreters:
                self._reportFailure("SKIP", status, node_id)
            else:
                self._reportFailure("ERROR", status, node_id)
        elif status == "platform mismatch":
            self._reportFailure("SKIP", status, node_id)
        elif status and status == "ignored failed command":
            print("  %s: %s" % (self.current_env.name, str(status)))
        elif status and status != "skipped tests":
            self._reportFailure("ERROR", status, node_id)
        else:
            teamcity.testStdOut(self.current_env.name, "\n", nodeId=node_id)
            teamcity.testFinished(self.current_env.name, nodeId=node_id)
        self.current_env = None

    def _reportFailure(self, state, message, node_id):
        """
        In idBased mode each test is a leaf and there are no suites, so we can report directly to the test.
        :param state: SKIP or ERROR (suite result)
        """
        if state == "SKIP":
            teamcity.testIgnored(state, str(message), nodeId=node_id)
        else:
            teamcity.testFailed(state, str(message), nodeId=node_id)


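# Fixers rewrite well-known test commands so that they run through one of the
# JetBrains runner scripts in helpers_dir. Illustrative example (paths are
# placeholders): for an env whose offset is 10001, the command
#   py.test tests/
# becomes roughly
#   <envpython> <helpers_dir>/_jb_pytest_runner.py --offset 10001 -- tests/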
class Fixer(object):
    def __init__(self, runner_name):
        self.runner_name = runner_name

    def fix(self, command, bin, offset):
        return [bin, os.path.join(helpers_dir, self.runner_name), "--offset", str(offset), "--"]

    def is_parallel(self, *args, **kwargs):
        return False


class _Unit2(Fixer):
    def __init__(self):
        super(_Unit2, self).__init__("_jb_unittest_runner.py")

    def fix(self, command, bin, offset):
        if command[0] == "unit2":
            return [bin, os.path.join(helpers_dir, "utrunner.py")] + command[1:] + ["true"]
        elif command == ["python", "-m", "unittest", "discover"]:
            return super(_Unit2, self).fix(command, bin, offset) + ["discover"]
        return None


class _PyTest(Fixer):
    def __init__(self):
        super(_PyTest, self).__init__("_jb_pytest_runner.py")

    def is_parallel(self, config):  # If pytest-xdist is a dependency, pytest runs tests in parallel
        deps = getattr(config, "deps", [])
        return bool([d for d in deps if d.name == "pytest-xdist"])

    def fix(self, command, bin, offset):
        if command[0] not in ["pytest", "py.test"]:
            return None
        return super(_PyTest, self).fix(command, bin, offset) + command[1:]


class _Nose(Fixer):
    def __init__(self):
        super(_Nose, self).__init__("_jb_nosetest_runner.py")

    def fix(self, command, bin, offset):
        if command[0] != "nosetests":
            return None
        return super(_Nose, self).fix(command, bin, offset) + command[1:]


_RUNNERS = [_Unit2(), _PyTest(), _Nose()]
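# The fixers above are tried in order for every configured command; the command's
# first element (or, for "python -m unittest discover", the whole command) decides
# which fixer, if any, rewrites it.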

durationStrategy = "automatic"
config = tox_config.parseconfig(args=sys.argv[1:])
hook = JbToxHook(config)
config.pluginmanager.register(hook, "jbtoxplugin")
offset = 1
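# Each env gets its own block of node ids, 10000 apart: the env node itself is
# reported with the offset as its nodeId, and the same offset is passed to the
# patched runner command ("--offset") so tests reported from inside the env can
# use ids from the same block.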
for env, tmp_config in config.envconfigs.items():
    hook.offsets[env] = offset
    if not tmp_config.setenv:
        tmp_config.setenv = dict()
    # Tell the child runner scripts not to report the test matrix themselves:
    # it is reported once, below, by this script.
    tmp_config.setenv["_jb_do_not_call_enter_matrix"] = "1"
    commands = tmp_config.commands

    # Patch well-known test commands unless the user opted out explicitly.
    if "_jb_do_not_patch_test_runners" not in os.environ and isinstance(commands, list):
        for fixer in _RUNNERS:
            _env = config.envconfigs[env]
            for i, command in enumerate(commands):
                if command:
                    fixed_command = fixer.fix(command, str(_env.envpython), offset)
                    if fixer.is_parallel(tmp_config):
                        durationStrategy = "manual"
                    if fixed_command:
                        commands[i] = fixed_command
    tmp_config.commands = commands
    offset += 10000

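# Report the test matrix (one node per env), then hand control over to tox itself.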
session = Session(config)
teamcity.testMatrixEntered(durationStrategy=durationStrategy)
sys.exit(session.runcommand())