summaryrefslogtreecommitdiffstats
path: root/media/webrtc/trunk/build/android/pylib/python_test_sharder.py
blob: e27096d788de922a52b6d15d5b9701670b19a9ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# Copyright (c) 2012 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Takes care of sharding the python-drive tests in multiple devices."""

import copy
import logging
import multiprocessing

from python_test_caller import CallPythonTest
from run_java_tests import FatalTestException
import sharded_tests_queue
from test_result import TestResults


def SetTestsContainer(tests_container):
  """Publishes the shared tests container on the PythonTestSharder class.

  The container is stored as the class attribute
  PythonTestSharder.tests_container. Besides being called directly, this
  function is handed to multiprocessing.Pool as the worker initializer, so
  every worker process ends up with the same container installed.

  It has to live at module level because multiprocessing needs to be able to
  locate the initializer by name from the worker processes.

  Args:
    tests_container: the container holding all the tests to be run.
  """
  sharder_cls = PythonTestSharder
  sharder_cls.tests_container = tests_container


def _DefaultRunnable(test_runner):
  """Runs all the tests of a single PythonTestRunner.

  This is the callable mapped over the test runners by the worker pool; it
  simply forwards to the runner's RunTests() method.

  Args:
    test_runner: the PythonTestRunner (or any object exposing RunTests())
      whose tests should be executed.

  Returns:
    Whatever test_runner.RunTests() returns, i.e. the test results.
  """
  outcome = test_runner.RunTests()
  return outcome


class PythonTestRunner(object):
  """Long-lived runner that executes Python tests with a fixed set of options.

  A runner pulls its tests from the container shared through
  PythonTestSharder.tests_container and hands each one, together with its
  options, to CallPythonTest. The options are expected to carry the device_id
  and shard_index the tests should use.

  The shard index lets a test derive unique resources, such as port numbers
  (e.g. DEFAULT_PORT + shard_index), if it wishes to do so.
  """

  def __init__(self, options):
    """Stores the options used for every test this runner executes.

    Args:
      options: Options to use for setting up tests.
    """
    self.options = options

  def RunTests(self):
    """Executes tests taken from the shared container and merges the results.

    Returns:
      The aggregate, built by TestResults.FromTestResults, of the results of
      every test this runner executed.
    """
    shared_tests = PythonTestSharder.tests_container
    # One CallPythonTest invocation per test handed out by the container.
    outcomes = [CallPythonTest(test, self.options) for test in shared_tests]
    return TestResults.FromTestResults(outcomes)


class PythonTestSharder(object):
  """Runs Python tests in parallel on multiple devices.

  This is lifted more or less wholesale from BaseTestRunner.

  Under the covers, it creates a pool of long-lived PythonTestRunners, which
  execute tests from the pool of tests.

  Args:
    attached_devices: a list of device IDs attached to the host.
    available_tests: a list of tests to run which subclass PythonTestBase.
    options: Options to use for setting up tests. Must provide shard_retries,
      the maximum number of attempts made by RunShardedTests.
  """
  # Shared pool of tests consumed by the PythonTestRunners. It is installed by
  # the module-level SetTestsContainer(), both in this process (through
  # _SetupSharding) and in each worker process (as the Pool initializer).
  tests_container = None

  def __init__(self, attached_devices, available_tests, options):
    self.options = options
    self.attached_devices = attached_devices
    self.retries = options.shard_retries
    self.tests = available_tests

  def _SetupSharding(self, tests):
    """Creates the shared pool of tests and makes it available to test runners.

    Args:
      tests: the list of tests which will be consumed by workers.
    """
    SetTestsContainer(sharded_tests_queue.ShardedTestsQueue(
        len(self.attached_devices), tests))

  def RunShardedTests(self):
    """Runs tests in parallel using a pool of workers.

    Up to self.retries attempts are made. The first attempt runs every test;
    each later attempt only re-runs the tests which were broken in the
    previous one. The loop stops early once nothing is left to retry.

    Returns:
      The results object of the last attempt (an empty TestResults if no
      attempt was made), with its `ok` attribute replaced by the passing
      results accumulated across all attempts.

    Raises:
      FatalTestException: if the worker pool fails while running the tests.
    """
    num_devices = len(self.attached_devices)
    logging.warning('*' * 80)
    logging.warning('Sharding in %d devices.', num_devices)
    logging.warning('Note that the output is not synchronized.')
    logging.warning('Look for the "Final result" banner in the end.')
    logging.warning('*' * 80)
    all_passed = []
    test_results = TestResults()
    tests_to_run = self.tests
    for retry in range(self.retries):
      logging.warning('Try %d of %d', retry + 1, self.retries)
      # Queue only what still needs running: everything on the first attempt,
      # then just the tests which were broken in the previous attempt.
      self._SetupSharding(tests_to_run)
      test_runners = self._MakeTestRunners(self.attached_devices)
      logging.warning('Starting...')
      pool = multiprocessing.Pool(num_devices,
                                  SetTestsContainer,
                                  [PythonTestSharder.tests_container])

      # List of TestResults objects from each test execution.
      try:
        results_lists = pool.map(_DefaultRunnable, test_runners)
      except Exception:
        logging.exception('Unable to run tests. Something with the '
                          'PythonTestRunners has gone wrong.')
        raise FatalTestException('PythonTestRunners were unable to run tests.')
      finally:
        # A fresh pool is created for every attempt, so always reap the worker
        # processes. pool.map() has already returned (or raised) at this
        # point, hence terminating cannot lose any results.
        pool.terminate()
        pool.join()

      test_results = TestResults.FromTestResults(results_lists)
      # Accumulate passing results.
      all_passed += test_results.ok
      # If we have failed tests, map them to tests to retry.
      failed_tests = test_results.GetAllBroken()
      tests_to_run = self._GetTestsToRetry(self.tests,
                                           failed_tests)

      # Bail out early if we have no more tests. This can happen if all tests
      # pass before we're out of retries, for example.
      if not tests_to_run:
        break

    # all_passed has accumulated all passing test results.
    # test_results will have the results from the most recent run, which could
    # include a variety of failure modes (unknown, crashed, failed, etc).
    final_results = test_results
    final_results.ok = all_passed

    return final_results

  def _MakeTestRunners(self, attached_devices):
    """Initialize and return a list of PythonTestRunners.

    Args:
      attached_devices: list of device IDs attached to host.

    Returns:
      A list of PythonTestRunners, one for each device.
    """
    test_runners = []
    for index, device in enumerate(attached_devices):
      logging.warning('*' * 80)
      logging.warning('Creating shard %d for %s', index, device)
      logging.warning('*' * 80)
      # Bind the PythonTestRunner to a device & shard index. Every runner gets
      # its own deep copy of the options so the shards do not share state.
      # NOTE(review): presumably options is an optparse.Values, whose
      # ensure_value() keeps an already-set (non-None) attribute -- confirm
      # that device_id / shard_index are never pre-populated by the caller.
      test_options = copy.deepcopy(self.options)
      test_options.ensure_value('device_id', device)
      test_options.ensure_value('shard_index', index)
      test_runner = PythonTestRunner(test_options)
      test_runners.append(test_runner)

    return test_runners

  def _GetTestsToRetry(self, available_tests, failed_tests):
    """Infers a list of tests to retry from failed tests and available tests.

    Args:
      available_tests: a list of tests which subclass PythonTestBase.
      failed_tests: a list of SingleTestResults representing failed tests.

    Returns:
      A list of test objects, in the order of available_tests, whose
      qualified_name matches the test_name of an entry in failed_tests, or an
      empty list if there is no correspondence.
    """
    # Materialize the names as a set: membership tests are O(1) and, unlike a
    # lazy map() object, it can be queried once per available test.
    failed_test_names = set(t.test_name for t in failed_tests)
    return [t for t in available_tests
            if t.qualified_name in failed_test_names]