1# Copyright (c) 2012 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Takes care of sharding the python-drive tests in multiple devices."""
6
7import copy
8import logging
9import multiprocessing
10
11from python_test_caller import CallPythonTest
12from run_java_tests import FatalTestException
13import sharded_tests_queue
14from test_result import TestResults
15
16
17def SetTestsContainer(tests_container):
18  """Sets PythonTestSharder as a top-level field.
19
20  PythonTestSharder uses multiprocessing.Pool, which creates a pool of
21  processes. This is used to initialize each worker in the pool, ensuring that
22  each worker has access to this shared pool of tests.
23
24  The multiprocessing module requires that this be a top-level method.
25
26  Args:
27    tests_container: the container for all the tests.
28  """
29  PythonTestSharder.tests_container = tests_container
30
31
32def _DefaultRunnable(test_runner):
33  """A default runnable for a PythonTestRunner.
34
35  Args:
36    test_runner: A PythonTestRunner which will run tests.
37
38  Returns:
39    The test results.
40  """
41  return test_runner.RunTests()
42
43
44class PythonTestRunner(object):
45  """Thin wrapper around a list of PythonTestBase instances.
46
47  This is meant to be a long-lived object which can run multiple Python tests
48  within its lifetime. Tests will receive the device_id and shard_index.
49
50  The shard index affords the ability to create unique port numbers (e.g.
51  DEFAULT_PORT + shard_index) if the test so wishes.
52  """
53
54  def __init__(self, options):
55    """Constructor.
56
57    Args:
58      options: Options to use for setting up tests.
59    """
60    self.options = options
61
62  def RunTests(self):
63    """Runs tests from the shared pool of tests, aggregating results.
64
65    Returns:
66      A list of test results for all of the tests which this runner executed.
67    """
68    tests = PythonTestSharder.tests_container
69
70    results = []
71    for t in tests:
72      res = CallPythonTest(t, self.options)
73      results.append(res)
74
75    return TestResults.FromTestResults(results)
76
77
78class PythonTestSharder(object):
79  """Runs Python tests in parallel on multiple devices.
80
81  This is lifted more or less wholesale from BaseTestRunner.
82
83  Under the covers, it creates a pool of long-lived PythonTestRunners, which
84  execute tests from the pool of tests.
85
86  Args:
87    attached_devices: a list of device IDs attached to the host.
88    available_tests: a list of tests to run which subclass PythonTestBase.
89    options: Options to use for setting up tests.
90
91  Returns:
92    An aggregated list of test results.
93  """
94  tests_container = None
95
96  def __init__(self, attached_devices, available_tests, options):
97    self.options = options
98    self.attached_devices = attached_devices
99    self.retries = options.shard_retries
100    self.tests = available_tests
101
102  def _SetupSharding(self, tests):
103    """Creates the shared pool of tests and makes it available to test runners.
104
105    Args:
106      tests: the list of tests which will be consumed by workers.
107    """
108    SetTestsContainer(sharded_tests_queue.ShardedTestsQueue(
109        len(self.attached_devices), tests))
110
111  def RunShardedTests(self):
112    """Runs tests in parallel using a pool of workers.
113
114    Returns:
115      A list of test results aggregated from all test runs.
116    """
117    logging.warning('*' * 80)
118    logging.warning('Sharding in ' + str(len(self.attached_devices)) +
119                    ' devices.')
120    logging.warning('Note that the output is not synchronized.')
121    logging.warning('Look for the "Final result" banner in the end.')
122    logging.warning('*' * 80)
123    all_passed = []
124    test_results = TestResults()
125    tests_to_run = self.tests
126    for retry in xrange(self.retries):
127      logging.warning('Try %d of %d', retry + 1, self.retries)
128      self._SetupSharding(self.tests)
129      test_runners = self._MakeTestRunners(self.attached_devices)
130      logging.warning('Starting...')
131      pool = multiprocessing.Pool(len(self.attached_devices),
132                                  SetTestsContainer,
133                                  [PythonTestSharder.tests_container])
134
135      # List of TestResults objects from each test execution.
136      try:
137        results_lists = pool.map(_DefaultRunnable, test_runners)
138      except Exception:
139        logging.exception('Unable to run tests. Something with the '
140                          'PythonTestRunners has gone wrong.')
141        raise FatalTestException('PythonTestRunners were unable to run tests.')
142
143      test_results = TestResults.FromTestResults(results_lists)
144      # Accumulate passing results.
145      all_passed += test_results.ok
146      # If we have failed tests, map them to tests to retry.
147      failed_tests = test_results.GetAllBroken()
148      tests_to_run = self._GetTestsToRetry(self.tests,
149                                           failed_tests)
150
151      # Bail out early if we have no more tests. This can happen if all tests
152      # pass before we're out of retries, for example.
153      if not tests_to_run:
154        break
155
156    final_results = TestResults()
157    # all_passed has accumulated all passing test results.
158    # test_results will have the results from the most recent run, which could
159    # include a variety of failure modes (unknown, crashed, failed, etc).
160    final_results = test_results
161    final_results.ok = all_passed
162
163    return final_results
164
165  def _MakeTestRunners(self, attached_devices):
166    """Initialize and return a list of PythonTestRunners.
167
168    Args:
169      attached_devices: list of device IDs attached to host.
170
171    Returns:
172      A list of PythonTestRunners, one for each device.
173    """
174    test_runners = []
175    for index, device in enumerate(attached_devices):
176      logging.warning('*' * 80)
177      logging.warning('Creating shard %d for %s', index, device)
178      logging.warning('*' * 80)
179      # Bind the PythonTestRunner to a device & shard index. Give it the
180      # runnable which it will use to actually execute the tests.
181      test_options = copy.deepcopy(self.options)
182      test_options.ensure_value('device_id', device)
183      test_options.ensure_value('shard_index', index)
184      test_runner = PythonTestRunner(test_options)
185      test_runners.append(test_runner)
186
187    return test_runners
188
189  def _GetTestsToRetry(self, available_tests, failed_tests):
190    """Infers a list of tests to retry from failed tests and available tests.
191
192    Args:
193      available_tests: a list of tests which subclass PythonTestBase.
194      failed_tests: a list of SingleTestResults representing failed tests.
195
196    Returns:
197      A list of test objects which correspond to test names found in
198      failed_tests, or an empty list if there is no correspondence.
199    """
200    failed_test_names = map(lambda t: t.test_name, failed_tests)
201    tests_to_retry = [t for t in available_tests
202                      if t.qualified_name in failed_test_names]
203    return tests_to_retry
204