# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Takes care of sharding the python-driven tests across multiple devices."""

import copy
import logging
import multiprocessing

from python_test_caller import CallPythonTest
from run_java_tests import FatalTestException
import sharded_tests_queue
from test_result import TestResults


def SetTestsContainer(tests_container):
  """Stores the shared tests container on PythonTestSharder.

  PythonTestSharder uses multiprocessing.Pool, which creates a pool of
  processes. This function is passed to the pool as the worker initializer,
  ensuring that each worker has access to the shared pool of tests.

  The multiprocessing module requires that this be a top-level function.

  Args:
    tests_container: the container for all the tests.
  """
  PythonTestSharder.tests_container = tests_container


def _DefaultRunnable(test_runner):
  """A default runnable for a PythonTestRunner.

  Kept at module level so that it can be handed to Pool.map().

  Args:
    test_runner: A PythonTestRunner which will run tests.

  Returns:
    The test results, as returned by test_runner.RunTests().
  """
  return test_runner.RunTests()


class PythonTestRunner(object):
  """Runs tests taken from the shared tests container with fixed options.

  This is meant to be a long-lived object which can run multiple Python tests
  within its lifetime. The options it carries are expected to hold the
  device_id and shard_index assigned by PythonTestSharder.

  The shard index affords the ability to create unique port numbers (e.g.
  DEFAULT_PORT + shard_index) if the test so wishes.
  """

  def __init__(self, options):
    """Constructor.

    Args:
      options: Options to use for setting up tests.
    """
    self.options = options

  def RunTests(self):
    """Runs tests from the shared pool of tests, aggregating results.

    Returns:
      A TestResults object aggregating the results of all of the tests which
      this runner executed.
    """
    # The container is installed per worker process by SetTestsContainer.
    shared_tests = PythonTestSharder.tests_container
    results = [CallPythonTest(test, self.options) for test in shared_tests]
    return TestResults.FromTestResults(results)


class PythonTestSharder(object):
  """Runs Python tests in parallel on multiple devices.

  This is lifted more or less wholesale from BaseTestRunner.

  Under the covers, it creates a pool of long-lived PythonTestRunners, which
  execute tests from the pool of tests.

  Args:
    attached_devices: a list of device IDs attached to the host.
    available_tests: a list of tests to run which subclass PythonTestBase.
    options: Options to use for setting up tests.

  Returns:
    An aggregated list of test results.
  """
  # Shared container of pending tests; set through SetTestsContainer().
  tests_container = None

  def __init__(self, attached_devices, available_tests, options):
    self.options = options
    self.attached_devices = attached_devices
    # Number of attempts RunShardedTests() makes, read from the options.
    self.retries = options.shard_retries
    self.tests = available_tests

  def _SetupSharding(self, tests):
    """Creates the shared pool of tests and makes it available to test runners.

    Args:
      tests: the list of tests which will be consumed by workers.
    """
    SetTestsContainer(sharded_tests_queue.ShardedTestsQueue(
        len(self.attached_devices), tests))

  def RunShardedTests(self):
    """Runs tests in parallel using a pool of workers.

    The first attempt runs every available test; each subsequent attempt
    reruns only the tests that were broken in the previous attempt. At most
    self.retries attempts are made.

    Returns:
      A TestResults object. Its ok list holds the passing results accumulated
      over all attempts; everything else comes from the most recent attempt.

    Raises:
      FatalTestException: if the worker pool fails to run the tests.
    """
    num_devices = len(self.attached_devices)
    banner = '*' * 80
    logging.warning(banner)
    logging.warning('Sharding in %d devices.', num_devices)
    logging.warning('Note that the output is not synchronized.')
    logging.warning('Look for the "Final result" banner in the end.')
    logging.warning(banner)

    all_passed = []
    test_results = TestResults()
    tests_to_run = self.tests
    for retry in range(self.retries):
      logging.warning('Try %d of %d', retry + 1, self.retries)
      # Only queue what still needs to run: everything on the first attempt,
      # the previously broken tests afterwards. Queuing self.tests here would
      # rerun the whole suite and count earlier passes again.
      self._SetupSharding(tests_to_run)
      test_runners = self._MakeTestRunners(self.attached_devices)
      logging.warning('Starting...')
      pool = multiprocessing.Pool(num_devices,
                                  SetTestsContainer,
                                  [PythonTestSharder.tests_container])

      # List of TestResults objects from each test execution.
      try:
        results_lists = pool.map(_DefaultRunnable, test_runners)
      except Exception:
        logging.exception('Unable to run tests. Something with the '
                          'PythonTestRunners has gone wrong.')
        raise FatalTestException('PythonTestRunners were unable to run tests.')
      finally:
        # A fresh pool is created for every attempt, so shut this one down
        # rather than leaking its worker processes.
        pool.close()
        pool.join()

      test_results = TestResults.FromTestResults(results_lists)
      # Accumulate passing results.
      all_passed += test_results.ok
      # If we have failed tests, map them to tests to retry.
      failed_tests = test_results.GetAllBroken()
      tests_to_run = self._GetTestsToRetry(self.tests, failed_tests)

      # Bail out early if we have no more tests. This can happen if all tests
      # pass before we're out of retries, for example.
      if not tests_to_run:
        break

    # all_passed has accumulated all passing test results.
    # test_results will have the results from the most recent run, which could
    # include a variety of failure modes (unknown, crashed, failed, etc).
    final_results = test_results
    final_results.ok = all_passed

    return final_results

  def _MakeTestRunners(self, attached_devices):
    """Initialize and return a list of PythonTestRunners.

    Args:
      attached_devices: list of device IDs attached to host.

    Returns:
      A list of PythonTestRunners, one for each device.
    """
    banner = '*' * 80
    test_runners = []
    for index, device in enumerate(attached_devices):
      logging.warning(banner)
      logging.warning('Creating shard %d for %s', index, device)
      logging.warning(banner)
      # Bind the PythonTestRunner to a device & shard index. Each runner gets
      # its own deep copy of the options so shards do not share state.
      # NOTE(review): presumably options is an optparse.Values, in which case
      # ensure_value() leaves an already-set, non-None device_id or
      # shard_index untouched -- confirm that is intended.
      test_options = copy.deepcopy(self.options)
      test_options.ensure_value('device_id', device)
      test_options.ensure_value('shard_index', index)
      test_runners.append(PythonTestRunner(test_options))

    return test_runners

  def _GetTestsToRetry(self, available_tests, failed_tests):
    """Infers a list of tests to retry from failed tests and available tests.

    Args:
      available_tests: a list of tests which subclass PythonTestBase.
      failed_tests: a list of SingleTestResults representing failed tests.

    Returns:
      A list of test objects which correspond to test names found in
      failed_tests, or an empty list if there is no correspondence. The order
      of available_tests is preserved.
    """
    # Build the lookup once as a set so each membership test is O(1) and the
    # result does not depend on map() returning a reusable list.
    failed_test_names = set(result.test_name for result in failed_tests)
    return [test for test in available_tests
            if test.qualified_name in failed_test_names]