diff options
Diffstat (limited to 'testing/web-platform/harness/wptrunner/testrunner.py')
-rw-r--r-- | testing/web-platform/harness/wptrunner/testrunner.py | 667 |
1 files changed, 667 insertions, 0 deletions
diff --git a/testing/web-platform/harness/wptrunner/testrunner.py b/testing/web-platform/harness/wptrunner/testrunner.py new file mode 100644 index 000000000..77d2a8850 --- /dev/null +++ b/testing/web-platform/harness/wptrunner/testrunner.py @@ -0,0 +1,667 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import unicode_literals + +import multiprocessing +import sys +import threading +import traceback +from Queue import Empty +from multiprocessing import Process, current_process, Queue + +from mozlog import structuredlog + +# Special value used as a sentinal in various commands +Stop = object() + + +class MessageLogger(object): + def __init__(self, message_func): + self.send_message = message_func + + def _log_data(self, action, **kwargs): + self.send_message("log", action, kwargs) + + def process_output(self, process, data, command): + self._log_data("process_output", process=process, data=data, command=command) + + +def _log_func(level_name): + def log(self, message): + self._log_data(level_name.lower(), message=message) + log.__doc__ = """Log a message with level %s + +:param message: The string message to log +""" % level_name + log.__name__ = str(level_name).lower() + return log + +# Create all the methods on StructuredLog for debug levels +for level_name in structuredlog.log_levels: + setattr(MessageLogger, level_name.lower(), _log_func(level_name)) + + +class TestRunner(object): + def __init__(self, test_queue, command_queue, result_queue, executor): + """Class implementing the main loop for running tests. + + This class delegates the job of actually running a test to the executor + that is passed in. + + :param test_queue: subprocess.Queue containing the tests to run + :param command_queue: subprocess.Queue used to send commands to the + process + :param result_queue: subprocess.Queue used to send results to the + parent TestManager process + :param executor: TestExecutor object that will actually run a test. + """ + self.test_queue = test_queue + self.command_queue = command_queue + self.result_queue = result_queue + + self.executor = executor + self.name = current_process().name + self.logger = MessageLogger(self.send_message) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.teardown() + + def setup(self): + self.executor.setup(self) + + def teardown(self): + self.executor.teardown() + self.send_message("runner_teardown") + self.result_queue = None + self.command_queue = None + self.browser = None + + def run(self): + """Main loop accepting commands over the pipe and triggering + the associated methods""" + self.setup() + commands = {"run_test": self.run_test, + "stop": self.stop, + "wait": self.wait} + while True: + command, args = self.command_queue.get() + try: + rv = commands[command](*args) + except Exception: + self.send_message("error", + "Error running command %s with arguments %r:\n%s" % + (command, args, traceback.format_exc())) + else: + if rv is Stop: + break + + def stop(self): + return Stop + + def run_test(self): + if not self.executor.is_alive(): + self.send_message("restart_runner") + return + try: + # Need to block here just to allow for contention with other processes + test = self.test_queue.get(block=True, timeout=1) + except Empty: + # If we are running tests in groups (e.g. by-dir) then this queue might be + # empty but there could be other test queues. restart_runner won't actually + # start the runner if there aren't any more tests to run + self.send_message("restart_runner") + return + else: + self.send_message("test_start", test) + try: + return self.executor.run_test(test) + except Exception: + self.logger.critical(traceback.format_exc()) + raise + + def wait(self): + self.executor.protocol.wait() + self.send_message("after_test_ended", True) + + def send_message(self, command, *args): + self.result_queue.put((command, args)) + + +def start_runner(test_queue, runner_command_queue, runner_result_queue, + executor_cls, executor_kwargs, + executor_browser_cls, executor_browser_kwargs, + stop_flag): + """Launch a TestRunner in a new process""" + try: + browser = executor_browser_cls(**executor_browser_kwargs) + executor = executor_cls(browser, **executor_kwargs) + with TestRunner(test_queue, runner_command_queue, runner_result_queue, executor) as runner: + try: + runner.run() + except KeyboardInterrupt: + stop_flag.set() + except Exception: + runner_result_queue.put(("log", ("critical", {"message": traceback.format_exc()}))) + print >> sys.stderr, traceback.format_exc() + stop_flag.set() + finally: + runner_command_queue = None + runner_result_queue = None + + +manager_count = 0 + + +def next_manager_number(): + global manager_count + local = manager_count = manager_count + 1 + return local + + +class TestRunnerManager(threading.Thread): + init_lock = threading.Lock() + + def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs, + executor_cls, executor_kwargs, stop_flag, pause_after_test=False, + pause_on_unexpected=False, debug_info=None): + """Thread that owns a single TestRunner process and any processes required + by the TestRunner (e.g. the Firefox binary). + + TestRunnerManagers are responsible for launching the browser process and the + runner process, and for logging the test progress. The actual test running + is done by the TestRunner. In particular they: + + * Start the binary of the program under test + * Start the TestRunner + * Tell the TestRunner to start a test, if any + * Log that the test started + * Log the test results + * Take any remedial action required e.g. restart crashed or hung + processes + """ + self.suite_name = suite_name + + self.test_queue = test_queue + self.test_source_cls = test_source_cls + + self.browser_cls = browser_cls + self.browser_kwargs = browser_kwargs + + self.executor_cls = executor_cls + self.executor_kwargs = executor_kwargs + + self.test_source = None + + self.browser = None + self.browser_pid = None + self.browser_started = False + + # Flags used to shut down this thread if we get a sigint + self.parent_stop_flag = stop_flag + self.child_stop_flag = multiprocessing.Event() + + self.pause_after_test = pause_after_test + self.pause_on_unexpected = pause_on_unexpected + self.debug_info = debug_info + + self.manager_number = next_manager_number() + + self.command_queue = Queue() + self.remote_queue = Queue() + + self.test_runner_proc = None + + threading.Thread.__init__(self, name="Thread-TestrunnerManager-%i" % self.manager_number) + # This is started in the actual new thread + self.logger = None + + # The test that is currently running + self.test = None + + self.unexpected_count = 0 + + # This may not really be what we want + self.daemon = True + + self.init_fail_count = 0 + self.max_init_fails = 5 + self.init_timer = None + + self.restart_count = 0 + self.max_restarts = 5 + + def run(self): + """Main loop for the TestManager. + + TestManagers generally receive commands from their + TestRunner updating them on the status of a test. They + may also have a stop flag set by the main thread indicating + that the manager should shut down the next time the event loop + spins.""" + self.logger = structuredlog.StructuredLogger(self.suite_name) + with self.browser_cls(self.logger, **self.browser_kwargs) as browser, self.test_source_cls(self.test_queue) as test_source: + self.browser = browser + self.test_source = test_source + try: + if self.init() is Stop: + return + while True: + commands = {"init_succeeded": self.init_succeeded, + "init_failed": self.init_failed, + "test_start": self.test_start, + "test_ended": self.test_ended, + "after_test_ended": self.after_test_ended, + "restart_runner": self.restart_runner, + "runner_teardown": self.runner_teardown, + "log": self.log, + "error": self.error} + try: + command, data = self.command_queue.get(True, 1) + except IOError: + if not self.should_stop(): + self.logger.error("Got IOError from poll") + self.restart_count += 1 + if self.restart_runner() is Stop: + break + except Empty: + command = None + + if self.should_stop(): + self.logger.debug("A flag was set; stopping") + break + + if command is not None: + self.restart_count = 0 + if commands[command](*data) is Stop: + break + else: + if (self.debug_info and self.debug_info.interactive and + self.browser_started and not browser.is_alive()): + self.logger.debug("Debugger exited") + break + if not self.test_runner_proc.is_alive(): + if not self.command_queue.empty(): + # We got a new message so process that + continue + + # If we got to here the runner presumably shut down + # unexpectedly + self.logger.info("Test runner process shut down") + + if self.test is not None: + # This could happen if the test runner crashed for some other + # reason + # Need to consider the unlikely case where one test causes the + # runner process to repeatedly die + self.logger.critical("Last test did not complete") + break + self.logger.warning( + "More tests found, but runner process died, restarting") + self.restart_count += 1 + if self.restart_runner() is Stop: + break + finally: + self.logger.debug("TestRunnerManager main loop terminating, starting cleanup") + self.stop_runner() + self.teardown() + self.logger.debug("TestRunnerManager main loop terminated") + + def should_stop(self): + return self.child_stop_flag.is_set() or self.parent_stop_flag.is_set() + + def init(self): + """Launch the browser that is being tested, + and the TestRunner process that will run the tests.""" + # It seems that this lock is helpful to prevent some race that otherwise + # sometimes stops the spawned processes initalising correctly, and + # leaves this thread hung + if self.init_timer is not None: + self.init_timer.cancel() + + self.logger.debug("Init called, starting browser and runner") + + def init_failed(): + # This is called from a seperate thread, so we send a message to the + # main loop so we get back onto the manager thread + self.logger.debug("init_failed called from timer") + if self.command_queue: + self.command_queue.put(("init_failed", ())) + else: + self.logger.debug("Setting child stop flag in init_failed") + self.child_stop_flag.set() + + with self.init_lock: + # Guard against problems initialising the browser or the browser + # remote control method + if self.debug_info is None: + self.init_timer = threading.Timer(self.browser.init_timeout, init_failed) + + test_queue = self.test_source.get_queue() + if test_queue is None: + self.logger.info("No more tests") + return Stop + + try: + if self.init_timer is not None: + self.init_timer.start() + self.browser.start() + self.browser_pid = self.browser.pid() + self.start_test_runner(test_queue) + except: + self.logger.warning("Failure during init %s" % traceback.format_exc()) + if self.init_timer is not None: + self.init_timer.cancel() + self.logger.error(traceback.format_exc()) + succeeded = False + else: + succeeded = True + self.browser_started = True + + # This has to happen after the lock is released + if not succeeded: + self.init_failed() + + def init_succeeded(self): + """Callback when we have started the browser, started the remote + control connection, and we are ready to start testing.""" + self.logger.debug("Init succeeded") + if self.init_timer is not None: + self.init_timer.cancel() + self.init_fail_count = 0 + self.start_next_test() + + def init_failed(self): + """Callback when starting the browser or the remote control connect + fails.""" + self.init_fail_count += 1 + self.logger.warning("Init failed %i" % self.init_fail_count) + if self.init_timer is not None: + self.init_timer.cancel() + if self.init_fail_count < self.max_init_fails: + self.restart_runner() + else: + self.logger.critical("Test runner failed to initialise correctly; shutting down") + return Stop + + def start_test_runner(self, test_queue): + # Note that we need to be careful to start the browser before the + # test runner to ensure that any state set when the browser is started + # can be passed in to the test runner. + assert self.command_queue is not None + assert self.remote_queue is not None + self.logger.info("Starting runner") + executor_browser_cls, executor_browser_kwargs = self.browser.executor_browser() + + args = (test_queue, + self.remote_queue, + self.command_queue, + self.executor_cls, + self.executor_kwargs, + executor_browser_cls, + executor_browser_kwargs, + self.child_stop_flag) + self.test_runner_proc = Process(target=start_runner, + args=args, + name="Thread-TestRunner-%i" % self.manager_number) + self.test_runner_proc.start() + self.logger.debug("Test runner started") + + def send_message(self, command, *args): + self.remote_queue.put((command, args)) + + def cleanup(self): + if self.init_timer is not None: + self.init_timer.cancel() + self.logger.debug("TestManager cleanup") + + while True: + try: + self.logger.warning(" ".join(map(repr, self.command_queue.get_nowait()))) + except Empty: + break + + while True: + try: + self.logger.warning(" ".join(map(repr, self.remote_queue.get_nowait()))) + except Empty: + break + + def teardown(self): + self.logger.debug("teardown in testrunnermanager") + self.test_runner_proc = None + self.command_queue.close() + self.remote_queue.close() + self.command_queue = None + self.remote_queue = None + + def ensure_runner_stopped(self): + if self.test_runner_proc is None: + return + + self.test_runner_proc.join(10) + if self.test_runner_proc.is_alive(): + # This might leak a file handle from the queue + self.logger.warning("Forcibly terminating runner process") + self.test_runner_proc.terminate() + self.test_runner_proc.join(10) + else: + self.logger.debug("Testrunner exited with code %i" % self.test_runner_proc.exitcode) + + def runner_teardown(self): + self.ensure_runner_stopped() + return Stop + + def stop_runner(self): + """Stop the TestRunner and the Firefox binary.""" + self.logger.debug("Stopping runner") + if self.test_runner_proc is None: + return + try: + self.browser.stop() + self.browser_started = False + if self.test_runner_proc.is_alive(): + self.send_message("stop") + self.ensure_runner_stopped() + finally: + self.cleanup() + + def start_next_test(self): + self.send_message("run_test") + + def test_start(self, test): + self.test = test + self.logger.test_start(test.id) + + def test_ended(self, test, results): + """Handle the end of a test. + + Output the result of each subtest, and the result of the overall + harness to the logs. + """ + assert test == self.test + # Write the result of each subtest + file_result, test_results = results + subtest_unexpected = False + for result in test_results: + if test.disabled(result.name): + continue + expected = test.expected(result.name) + is_unexpected = expected != result.status + + if is_unexpected: + self.unexpected_count += 1 + self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count) + subtest_unexpected = True + self.logger.test_status(test.id, + result.name, + result.status, + message=result.message, + expected=expected, + stack=result.stack) + + # TODO: consider changing result if there is a crash dump file + + # Write the result of the test harness + expected = test.expected() + status = file_result.status if file_result.status != "EXTERNAL-TIMEOUT" else "TIMEOUT" + is_unexpected = expected != status + if is_unexpected: + self.unexpected_count += 1 + self.logger.debug("Unexpected count in this thread %i" % self.unexpected_count) + if status == "CRASH": + self.browser.log_crash(process=self.browser_pid, test=test.id) + + self.logger.test_end(test.id, + status, + message=file_result.message, + expected=expected, + extra=file_result.extra) + + self.test = None + + restart_before_next = (test.restart_after or + file_result.status in ("CRASH", "EXTERNAL-TIMEOUT") or + subtest_unexpected or is_unexpected) + + if (self.pause_after_test or + (self.pause_on_unexpected and (subtest_unexpected or is_unexpected))): + self.logger.info("Pausing until the browser exits") + self.send_message("wait") + else: + self.after_test_ended(restart_before_next) + + def after_test_ended(self, restart_before_next): + # Handle starting the next test, with a runner restart if required + if restart_before_next: + return self.restart_runner() + else: + return self.start_next_test() + + def restart_runner(self): + """Stop and restart the TestRunner""" + if self.restart_count >= self.max_restarts: + return Stop + self.stop_runner() + return self.init() + + def log(self, action, kwargs): + getattr(self.logger, action)(**kwargs) + + def error(self, message): + self.logger.error(message) + self.restart_runner() + + +class TestQueue(object): + def __init__(self, test_source_cls, test_type, tests, **kwargs): + self.queue = None + self.test_source_cls = test_source_cls + self.test_type = test_type + self.tests = tests + self.kwargs = kwargs + + def __enter__(self): + if not self.tests[self.test_type]: + return None + + self.queue = Queue() + has_tests = self.test_source_cls.queue_tests(self.queue, + self.test_type, + self.tests, + **self.kwargs) + # There is a race condition that means sometimes we continue + # before the tests have been written to the underlying pipe. + # Polling the pipe for data here avoids that + self.queue._reader.poll(10) + assert not self.queue.empty() + return self.queue + + def __exit__(self, *args, **kwargs): + if self.queue is not None: + self.queue.close() + self.queue = None + + +class ManagerGroup(object): + def __init__(self, suite_name, size, test_source_cls, test_source_kwargs, + browser_cls, browser_kwargs, + executor_cls, executor_kwargs, + pause_after_test=False, + pause_on_unexpected=False, + debug_info=None): + """Main thread object that owns all the TestManager threads.""" + self.suite_name = suite_name + self.size = size + self.test_source_cls = test_source_cls + self.test_source_kwargs = test_source_kwargs + self.browser_cls = browser_cls + self.browser_kwargs = browser_kwargs + self.executor_cls = executor_cls + self.executor_kwargs = executor_kwargs + self.pause_after_test = pause_after_test + self.pause_on_unexpected = pause_on_unexpected + self.debug_info = debug_info + + self.pool = set() + # Event that is polled by threads so that they can gracefully exit in the face + # of sigint + self.stop_flag = threading.Event() + self.logger = structuredlog.StructuredLogger(suite_name) + self.test_queue = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + + def run(self, test_type, tests): + """Start all managers in the group""" + self.logger.debug("Using %i processes" % self.size) + + self.test_queue = TestQueue(self.test_source_cls, + test_type, + tests, + **self.test_source_kwargs) + with self.test_queue as test_queue: + if test_queue is None: + self.logger.info("No %s tests to run" % test_type) + return + for _ in range(self.size): + manager = TestRunnerManager(self.suite_name, + test_queue, + self.test_source_cls, + self.browser_cls, + self.browser_kwargs, + self.executor_cls, + self.executor_kwargs, + self.stop_flag, + self.pause_after_test, + self.pause_on_unexpected, + self.debug_info) + manager.start() + self.pool.add(manager) + self.wait() + + def is_alive(self): + """Boolean indicating whether any manager in the group is still alive""" + return any(manager.is_alive() for manager in self.pool) + + def wait(self): + """Wait for all the managers in the group to finish""" + for item in self.pool: + item.join() + + def stop(self): + """Set the stop flag so that all managers in the group stop as soon + as possible""" + self.stop_flag.set() + self.logger.debug("Stop flag set in ManagerGroup") + + def unexpected_count(self): + return sum(item.unexpected_count for item in self.pool) |