diff options
Diffstat (limited to 'testing/mozbase/moztest/moztest/results.py')
-rw-r--r-- | testing/mozbase/moztest/moztest/results.py | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/testing/mozbase/moztest/moztest/results.py b/testing/mozbase/moztest/moztest/results.py new file mode 100644 index 000000000..435665c67 --- /dev/null +++ b/testing/mozbase/moztest/moztest/results.py @@ -0,0 +1,323 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + + +import time +import os +import mozinfo + + +class TestContext(object): + """ Stores context data about the test """ + + attrs = ['hostname', 'arch', 'env', 'os', 'os_version', 'tree', 'revision', + 'product', 'logfile', 'testgroup', 'harness', 'buildtype'] + + def __init__(self, hostname='localhost', tree='', revision='', product='', + logfile=None, arch='', operating_system='', testgroup='', + harness='moztest', buildtype=''): + self.hostname = hostname + self.arch = arch or mozinfo.processor + self.env = os.environ.copy() + self.os = operating_system or mozinfo.os + self.os_version = mozinfo.version + self.tree = tree + self.revision = revision + self.product = product + self.logfile = logfile + self.testgroup = testgroup + self.harness = harness + self.buildtype = buildtype + + def __str__(self): + return '%s (%s, %s)' % (self.hostname, self.os, self.arch) + + def __repr__(self): + return '<%s>' % self.__str__() + + def __eq__(self, other): + if not isinstance(other, TestContext): + return False + diffs = [a for a in self.attrs if getattr(self, a) != getattr(other, a)] + return len(diffs) == 0 + + def __hash__(self): + def get(attr): + value = getattr(self, attr) + if isinstance(value, dict): + value = frozenset(value.items()) + return value + return hash(frozenset([get(a) for a in self.attrs])) + + +class TestResult(object): + """ Stores test result data """ + + FAIL_RESULTS = [ + 'UNEXPECTED-PASS', + 'UNEXPECTED-FAIL', + 'ERROR', + ] + COMPUTED_RESULTS = FAIL_RESULTS + [ + 'PASS', + 'KNOWN-FAIL', + 'SKIPPED', + ] + POSSIBLE_RESULTS = [ + 'PASS', + 'FAIL', + 'SKIP', + 'ERROR', + ] + + def __init__(self, name, test_class='', time_start=None, context=None, + result_expected='PASS'): + """ Create a TestResult instance. + name = name of the test that is running + test_class = the class that the test belongs to + time_start = timestamp (seconds since UNIX epoch) of when the test started + running; if not provided, defaults to the current time + ! Provide 0 if you only have the duration + context = TestContext instance; can be None + result_expected = string representing the expected outcome of the test""" + + msg = "Result '%s' not in possible results: %s" %\ + (result_expected, ', '.join(self.POSSIBLE_RESULTS)) + assert isinstance(name, basestring), "name has to be a string" + assert result_expected in self.POSSIBLE_RESULTS, msg + + self.name = name + self.test_class = test_class + self.context = context + self.time_start = time_start if time_start is not None else time.time() + self.time_end = None + self._result_expected = result_expected + self._result_actual = None + self.result = None + self.filename = None + self.description = None + self.output = [] + self.reason = None + + @property + def test_name(self): + return '%s.py %s.%s' % (self.test_class.split('.')[0], + self.test_class, + self.name) + + def __str__(self): + return '%s | %s (%s) | %s' % (self.result or 'PENDING', + self.name, self.test_class, self.reason) + + def __repr__(self): + return '<%s>' % self.__str__() + + def calculate_result(self, expected, actual): + if actual == 'ERROR': + return 'ERROR' + if actual == 'SKIP': + return 'SKIPPED' + + if expected == 'PASS': + if actual == 'PASS': + return 'PASS' + if actual == 'FAIL': + return 'UNEXPECTED-FAIL' + + if expected == 'FAIL': + if actual == 'PASS': + return 'UNEXPECTED-PASS' + if actual == 'FAIL': + return 'KNOWN-FAIL' + + # if actual is skip or error, we return at the beginning, so if we get + # here it is definitely some kind of error + return 'ERROR' + + def infer_results(self, computed_result): + assert computed_result in self.COMPUTED_RESULTS + if computed_result == 'UNEXPECTED-PASS': + expected = 'FAIL' + actual = 'PASS' + elif computed_result == 'UNEXPECTED-FAIL': + expected = 'PASS' + actual = 'FAIL' + elif computed_result == 'KNOWN-FAIL': + expected = actual = 'FAIL' + elif computed_result == 'SKIPPED': + expected = actual = 'SKIP' + else: + return + self._result_expected = expected + self._result_actual = actual + + def finish(self, result, time_end=None, output=None, reason=None): + """ Marks the test as finished, storing its end time and status + ! Provide the duration as time_end if you only have that. """ + + if result in self.POSSIBLE_RESULTS: + self._result_actual = result + self.result = self.calculate_result(self._result_expected, + self._result_actual) + elif result in self.COMPUTED_RESULTS: + self.infer_results(result) + self.result = result + else: + valid = self.POSSIBLE_RESULTS + self.COMPUTED_RESULTS + msg = "Result '%s' not valid. Need one of: %s" %\ + (result, ', '.join(valid)) + raise ValueError(msg) + + # use lists instead of multiline strings + if isinstance(output, basestring): + output = output.splitlines() + + self.time_end = time_end if time_end is not None else time.time() + self.output = output or self.output + self.reason = reason + + @property + def finished(self): + """ Boolean saying if the test is finished or not """ + return self.result is not None + + @property + def duration(self): + """ Returns the time it took for the test to finish. If the test is + not finished, returns the elapsed time so far """ + if self.result is not None: + return self.time_end - self.time_start + else: + # returns the elapsed time + return time.time() - self.time_start + + +class TestResultCollection(list): + """ Container class that stores test results """ + + resultClass = TestResult + + def __init__(self, suite_name, time_taken=0, resultClass=None): + list.__init__(self) + self.suite_name = suite_name + self.time_taken = time_taken + if resultClass is not None: + self.resultClass = resultClass + + def __str__(self): + return "%s (%.2fs)\n%s" % (self.suite_name, self.time_taken, + list.__str__(self)) + + def subset(self, predicate): + tests = self.filter(predicate) + duration = 0 + sub = TestResultCollection(self.suite_name) + for t in tests: + sub.append(t) + duration += t.duration + sub.time_taken = duration + return sub + + @property + def contexts(self): + """ List of unique contexts for the test results contained """ + cs = [tr.context for tr in self] + return list(set(cs)) + + def filter(self, predicate): + """ Returns a generator of TestResults that satisfy a given predicate """ + return (tr for tr in self if predicate(tr)) + + def tests_with_result(self, result): + """ Returns a generator of TestResults with the given result """ + msg = "Result '%s' not in possible results: %s" %\ + (result, ', '.join(self.resultClass.COMPUTED_RESULTS)) + assert result in self.resultClass.COMPUTED_RESULTS, msg + return self.filter(lambda t: t.result == result) + + @property + def tests(self): + """ Generator of all tests in the collection """ + return (t for t in self) + + def add_result(self, test, result_expected='PASS', + result_actual='PASS', output='', context=None): + def get_class(test): + return test.__class__.__module__ + '.' + test.__class__.__name__ + + t = self.resultClass(name=str(test).split()[0], test_class=get_class(test), + time_start=0, result_expected=result_expected, + context=context) + t.finish(result_actual, time_end=0, reason=relevant_line(output), + output=output) + self.append(t) + + @property + def num_failures(self): + fails = 0 + for t in self: + if t.result in self.resultClass.FAIL_RESULTS: + fails += 1 + return fails + + def add_unittest_result(self, result, context=None): + """ Adds the python unittest result provided to the collection""" + if hasattr(result, 'time_taken'): + self.time_taken += result.time_taken + + for test, output in result.errors: + self.add_result(test, result_actual='ERROR', output=output) + + for test, output in result.failures: + self.add_result(test, result_actual='FAIL', + output=output) + + if hasattr(result, 'unexpectedSuccesses'): + for test in result.unexpectedSuccesses: + self.add_result(test, result_expected='FAIL', + result_actual='PASS') + + if hasattr(result, 'skipped'): + for test, output in result.skipped: + self.add_result(test, result_expected='SKIP', + result_actual='SKIP', output=output) + + if hasattr(result, 'expectedFailures'): + for test, output in result.expectedFailures: + self.add_result(test, result_expected='FAIL', + result_actual='FAIL', output=output) + + # unittest does not store these by default + if hasattr(result, 'tests_passed'): + for test in result.tests_passed: + self.add_result(test) + + @classmethod + def from_unittest_results(cls, context, *results): + """ Creates a TestResultCollection containing the given python + unittest results """ + + if not results: + return cls('from unittest') + + # all the TestResult instances share the same context + context = context or TestContext() + + collection = cls('from %s' % results[0].__class__.__name__) + + for result in results: + collection.add_unittest_result(result, context) + + return collection + + +# used to get exceptions/errors from tracebacks +def relevant_line(s): + KEYWORDS = ('Error:', 'Exception:', 'error:', 'exception:') + lines = s.splitlines() + for line in lines: + for keyword in KEYWORDS: + if keyword in line: + return line + return 'N/A' |