# This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import imp import os import re import sys import time import types import unittest import warnings import weakref from unittest.case import ( _ExpectedFailure, _UnexpectedSuccess, SkipTest, ) from marionette_driver.errors import ( MarionetteException, ScriptTimeoutException, TimeoutException, ) from mozlog import get_default_logger def _wraps_parameterized(func, func_suffix, args, kwargs): """Internal: Decorator used in class MetaParameterized.""" def wrapper(self): return func(self, *args, **kwargs) wrapper.__name__ = func.__name__ + '_' + str(func_suffix) wrapper.__doc__ = '[{0}] {1}'.format(func_suffix, func.__doc__) return wrapper class MetaParameterized(type): """ A metaclass that allow a class to use decorators. It can be used like :func:`parameterized` or :func:`with_parameters` to generate new methods. """ RE_ESCAPE_BAD_CHARS = re.compile(r'[\.\(\) -/]') def __new__(cls, name, bases, attrs): for k, v in attrs.items(): if callable(v) and hasattr(v, 'metaparameters'): for func_suffix, args, kwargs in v.metaparameters: func_suffix = cls.RE_ESCAPE_BAD_CHARS.sub('_', func_suffix) wrapper = _wraps_parameterized(v, func_suffix, args, kwargs) if wrapper.__name__ in attrs: raise KeyError("{0} is already a defined method on {1}" .format(wrapper.__name__, name)) attrs[wrapper.__name__] = wrapper del attrs[k] return type.__new__(cls, name, bases, attrs) class JSTest: head_js_re = re.compile(r"MARIONETTE_HEAD_JS(\s*)=(\s*)['|\"](.*?)['|\"];") context_re = re.compile(r"MARIONETTE_CONTEXT(\s*)=(\s*)['|\"](.*?)['|\"];") timeout_re = re.compile(r"MARIONETTE_TIMEOUT(\s*)=(\s*)(\d+);") inactivity_timeout_re = re.compile(r"MARIONETTE_INACTIVITY_TIMEOUT(\s*)=(\s*)(\d+);") class CommonTestCase(unittest.TestCase): __metaclass__ = MetaParameterized match_re = None failureException = AssertionError pydebugger = None def __init__(self, methodName, marionette_weakref, fixtures, **kwargs): super(CommonTestCase, self).__init__(methodName) self.methodName = methodName self._marionette_weakref = marionette_weakref self.fixtures = fixtures self.loglines = [] self.duration = 0 self.start_time = 0 self.expected = kwargs.pop('expected', 'pass') self.logger = get_default_logger() def _enter_pm(self): if self.pydebugger: self.pydebugger.post_mortem(sys.exc_info()[2]) def _addSkip(self, result, reason): addSkip = getattr(result, 'addSkip', None) if addSkip is not None: addSkip(self, reason) else: warnings.warn("TestResult has no addSkip method, skips not reported", RuntimeWarning, 2) result.addSuccess(self) def run(self, result=None): # Bug 967566 suggests refactoring run, which would hopefully # mean getting rid of this inner function, which only sits # here to reduce code duplication: def expected_failure(result, exc_info): addExpectedFailure = getattr(result, "addExpectedFailure", None) if addExpectedFailure is not None: addExpectedFailure(self, exc_info) else: warnings.warn("TestResult has no addExpectedFailure method, " "reporting as passes", RuntimeWarning) result.addSuccess(self) self.start_time = time.time() orig_result = result if result is None: result = self.defaultTestResult() startTestRun = getattr(result, 'startTestRun', None) if startTestRun is not None: startTestRun() result.startTest(self) testMethod = getattr(self, self._testMethodName) if (getattr(self.__class__, "__unittest_skip__", False) or getattr(testMethod, "__unittest_skip__", False)): # If the class or method was skipped. try: skip_why = (getattr(self.__class__, '__unittest_skip_why__', '') or getattr(testMethod, '__unittest_skip_why__', '')) self._addSkip(result, skip_why) finally: result.stopTest(self) self.stop_time = time.time() return try: success = False try: if self.expected == "fail": try: self.setUp() except Exception: raise _ExpectedFailure(sys.exc_info()) else: self.setUp() except SkipTest as e: self._addSkip(result, str(e)) except KeyboardInterrupt: raise except _ExpectedFailure as e: expected_failure(result, e.exc_info) except: self._enter_pm() result.addError(self, sys.exc_info()) else: try: if self.expected == 'fail': try: testMethod() except: raise _ExpectedFailure(sys.exc_info()) raise _UnexpectedSuccess else: testMethod() except self.failureException: self._enter_pm() result.addFailure(self, sys.exc_info()) except KeyboardInterrupt: raise except _ExpectedFailure as e: expected_failure(result, e.exc_info) except _UnexpectedSuccess: addUnexpectedSuccess = getattr(result, 'addUnexpectedSuccess', None) if addUnexpectedSuccess is not None: addUnexpectedSuccess(self) else: warnings.warn("TestResult has no addUnexpectedSuccess method, " "reporting as failures", RuntimeWarning) result.addFailure(self, sys.exc_info()) except SkipTest as e: self._addSkip(result, str(e)) except: self._enter_pm() result.addError(self, sys.exc_info()) else: success = True try: if self.expected == "fail": try: self.tearDown() except: raise _ExpectedFailure(sys.exc_info()) else: self.tearDown() except KeyboardInterrupt: raise except _ExpectedFailure as e: expected_failure(result, e.exc_info) except: self._enter_pm() result.addError(self, sys.exc_info()) success = False # Here we could handle doCleanups() instead of calling cleanTest directly self.cleanTest() if success: result.addSuccess(self) finally: result.stopTest(self) if orig_result is None: stopTestRun = getattr(result, 'stopTestRun', None) if stopTestRun is not None: stopTestRun() @classmethod def match(cls, filename): """Determine if the specified filename should be handled by this test class. This is done by looking for a match for the filename using cls.match_re. """ if not cls.match_re: return False m = cls.match_re.match(filename) return m is not None @classmethod def add_tests_to_suite(cls, mod_name, filepath, suite, testloader, marionette, fixtures, testvars, **kwargs): """Add all the tests in the specified file to the specified suite.""" raise NotImplementedError @property def test_name(self): if hasattr(self, 'jsFile'): return os.path.basename(self.jsFile) else: return '{0}.py {1}.{2}'.format(self.__class__.__module__, self.__class__.__name__, self._testMethodName) def id(self): # TBPL starring requires that the "test name" field of a failure message # not differ over time. The test name to be used is passed to # mozlog via the test id, so this is overriden to maintain # consistency. return self.test_name def setUp(self): # Convert the marionette weakref to an object, just for the # duration of the test; this is deleted in tearDown() to prevent # a persistent circular reference which in turn would prevent # proper garbage collection. self.start_time = time.time() self.marionette = self._marionette_weakref() if self.marionette.session is None: self.marionette.start_session() self.marionette.timeout.reset() super(CommonTestCase, self).setUp() def cleanTest(self): self._deleteSession() def _deleteSession(self): if hasattr(self, 'start_time'): self.duration = time.time() - self.start_time if hasattr(self.marionette, 'session'): if self.marionette.session is not None: try: self.loglines.extend(self.marionette.get_logs()) except Exception, inst: self.loglines = [['Error getting log: {}'.format(inst)]] try: self.marionette.delete_session() except IOError: # Gecko has crashed? pass self.marionette = None def setup_SpecialPowers_observer(self): self.marionette.set_context("chrome") self.marionette.execute_script(""" let SECURITY_PREF = "security.turn_off_all_security_so_that_viruses_can_take_over_this_computer"; Components.utils.import("resource://gre/modules/Preferences.jsm"); Preferences.set(SECURITY_PREF, true); if (!testUtils.hasOwnProperty("specialPowersObserver")) { let loader = Components.classes["@mozilla.org/moz/jssubscript-loader;1"] .getService(Components.interfaces.mozIJSSubScriptLoader); loader.loadSubScript("chrome://specialpowers/content/SpecialPowersObserver.jsm", testUtils); testUtils.specialPowersObserver = new testUtils.SpecialPowersObserver(); testUtils.specialPowersObserver.init(); } """) def run_js_test(self, filename, marionette=None): """Run a JavaScript test file. It collects its set of assertions into the current test's results. :param filename: The path to the JavaScript test file to execute. May be relative to the current script. :param marionette: The Marionette object in which to execute the test. Defaults to self.marionette. """ marionette = marionette or self.marionette if not os.path.isabs(filename): # Find the caller's filename and make the path relative to that. caller_file = sys._getframe(1).f_globals.get('__file__', '') caller_file = os.path.abspath(caller_file) filename = os.path.join(os.path.dirname(caller_file), filename) self.assert_(os.path.exists(filename), 'Script "{}" must exist' .format(filename)) original_test_name = self.marionette.test_name self.marionette.test_name = os.path.basename(filename) f = open(filename, 'r') js = f.read() args = [] head_js = JSTest.head_js_re.search(js) if head_js: head_js = head_js.group(3) head = open(os.path.join(os.path.dirname(filename), head_js), 'r') js = head.read() + js context = JSTest.context_re.search(js) if context: context = context.group(3) else: context = 'content' if 'SpecialPowers' in js: self.setup_SpecialPowers_observer() if context == 'content': js = "var SpecialPowers = window.wrappedJSObject.SpecialPowers;\n" + js else: marionette.execute_script(""" if (typeof(SpecialPowers) == 'undefined') { let loader = Components.classes["@mozilla.org/moz/jssubscript-loader;1"] .getService(Components.interfaces.mozIJSSubScriptLoader); loader.loadSubScript("chrome://specialpowers/content/specialpowersAPI.js"); loader.loadSubScript("chrome://specialpowers/content/SpecialPowersObserverAPI.js"); loader.loadSubScript("chrome://specialpowers/content/ChromePowers.js"); } """) marionette.set_context(context) if context != 'chrome': marionette.navigate('data:text/html,test page') timeout = JSTest.timeout_re.search(js) if timeout: ms = timeout.group(3) marionette.timeout.script = int(ms) / 1000.0 inactivity_timeout = JSTest.inactivity_timeout_re.search(js) if inactivity_timeout: inactivity_timeout = int(inactivity_timeout.group(3)) try: results = marionette.execute_js_script( js, args, inactivity_timeout=inactivity_timeout, filename=os.path.basename(filename) ) self.assertTrue('timeout' not in filename, 'expected timeout not triggered') if 'fail' in filename: self.assertTrue(len(results['failures']) > 0, "expected test failures didn't occur") else: for failure in results['failures']: diag = "" if failure.get('diag') is None else failure['diag'] name = ("got false, expected true" if failure.get('name') is None else failure['name']) self.logger.test_status(self.test_name, name, 'FAIL', message=diag) for failure in results['expectedFailures']: diag = "" if failure.get('diag') is None else failure['diag'] name = ("got false, expected false" if failure.get('name') is None else failure['name']) self.logger.test_status(self.test_name, name, 'FAIL', expected='FAIL', message=diag) for failure in results['unexpectedSuccesses']: diag = "" if failure.get('diag') is None else failure['diag'] name = ("got true, expected false" if failure.get('name') is None else failure['name']) self.logger.test_status(self.test_name, name, 'PASS', expected='FAIL', message=diag) self.assertEqual(0, len(results['failures']), '{} tests failed' .format(len(results['failures']))) if len(results['unexpectedSuccesses']) > 0: raise _UnexpectedSuccess('') if len(results['expectedFailures']) > 0: raise _ExpectedFailure((AssertionError, AssertionError(''), None)) self.assertTrue(results['passed'] + len(results['failures']) + len(results['expectedFailures']) + len(results['unexpectedSuccesses']) > 0, 'no tests run') except ScriptTimeoutException: if 'timeout' in filename: # expected exception pass else: self.loglines = marionette.get_logs() raise self.marionette.test_name = original_test_name class MarionetteTestCase(CommonTestCase): match_re = re.compile(r"test_(.*)\.py$") def __init__(self, marionette_weakref, fixtures, methodName='runTest', filepath='', **kwargs): self.filepath = filepath self.testvars = kwargs.pop('testvars', None) super(MarionetteTestCase, self).__init__( methodName, marionette_weakref=marionette_weakref, fixtures=fixtures, **kwargs) @classmethod def add_tests_to_suite(cls, mod_name, filepath, suite, testloader, marionette, fixtures, testvars, **kwargs): # since we use imp.load_source to load test modules, if a module # is loaded with the same name as another one the module would just be # reloaded. # # We may end up by finding too many test in a module then since # reload() only update the module dict (so old keys are still there!) # see https://docs.python.org/2/library/functions.html#reload # # we get rid of that by removing the module from sys.modules, # so we ensure that it will be fully loaded by the # imp.load_source call. if mod_name in sys.modules: del sys.modules[mod_name] test_mod = imp.load_source(mod_name, filepath) for name in dir(test_mod): obj = getattr(test_mod, name) if (isinstance(obj, (type, types.ClassType)) and issubclass(obj, unittest.TestCase)): testnames = testloader.getTestCaseNames(obj) for testname in testnames: suite.addTest(obj(weakref.ref(marionette), fixtures, methodName=testname, filepath=filepath, testvars=testvars, **kwargs)) def setUp(self): super(MarionetteTestCase, self).setUp() self.marionette.test_name = self.test_name self.marionette.execute_script("log('TEST-START: {0}:{1}')" .format(self.filepath.replace('\\', '\\\\'), self.methodName), sandbox="simpletest") def tearDown(self): # In the case no session is active (eg. the application was quit), start # a new session for clean-up steps. if not self.marionette.session: self.marionette.start_session() if not self.marionette.crashed: try: self.marionette.clear_imported_scripts() self.marionette.execute_script("log('TEST-END: {0}:{1}')" .format(self.filepath.replace('\\', '\\\\'), self.methodName), sandbox="simpletest") self.marionette.test_name = None except (MarionetteException, IOError): # We have tried to log the test end when there is no listener # object that we can access pass super(MarionetteTestCase, self).tearDown() def wait_for_condition(self, method, timeout=30): timeout = float(timeout) + time.time() while time.time() < timeout: value = method(self.marionette) if value: return value time.sleep(0.5) else: raise TimeoutException("wait_for_condition timed out")