# Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Takes care of sharding the python-drive tests in multiple devices.""" import copy import logging import multiprocessing from python_test_caller import CallPythonTest from run_java_tests import FatalTestException import sharded_tests_queue from test_result import TestResults def SetTestsContainer(tests_container): """Sets PythonTestSharder as a top-level field. PythonTestSharder uses multiprocessing.Pool, which creates a pool of processes. This is used to initialize each worker in the pool, ensuring that each worker has access to this shared pool of tests. The multiprocessing module requires that this be a top-level method. Args: tests_container: the container for all the tests. """ PythonTestSharder.tests_container = tests_container def _DefaultRunnable(test_runner): """A default runnable for a PythonTestRunner. Args: test_runner: A PythonTestRunner which will run tests. Returns: The test results. """ return test_runner.RunTests() class PythonTestRunner(object): """Thin wrapper around a list of PythonTestBase instances. This is meant to be a long-lived object which can run multiple Python tests within its lifetime. Tests will receive the device_id and shard_index. The shard index affords the ability to create unique port numbers (e.g. DEFAULT_PORT + shard_index) if the test so wishes. """ def __init__(self, options): """Constructor. Args: options: Options to use for setting up tests. """ self.options = options def RunTests(self): """Runs tests from the shared pool of tests, aggregating results. Returns: A list of test results for all of the tests which this runner executed. """ tests = PythonTestSharder.tests_container results = [] for t in tests: res = CallPythonTest(t, self.options) results.append(res) return TestResults.FromTestResults(results) class PythonTestSharder(object): """Runs Python tests in parallel on multiple devices. This is lifted more or less wholesale from BaseTestRunner. Under the covers, it creates a pool of long-lived PythonTestRunners, which execute tests from the pool of tests. Args: attached_devices: a list of device IDs attached to the host. available_tests: a list of tests to run which subclass PythonTestBase. options: Options to use for setting up tests. Returns: An aggregated list of test results. """ tests_container = None def __init__(self, attached_devices, available_tests, options): self.options = options self.attached_devices = attached_devices self.retries = options.shard_retries self.tests = available_tests def _SetupSharding(self, tests): """Creates the shared pool of tests and makes it available to test runners. Args: tests: the list of tests which will be consumed by workers. """ SetTestsContainer(sharded_tests_queue.ShardedTestsQueue( len(self.attached_devices), tests)) def RunShardedTests(self): """Runs tests in parallel using a pool of workers. Returns: A list of test results aggregated from all test runs. """ logging.warning('*' * 80) logging.warning('Sharding in ' + str(len(self.attached_devices)) + ' devices.') logging.warning('Note that the output is not synchronized.') logging.warning('Look for the "Final result" banner in the end.') logging.warning('*' * 80) all_passed = [] test_results = TestResults() tests_to_run = self.tests for retry in xrange(self.retries): logging.warning('Try %d of %d', retry + 1, self.retries) self._SetupSharding(self.tests) test_runners = self._MakeTestRunners(self.attached_devices) logging.warning('Starting...') pool = multiprocessing.Pool(len(self.attached_devices), SetTestsContainer, [PythonTestSharder.tests_container]) # List of TestResults objects from each test execution. try: results_lists = pool.map(_DefaultRunnable, test_runners) except Exception: logging.exception('Unable to run tests. Something with the ' 'PythonTestRunners has gone wrong.') raise FatalTestException('PythonTestRunners were unable to run tests.') test_results = TestResults.FromTestResults(results_lists) # Accumulate passing results. all_passed += test_results.ok # If we have failed tests, map them to tests to retry. failed_tests = test_results.GetAllBroken() tests_to_run = self._GetTestsToRetry(self.tests, failed_tests) # Bail out early if we have no more tests. This can happen if all tests # pass before we're out of retries, for example. if not tests_to_run: break final_results = TestResults() # all_passed has accumulated all passing test results. # test_results will have the results from the most recent run, which could # include a variety of failure modes (unknown, crashed, failed, etc). final_results = test_results final_results.ok = all_passed return final_results def _MakeTestRunners(self, attached_devices): """Initialize and return a list of PythonTestRunners. Args: attached_devices: list of device IDs attached to host. Returns: A list of PythonTestRunners, one for each device. """ test_runners = [] for index, device in enumerate(attached_devices): logging.warning('*' * 80) logging.warning('Creating shard %d for %s', index, device) logging.warning('*' * 80) # Bind the PythonTestRunner to a device & shard index. Give it the # runnable which it will use to actually execute the tests. test_options = copy.deepcopy(self.options) test_options.ensure_value('device_id', device) test_options.ensure_value('shard_index', index) test_runner = PythonTestRunner(test_options) test_runners.append(test_runner) return test_runners def _GetTestsToRetry(self, available_tests, failed_tests): """Infers a list of tests to retry from failed tests and available tests. Args: available_tests: a list of tests which subclass PythonTestBase. failed_tests: a list of SingleTestResults representing failed tests. Returns: A list of test objects which correspond to test names found in failed_tests, or an empty list if there is no correspondence. """ failed_test_names = map(lambda t: t.test_name, failed_tests) tests_to_retry = [t for t in available_tests if t.qualified_name in failed_test_names] return tests_to_retry