1import multiprocessing
2import os
3import time
4
5import lit10.Test
6import lit10.util
7import lit10.worker
8
9
10# No-operation semaphore for supporting `None` for parallelism_groups.
11#   lit_config.parallelism_groups['my_group'] = None
class NopSemaphore(object):
    """Semaphore stand-in whose operations do nothing.

    Used in place of a real semaphore for parallelism groups whose limit is
    configured as ``None``, so callers can acquire/release unconditionally.
    """

    def acquire(self):
        """Return immediately without blocking."""
        return None

    def release(self):
        """Return immediately; there is nothing to hand back."""
        return None
15
16
def create_run(tests, lit_config, workers, progress_callback, max_failures,
               timeout):
    """Build the run object matching the requested worker count.

    A worker count of exactly 1 yields a SerialRun; any other (positive)
    count yields a ParallelRun configured with that many workers.
    """
    assert workers > 0
    shared_args = (tests, lit_config, progress_callback, max_failures, timeout)
    if workers != 1:
        return ParallelRun(*(shared_args + (workers,)))
    return SerialRun(*shared_args)
23
24
class Run(object):
    """A concrete, configured testing run.

    This base class holds the shared configuration and result bookkeeping;
    subclasses supply the actual scheduling strategy by overriding
    ``_execute(deadline)``.
    """

    def __init__(self, tests, lit_config, progress_callback, max_failures, timeout):
        self.tests = tests
        self.lit_config = lit_config
        self.progress_callback = progress_callback
        self.max_failures = max_failures
        self.timeout = timeout
        # Initialised here (and reset by execute()) so that _process_result()
        # never hits a missing attribute if it is invoked before execute().
        self.failure_count = 0
        self.hit_max_failures = False

    def execute(self):
        """
        Execute the tests in the run and inform the caller of each individual
        result.

        The progress_callback will be invoked for each completed test.

        If timeout is non-None (and non-zero), it should be a time in seconds
        after which to stop executing tests.

        Returns the elapsed testing time in seconds, measured around the
        subclass's _execute() call.

        Upon completion, each test in the run will have its result
        computed. Tests which were not actually executed (for any reason) will
        be given an UNRESOLVED result.
        """
        self.failure_count = 0
        self.hit_max_failures = False

        # Larger timeouts (one year, positive infinity) don't work on Windows.
        one_week = 7 * 24 * 60 * 60  # days * hours * minutes * seconds
        timeout = self.timeout or one_week
        start = time.time()
        deadline = start + timeout

        self._execute(deadline)
        elapsed = time.time() - start

        # Mark any tests that weren't run as UNRESOLVED.
        for test in self.tests:
            if test.result is None:
                test.setResult(lit10.Test.Result(lit10.Test.UNRESOLVED, '', 0.0))

        return elapsed

    def _execute(self, deadline):
        """Run the tests until done or until `deadline` (a time.time() value).

        Must be overridden by subclasses.
        """
        raise NotImplementedError('subclasses of Run must implement _execute')

    # TODO(yln): as the comment says.. this is racing with the main thread waiting
    # for results
    def _process_result(self, test, result):
        """Record `result` on `test`, update failure accounting and notify
        the progress callback."""
        # Don't add any more test results after we've hit the maximum failure
        # count.  Otherwise we're racing with the main thread, which is going
        # to terminate the process pool soon.
        if self.hit_max_failures:
            return

        test.setResult(result)

        # Use test.isFailure() for correct XFAIL and XPASS handling
        if test.isFailure():
            self.failure_count += 1
            # Equality (not >=) keeps a max_failures of None meaning
            # "no limit": the counter never equals None.
            if self.failure_count == self.max_failures:
                self.hit_max_failures = True

        self.progress_callback(test)
86
87
class SerialRun(Run):
    """Run that executes the tests one after another in the calling process."""

    def __init__(self, tests, lit_config, progress_callback, max_failures, timeout):
        super(SerialRun, self).__init__(tests, lit_config, progress_callback, max_failures, timeout)

    def _execute(self, deadline):
        """Execute tests in order until all are done, the maximum failure
        count is reached, or `deadline` (a time.time() value) has passed.

        A test that is already running is not interrupted; the deadline is
        only checked before starting the next test. Tests that are never
        started keep a result of None.
        """
        for test in self.tests:
            if time.time() >= deadline:
                # TODO(yln): print timeout error
                break
            result = lit10.worker._execute(test, self.lit_config)
            self._process_result(test, result)
            if self.hit_max_failures:
                break
99
100
class ParallelRun(Run):
    """Run that distributes the tests over a multiprocessing pool."""

    def __init__(self, tests, lit_config, progress_callback, max_failures, timeout, workers):
        super(ParallelRun, self).__init__(tests, lit_config, progress_callback, max_failures, timeout)
        self.workers = workers

    def _execute(self, deadline):
        """Submit every test to a worker pool and wait for the results.

        Waiting stops early, and the pool is terminated, when `deadline`
        (a time.time() value) passes or the maximum failure count is hit.
        The pool is also terminated if any exception escapes, and it is
        always joined before returning.
        """
        # One semaphore per parallelism group; a limit of None means the
        # group is unrestricted, which the no-op semaphore models.
        semaphores = {
            group: NopSemaphore() if limit is None else
            multiprocessing.BoundedSemaphore(limit) for group, limit in
            self.lit_config.parallelism_groups.items()}

        self._increase_process_limit()

        # Start a process pool. Copy over the data shared between all test runs.
        # FIXME: Find a way to capture the worker process stderr. If the user
        # interrupts the workers before we make it into our task callback, they
        # will each raise a KeyboardInterrupt exception and print to stderr at
        # the same time.
        pool = multiprocessing.Pool(self.workers, lit10.worker.initialize,
                                    (self.lit_config, semaphores))

        try:
            self._install_win32_signal_handler(pool)

            # `t=test` binds the current test at lambda creation time;
            # without it every callback would see the last test.
            async_results = [
                pool.apply_async(lit10.worker.execute, args=[test],
                    callback=lambda r, t=test: self._process_result(t, r))
                for test in self.tests]
            pool.close()

            for ar in async_results:
                timeout = deadline - time.time()
                try:
                    ar.get(timeout)
                except multiprocessing.TimeoutError:
                    # TODO(yln): print timeout error
                    pool.terminate()
                    break
                if self.hit_max_failures:
                    pool.terminate()
                    break
        except BaseException:
            # Includes KeyboardInterrupt and exceptions re-raised by
            # ar.get(): don't leave worker processes behind.
            pool.terminate()
            raise
        finally:
            # close() or terminate() has been called on every path that
            # reaches this point, which join() requires.
            pool.join()

    # TODO(yln): interferes with progress bar
    # Some tests use threads internally, and at least on Linux each of these
    # threads counts toward the current process limit. Try to raise the (soft)
    # process limit so that tests don't fail due to resource exhaustion.
    def _increase_process_limit(self):
        """Best-effort raise of the soft RLIMIT_NPROC limit.

        The target is ``workers * ncpus * 2``, capped at the hard limit.
        Failures are reported as a warning (silently ignored on Windows) and
        never propagate.
        """
        ncpus = lit10.util.detectCPUs()
        desired_limit = self.workers * ncpus * 2 # the 2 is a safety factor

        # Importing the resource module will likely fail on Windows.
        try:
            import resource
            NPROC = resource.RLIMIT_NPROC
            unlimited = resource.RLIM_INFINITY

            soft_limit, hard_limit = resource.getrlimit(NPROC)

            # RLIM_INFINITY is a sentinel, not a magnitude (it can be
            # negative), so it must not take part in min() or `<`.
            if hard_limit != unlimited:
                desired_limit = min(desired_limit, hard_limit)

            if soft_limit != unlimited and soft_limit < desired_limit:
                resource.setrlimit(NPROC, (desired_limit, hard_limit))
                self.lit_config.note('Raised process limit from %d to %d' % \
                                        (soft_limit, desired_limit))
        except Exception as ex:
            # Warn, unless this is Windows, in which case this is expected.
            if os.name != 'nt':
                self.lit_config.warning('Failed to raise process limit: %s' % ex)

    def _install_win32_signal_handler(self, pool):
        """Register a console control handler that tears down `pool`.

        Does nothing when lit10.util.win32api is None (presumably when the
        win32 bindings are unavailable; confirm in lit10.util).
        """
        if lit10.util.win32api is not None:
            def console_ctrl_handler(ctrl_type):
                print('\nCtrl-C detected, terminating.')
                pool.terminate()
                pool.join()
                lit10.util.abort_now()
                return True
            lit10.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
177