1import multiprocessing
2import os
3import time
4
5import lit.Test
6import lit.util
7import lit.worker
8
9
class MaxFailuresError(Exception):
    """Raised when the number of failing tests reaches the configured cap."""
class TimeoutError(Exception):
    """Raised when a test result is not available before the deadline."""
14
15
class Run(object):
    """A concrete, configured testing run.

    Bundles the tests to execute with the lit configuration, the number of
    worker processes, a per-test progress callback, an optional cap on the
    number of failing tests and an optional overall timeout in seconds.
    """

    def __init__(self, tests, lit_config, workers, progress_callback,
                 max_failures, timeout):
        self.tests = tests
        self.lit_config = lit_config
        self.workers = workers
        self.progress_callback = progress_callback
        self.max_failures = max_failures
        self.timeout = timeout
        # Reset at the start of every execute(); initialised here so the
        # attribute always exists.
        self.failures = 0
        assert workers > 0

    def execute(self):
        """
        Execute the tests in the run using up to the configured number of
        parallel workers.

        The progress_callback is registered as the completion callback of
        every test submitted to the worker pool.

        If timeout is neither None nor 0, it is a time in seconds after which
        to stop waiting for test results; otherwise a one-week limit is used.

        Returns nothing.  MaxFailuresError (failure cap reached) and
        TimeoutError (deadline passed) raised while waiting for results
        propagate to the caller.

        Upon completion, whether normal or exceptional, each test in the run
        will have a result. Tests whose result is still None at that point
        (for any reason) are marked SKIPPED.
        """
        self.failures = 0

        # Larger timeouts (one year, positive infinity) don't work on Windows.
        one_week = 7 * 24 * 60 * 60  # days * hours * minutes * seconds
        timeout = self.timeout or one_week
        deadline = time.time() + timeout

        try:
            self._execute(deadline)
        finally:
            # Runs on success and on error alike, so that no test is left
            # without a result.
            skipped = lit.Test.Result(lit.Test.SKIPPED)
            for test in self.tests:
                if test.result is None:
                    test.setResult(skipped)

    def _execute(self, deadline):
        """Run all tests in a worker pool and wait for them until deadline.

        deadline is an absolute time.time() value.
        """
        self._increase_process_limit()

        # One bounded semaphore per parallelism group that declares a limit;
        # groups whose limit is None get no semaphore.
        semaphores = {k: multiprocessing.BoundedSemaphore(v)
                      for k, v in self.lit_config.parallelism_groups.items()
                      if v is not None}

        pool = multiprocessing.Pool(self.workers, lit.worker.initialize,
                                    (self.lit_config, semaphores))

        async_results = [
            pool.apply_async(lit.worker.execute, args=[test],
                             callback=self.progress_callback)
            for test in self.tests]
        # Everything has been submitted; no further tasks will be added.
        pool.close()

        try:
            self._wait_for(async_results, deadline)
        except:
            # Deliberately catches everything (including KeyboardInterrupt):
            # stop the workers, then re-raise unchanged.
            pool.terminate()
            raise
        finally:
            pool.join()

    def _wait_for(self, async_results, deadline):
        """Collect results in submission order, honouring deadline.

        async_results[i] must correspond to self.tests[i].  deadline is an
        absolute time.time() value.

        Raises TimeoutError if a result is not available by the deadline and
        MaxFailuresError once self.failures equals self.max_failures.
        """
        for idx, ar in enumerate(async_results):
            # Recompute the remaining time for every result.  Reusing one
            # timeout for each get() would let every later result wait the
            # full amount again and overshoot the deadline.  Clamp at zero so
            # that, once the deadline has passed, results that are already
            # available are still collected without blocking.
            timeout = max(0, deadline - time.time())
            try:
                test = ar.get(timeout)
            except multiprocessing.TimeoutError:
                raise TimeoutError()

            self._update_test(self.tests[idx], test)
            if test.isFailure():
                self.failures += 1
                # max_failures may be None, in which case this never matches.
                if self.failures == self.max_failures:
                    raise MaxFailuresError()

    # Update local test object "in place" from remote test object.  This
    # ensures that the original test object which is used for printing test
    # results reflects the changes.
    def _update_test(self, local_test, remote_test):
        # Needed for getMissingRequiredFeatures()
        local_test.requires = remote_test.requires
        local_test.result = remote_test.result

    # TODO(yln): interferes with progress bar
    # Some tests use threads internally, and at least on Linux each of these
    # threads counts toward the current process limit. Try to raise the (soft)
    # process limit so that tests don't fail due to resource exhaustion.
    def _increase_process_limit(self):
        """Best-effort attempt to raise the soft RLIMIT_NPROC limit.

        Never raises because of the limit change itself: failures are
        reported through lit_config.warning(), except on Windows where they
        are expected and ignored.
        """
        ncpus = lit.util.detectCPUs()
        desired_limit = self.workers * ncpus * 2  # the 2 is a safety factor

        # Importing the resource module will likely fail on Windows.
        try:
            import resource
            NPROC = resource.RLIMIT_NPROC
            unlimited = resource.RLIM_INFINITY

            soft_limit, hard_limit = resource.getrlimit(NPROC)

            # RLIM_INFINITY is a sentinel that can be negative (e.g. -1), so
            # it must not take part in ordinary numeric comparisons.
            if soft_limit == unlimited:
                return  # Already unlimited; nothing to raise.
            if hard_limit != unlimited:
                desired_limit = min(desired_limit, hard_limit)

            if soft_limit < desired_limit:
                resource.setrlimit(NPROC, (desired_limit, hard_limit))
                self.lit_config.note('Raised process limit from %d to %d' %
                                     (soft_limit, desired_limit))
        except Exception as ex:
            # Warn, unless this is Windows, in which case this is expected.
            if os.name != 'nt':
                self.lit_config.warning('Failed to raise process limit: %s' % ex)
132