diff options
Diffstat (limited to 'testing/mozbase/mozlog/tests/test_structured.py')
-rw-r--r-- | testing/mozbase/mozlog/tests/test_structured.py | 1098 |
1 files changed, 1098 insertions, 0 deletions
diff --git a/testing/mozbase/mozlog/tests/test_structured.py b/testing/mozbase/mozlog/tests/test_structured.py new file mode 100644 index 000000000..ab4b5f582 --- /dev/null +++ b/testing/mozbase/mozlog/tests/test_structured.py @@ -0,0 +1,1098 @@ +# -*- coding: utf-8 -*- +import argparse +import json +import optparse +import os +import StringIO +import sys +import unittest +import signal +import xml.etree.ElementTree as ET + +import mozfile + +from mozlog import ( + commandline, + reader, + structuredlog, + stdadapter, + handlers, + formatters, +) + + +class TestHandler(object): + + def __init__(self): + self.items = [] + + def __call__(self, data): + self.items.append(data) + + @property + def last_item(self): + return self.items[-1] + + @property + def empty(self): + return not self.items + + +class BaseStructuredTest(unittest.TestCase): + + def setUp(self): + self.logger = structuredlog.StructuredLogger("test") + self.handler = TestHandler() + self.logger.add_handler(self.handler) + + def pop_last_item(self): + return self.handler.items.pop() + + def assert_log_equals(self, expected, actual=None): + if actual is None: + actual = self.pop_last_item() + + all_expected = {"pid": os.getpid(), + "thread": "MainThread", + "source": "test"} + specials = set(["time"]) + + all_expected.update(expected) + for key, value in all_expected.iteritems(): + self.assertEqual(actual[key], value) + + self.assertEquals(set(all_expected.keys()) | + specials, set(actual.keys())) + + +class TestStatusHandler(BaseStructuredTest): + + def setUp(self): + super(TestStatusHandler, self).setUp() + self.handler = handlers.StatusHandler() + self.logger.add_handler(self.handler) + + def test_failure_run(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "sub1", status='PASS') + self.logger.test_status("test1", "sub2", status='TIMEOUT') + self.logger.test_end("test1", status='OK') + self.logger.suite_end() + summary = self.handler.summarize() + self.assertIn('TIMEOUT', summary.unexpected_statuses) + self.assertEqual(1, summary.unexpected_statuses['TIMEOUT']) + self.assertIn('PASS', summary.expected_statuses) + self.assertEqual(1, summary.expected_statuses['PASS']) + self.assertIn('OK', summary.expected_statuses) + self.assertEqual(1, summary.expected_statuses['OK']) + self.assertEqual(2, summary.action_counts['test_status']) + self.assertEqual(1, summary.action_counts['test_end']) + + def test_error_run(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.error("ERRR!") + self.logger.test_end("test1", status='OK') + self.logger.test_start("test2") + self.logger.test_end("test2", status='OK') + self.logger.suite_end() + summary = self.handler.summarize() + self.assertIn('ERROR', summary.log_level_counts) + self.assertEqual(1, summary.log_level_counts['ERROR']) + self.assertIn('OK', summary.expected_statuses) + self.assertEqual(2, summary.expected_statuses['OK']) + + +class TestStructuredLog(BaseStructuredTest): + + def test_suite_start(self): + self.logger.suite_start(["test"]) + self.assert_log_equals({"action": "suite_start", + "tests": ["test"]}) + self.logger.suite_end() + + def test_suite_end(self): + self.logger.suite_start([]) + self.logger.suite_end() + self.assert_log_equals({"action": "suite_end"}) + + def test_start(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.assert_log_equals({"action": "test_start", + "test": "test1"}) + + self.logger.test_start( + ("test1", "==", "test1-ref"), path="path/to/test") + self.assert_log_equals({"action": "test_start", + "test": ("test1", "==", "test1-ref"), + "path": "path/to/test"}) + self.logger.suite_end() + + def test_start_inprogress(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_start("test1") + self.assert_log_equals({"action": "log", + "message": "test_start for test1 logged while in progress.", + "level": "ERROR"}) + self.logger.suite_end() + + def test_status(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest name", "fail", expected="FAIL", + message="Test message") + self.assert_log_equals({"action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "message": "Test message", + "test": "test1"}) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_1(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest name", "fail") + self.assert_log_equals({"action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "test": "test1"}) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_2(self): + self.assertRaises(ValueError, self.logger.test_status, "test1", "subtest name", + "XXXUNKNOWNXXX") + + def test_status_extra(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS", + extra={"data": 42}) + self.assert_log_equals({"action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "test": "test1", + "extra": {"data": 42}}) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_stack(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS", + stack="many\nlines\nof\nstack") + self.assert_log_equals({"action": "test_status", + "subtest": "subtest name", + "status": "FAIL", + "expected": "PASS", + "test": "test1", + "stack": "many\nlines\nof\nstack"}) + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + def test_status_not_started(self): + self.logger.test_status("test_UNKNOWN", "subtest", "PASS") + self.assertTrue(self.pop_last_item()["message"].startswith( + "test_status for test_UNKNOWN logged while not in progress. Logged with data: {")) + + def test_end(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "fail", message="Test message") + self.assert_log_equals({"action": "test_end", + "status": "FAIL", + "expected": "OK", + "message": "Test message", + "test": "test1"}) + self.logger.suite_end() + + def test_end_1(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end( + "test1", "PASS", expected="PASS", extra={"data": 123}) + self.assert_log_equals({"action": "test_end", + "status": "PASS", + "extra": {"data": 123}, + "test": "test1"}) + self.logger.suite_end() + + def test_end_2(self): + self.assertRaises(ValueError, self.logger.test_end, + "test1", "XXXUNKNOWNXXX") + + def test_end_stack(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "PASS", expected="PASS", + stack="many\nlines\nof\nstack") + self.assert_log_equals({"action": "test_end", + "status": "PASS", + "test": "test1", + "stack": "many\nlines\nof\nstack"}) + self.logger.suite_end() + + def test_end_no_start(self): + self.logger.test_end("test1", "PASS", expected="PASS") + self.assertTrue(self.pop_last_item()["message"].startswith( + "test_end for test1 logged while not in progress. Logged with data: {")) + self.logger.suite_end() + + def test_end_twice(self): + self.logger.suite_start([]) + self.logger.test_start("test2") + self.logger.test_end("test2", "PASS", expected="PASS") + self.assert_log_equals({"action": "test_end", + "status": "PASS", + "test": "test2"}) + self.logger.test_end("test2", "PASS", expected="PASS") + last_item = self.pop_last_item() + self.assertEquals(last_item["action"], "log") + self.assertEquals(last_item["level"], "ERROR") + self.assertTrue(last_item["message"].startswith( + "test_end for test2 logged while not in progress. Logged with data: {")) + self.logger.suite_end() + + def test_suite_start_twice(self): + self.logger.suite_start([]) + self.assert_log_equals({"action": "suite_start", + "tests": []}) + self.logger.suite_start([]) + last_item = self.pop_last_item() + self.assertEquals(last_item["action"], "log") + self.assertEquals(last_item["level"], "ERROR") + self.logger.suite_end() + + def test_suite_end_no_start(self): + self.logger.suite_start([]) + self.assert_log_equals({"action": "suite_start", + "tests": []}) + self.logger.suite_end() + self.assert_log_equals({"action": "suite_end"}) + self.logger.suite_end() + last_item = self.pop_last_item() + self.assertEquals(last_item["action"], "log") + self.assertEquals(last_item["level"], "ERROR") + + def test_multiple_loggers_suite_start(self): + logger1 = structuredlog.StructuredLogger("test") + self.logger.suite_start([]) + logger1.suite_start([]) + last_item = self.pop_last_item() + self.assertEquals(last_item["action"], "log") + self.assertEquals(last_item["level"], "ERROR") + + def test_multiple_loggers_test_start(self): + logger1 = structuredlog.StructuredLogger("test") + self.logger.suite_start([]) + self.logger.test_start("test") + logger1.test_start("test") + last_item = self.pop_last_item() + self.assertEquals(last_item["action"], "log") + self.assertEquals(last_item["level"], "ERROR") + + def test_process(self): + self.logger.process_output(1234, "test output") + self.assert_log_equals({"action": "process_output", + "process": "1234", + "data": "test output"}) + + def test_process_start(self): + self.logger.process_start(1234) + self.assert_log_equals({"action": "process_start", + "process": "1234"}) + + def test_process_exit(self): + self.logger.process_exit(1234, 0) + self.assert_log_equals({"action": "process_exit", + "process": "1234", + "exitcode": 0}) + + def test_log(self): + for level in ["critical", "error", "warning", "info", "debug"]: + getattr(self.logger, level)("message") + self.assert_log_equals({"action": "log", + "level": level.upper(), + "message": "message"}) + + def test_logging_adapter(self): + import logging + logging.basicConfig(level="DEBUG") + old_level = logging.root.getEffectiveLevel() + logging.root.setLevel("DEBUG") + + std_logger = logging.getLogger("test") + std_logger.setLevel("DEBUG") + + logger = stdadapter.std_logging_adapter(std_logger) + + try: + for level in ["critical", "error", "warning", "info", "debug"]: + getattr(logger, level)("message") + self.assert_log_equals({"action": "log", + "level": level.upper(), + "message": "message"}) + finally: + logging.root.setLevel(old_level) + + def test_add_remove_handlers(self): + handler = TestHandler() + self.logger.add_handler(handler) + self.logger.info("test1") + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "test1"}) + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "test1"}, actual=handler.last_item) + + self.logger.remove_handler(handler) + self.logger.info("test2") + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "test2"}) + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "test1"}, actual=handler.last_item) + + def test_wrapper(self): + file_like = structuredlog.StructuredLogFileLike(self.logger) + + file_like.write("line 1") + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "line 1"}) + + file_like.write("line 2\n") + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "line 2"}) + + file_like.write("line 3\r") + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "line 3"}) + + file_like.write("line 4\r\n") + + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "line 4"}) + + +class TestTypeConversions(BaseStructuredTest): + + def test_raw(self): + self.logger.log_raw({"action": "suite_start", + "tests": [1], + "time": "1234"}) + self.assert_log_equals({"action": "suite_start", + "tests": ["1"], + "time": 1234}) + self.logger.suite_end() + + def test_tuple(self): + self.logger.suite_start([]) + self.logger.test_start(("\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90\x8d\x83\xf0\x90\x8d\x84", + 42, u"\u16a4")) + self.assert_log_equals({"action": "test_start", + "test": (u'\U00010344\U00010334\U00010343\U00010344', + u"42", u"\u16a4")}) + self.logger.suite_end() + + def test_non_string_messages(self): + self.logger.suite_start([]) + self.logger.info(1) + self.assert_log_equals({"action": "log", + "message": "1", + "level": "INFO"}) + self.logger.info([1, (2, '3'), "s", "s" + chr(255)]) + self.assert_log_equals({"action": "log", + "message": "[1, (2, '3'), 's', 's\\xff']", + "level": "INFO"}) + self.logger.suite_end() + + def test_utf8str_write(self): + with mozfile.NamedTemporaryFile() as logfile: + _fmt = formatters.TbplFormatter() + _handler = handlers.StreamHandler(logfile, _fmt) + self.logger.add_handler(_handler) + self.logger.suite_start([]) + self.logger.info("☺") + logfile.seek(0) + data = logfile.readlines()[-1].strip() + self.assertEquals(data, "☺") + self.logger.suite_end() + + def test_arguments(self): + self.logger.info(message="test") + self.assert_log_equals({"action": "log", + "message": "test", + "level": "INFO"}) + + self.logger.suite_start([], {}) + self.assert_log_equals({"action": "suite_start", + "tests": [], + "run_info": {}}) + self.logger.test_start(test="test1") + self.logger.test_status( + "subtest1", "FAIL", test="test1", status="PASS") + self.assert_log_equals({"action": "test_status", + "test": "test1", + "subtest": "subtest1", + "status": "PASS", + "expected": "FAIL"}) + self.logger.process_output(123, "data", "test") + self.assert_log_equals({"action": "process_output", + "process": "123", + "command": "test", + "data": "data"}) + self.assertRaises(TypeError, self.logger.test_status, subtest="subtest2", + status="FAIL", expected="PASS") + self.assertRaises(TypeError, self.logger.test_status, "test1", "subtest1", + "PASS", "FAIL", "message", "stack", {}, "unexpected") + self.assertRaises(TypeError, self.logger.test_status, + "test1", test="test2") + self.logger.suite_end() + + +class TestComponentFilter(BaseStructuredTest): + + def test_filter_component(self): + component_logger = structuredlog.StructuredLogger(self.logger.name, + "test_component") + component_logger.component_filter = handlers.LogLevelFilter( + lambda x: x, "info") + + self.logger.debug("Test") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", + "level": "DEBUG", + "message": "Test"}) + self.assertTrue(self.handler.empty) + + component_logger.info("Test 1") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", + "level": "INFO", + "message": "Test 1", + "component": "test_component"}) + + component_logger.debug("Test 2") + self.assertTrue(self.handler.empty) + + component_logger.component_filter = None + + component_logger.debug("Test 3") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", + "level": "DEBUG", + "message": "Test 3", + "component": "test_component"}) + + def test_filter_default_component(self): + component_logger = structuredlog.StructuredLogger(self.logger.name, + "test_component") + + self.logger.debug("Test") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", + "level": "DEBUG", + "message": "Test"}) + + self.logger.component_filter = handlers.LogLevelFilter( + lambda x: x, "info") + + self.logger.debug("Test 1") + self.assertTrue(self.handler.empty) + + component_logger.debug("Test 2") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", + "level": "DEBUG", + "message": "Test 2", + "component": "test_component"}) + + self.logger.component_filter = None + + self.logger.debug("Test 3") + self.assertFalse(self.handler.empty) + self.assert_log_equals({"action": "log", + "level": "DEBUG", + "message": "Test 3"}) + + def test_filter_message_mutuate(self): + def filter_mutate(msg): + if msg["action"] == "log": + msg["message"] = "FILTERED! %s" % msg["message"] + return msg + + self.logger.component_filter = filter_mutate + self.logger.debug("Test") + self.assert_log_equals({"action": "log", + "level": "DEBUG", + "message": "FILTERED! Test"}) + self.logger.component_filter = None + + +class FormatterTest(unittest.TestCase): + + def setUp(self): + self.position = 0 + self.logger = structuredlog.StructuredLogger( + "test_%s" % type(self).__name__) + self.output_file = StringIO.StringIO() + self.handler = handlers.StreamHandler( + self.output_file, self.get_formatter()) + self.logger.add_handler(self.handler) + + def set_position(self, pos=None): + if pos is None: + pos = self.output_file.tell() + self.position = pos + + def get_formatter(self): + raise NotImplementedError( + "FormatterTest subclasses must implement get_formatter") + + @property + def loglines(self): + self.output_file.seek(self.position) + return [line.rstrip() for line in self.output_file.readlines()] + + +class TestHTMLFormatter(FormatterTest): + + def get_formatter(self): + return formatters.HTMLFormatter() + + def test_base64_string(self): + self.logger.suite_start([]) + self.logger.test_start("string_test") + self.logger.test_end("string_test", "FAIL", + extra={"data": "foobar"}) + self.logger.suite_end() + self.assertIn("data:text/html;charset=utf-8;base64,Zm9vYmFy", + ''.join(self.loglines)) + + def test_base64_unicode(self): + self.logger.suite_start([]) + self.logger.test_start("unicode_test") + self.logger.test_end("unicode_test", "FAIL", + extra={"data": unichr(0x02A9)}) + self.logger.suite_end() + self.assertIn("data:text/html;charset=utf-8;base64,yqk=", + ''.join(self.loglines)) + + def test_base64_other(self): + self.logger.suite_start([]) + self.logger.test_start("int_test") + self.logger.test_end("int_test", "FAIL", + extra={"data": {"foo": "bar"}}) + self.logger.suite_end() + self.assertIn("data:text/html;charset=utf-8;base64,eydmb28nOiAnYmFyJ30=", + ''.join(self.loglines)) + + +class TestTBPLFormatter(FormatterTest): + + def get_formatter(self): + return formatters.TbplFormatter() + + def test_unexpected_message(self): + self.logger.suite_start([]) + self.logger.test_start("timeout_test") + self.logger.test_end("timeout_test", + "TIMEOUT", + message="timed out") + self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out", + self.loglines) + self.logger.suite_end() + + def test_default_unexpected_end_message(self): + self.logger.suite_start([]) + self.logger.test_start("timeout_test") + self.logger.test_end("timeout_test", + "TIMEOUT") + self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK", + self.loglines) + self.logger.suite_end() + + def test_default_unexpected_status_message(self): + self.logger.suite_start([]) + self.logger.test_start("timeout_test") + self.logger.test_status("timeout_test", + "subtest", + status="TIMEOUT") + self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS", + self.loglines) + self.logger.test_end("timeout_test", "OK") + self.logger.suite_end() + + def test_single_newline(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.set_position() + self.logger.test_status("test1", "subtest", + status="PASS", + expected="FAIL") + self.logger.test_end("test1", "OK") + self.logger.suite_end() + + # This sequence should not produce blanklines + for line in self.loglines: + self.assertNotEqual("", line, "No blank line should be present in: %s" % + self.loglines) + + def test_process_exit(self): + self.logger.process_exit(1234, 0) + self.assertIn('TEST-INFO | 1234: exit 0', self.loglines) + + @unittest.skipUnless(os.name == 'posix', 'posix only') + def test_process_exit_with_sig(self): + # subprocess return code is negative when process + # has been killed by signal on posix. + self.logger.process_exit(1234, -signal.SIGTERM) + self.assertIn, ('TEST-INFO | 1234: killed by SIGTERM', self.loglines) + + +class TestMachFormatter(FormatterTest): + + def get_formatter(self): + return formatters.MachFormatter(disable_colors=True) + + def test_summary(self): + self.logger.suite_start([]) + + # Some tests that pass + self.logger.test_start("test1") + self.logger.test_end("test1", status="PASS", expected="PASS") + + self.logger.test_start("test2") + self.logger.test_end("test2", status="PASS", expected="TIMEOUT") + + self.logger.test_start("test3") + self.logger.test_end("test3", status="FAIL", expected="PASS") + + self.set_position() + self.logger.suite_end() + + self.assertIn("Ran 3 tests", self.loglines) + self.assertIn("Expected results: 1", self.loglines) + self.assertIn( + "Unexpected results: 2 (FAIL: 1, PASS: 1)", self.loglines) + self.assertNotIn("test1", self.loglines) + self.assertIn("PASS expected TIMEOUT test2", self.loglines) + self.assertIn("FAIL test3", self.loglines) + + def test_summary_subtests(self): + self.logger.suite_start([]) + + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest1", status="PASS") + self.logger.test_status("test1", "subtest2", status="FAIL") + self.logger.test_end("test1", status="OK", expected="OK") + + self.logger.test_start("test2") + self.logger.test_status("test2", "subtest1", + status="TIMEOUT", expected="PASS") + self.logger.test_end("test2", status="TIMEOUT", expected="OK") + + self.set_position() + self.logger.suite_end() + + self.assertIn("Ran 5 tests (2 parents, 3 subtests)", self.loglines) + self.assertIn("Expected results: 2", self.loglines) + self.assertIn( + "Unexpected results: 3 (FAIL: 1, TIMEOUT: 2)", self.loglines) + + def test_summary_ok(self): + self.logger.suite_start([]) + + self.logger.test_start("test1") + self.logger.test_status("test1", "subtest1", status="PASS") + self.logger.test_status("test1", "subtest2", status="PASS") + self.logger.test_end("test1", status="OK", expected="OK") + + self.logger.test_start("test2") + self.logger.test_status("test2", "subtest1", + status="PASS", expected="PASS") + self.logger.test_end("test2", status="OK", expected="OK") + + self.set_position() + self.logger.suite_end() + + self.assertIn("OK", self.loglines) + self.assertIn("Expected results: 5", self.loglines) + self.assertIn("Unexpected results: 0", self.loglines) + + def test_process_start(self): + self.logger.process_start(1234) + self.assertIn("Started process `1234`", self.loglines[0]) + + def test_process_start_with_command(self): + self.logger.process_start(1234, command='test cmd') + self.assertIn("Started process `1234` (test cmd)", self.loglines[0]) + + def test_process_exit(self): + self.logger.process_exit(1234, 0) + self.assertIn('1234: exit 0', self.loglines[0]) + + @unittest.skipUnless(os.name == 'posix', 'posix only') + def test_process_exit_with_sig(self): + # subprocess return code is negative when process + # has been killed by signal on posix. + self.logger.process_exit(1234, -signal.SIGTERM) + self.assertIn('1234: killed by SIGTERM', self.loglines[0]) + + +class TestXUnitFormatter(FormatterTest): + + def get_formatter(self): + return formatters.XUnitFormatter() + + def log_as_xml(self): + return ET.fromstring('\n'.join(self.loglines)) + + def test_stacktrace_is_present(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end( + "test1", "fail", message="Test message", stack='this\nis\na\nstack') + self.logger.suite_end() + + root = self.log_as_xml() + self.assertIn('this\nis\na\nstack', root.find('testcase/failure').text) + + def test_failure_message(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "fail", message="Test message") + self.logger.suite_end() + + root = self.log_as_xml() + self.assertEquals('Expected OK, got FAIL', root.find( + 'testcase/failure').get('message')) + + def test_suite_attrs(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_end("test1", "ok", message="Test message") + self.logger.suite_end() + + root = self.log_as_xml() + self.assertEqual(root.get('skips'), '0') + self.assertEqual(root.get('failures'), '0') + self.assertEqual(root.get('errors'), '0') + self.assertEqual(root.get('tests'), '1') + self.assertEqual(root.get('time'), '0.00') + + def test_time_is_not_rounded(self): + # call formatter directly, it is easier here + formatter = self.get_formatter() + formatter.suite_start(dict(time=55000)) + formatter.test_start(dict(time=55100)) + formatter.test_end( + dict(time=55558, test='id', message='message', status='PASS')) + xml_string = formatter.suite_end(dict(time=55559)) + + root = ET.fromstring(xml_string) + self.assertEqual(root.get('time'), '0.56') + self.assertEqual(root.find('testcase').get('time'), '0.46') + + +class TestCommandline(unittest.TestCase): + + def setUp(self): + self.logfile = mozfile.NamedTemporaryFile() + + @property + def loglines(self): + self.logfile.seek(0) + return [line.rstrip() for line in self.logfile.readlines()] + + def test_setup_logging(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args(["--log-raw=-"]) + logger = commandline.setup_logging("test_setup_logging", args, {}) + self.assertEqual(len(logger.handlers), 1) + + def test_setup_logging_optparse(self): + parser = optparse.OptionParser() + commandline.add_logging_group(parser) + args, _ = parser.parse_args(["--log-raw=-"]) + logger = commandline.setup_logging("test_optparse", args, {}) + self.assertEqual(len(logger.handlers), 1) + self.assertIsInstance(logger.handlers[0], handlers.StreamHandler) + + def test_limit_formatters(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser, include_formatters=['raw']) + other_formatters = [fmt for fmt in commandline.log_formatters + if fmt != 'raw'] + # check that every formatter except raw is not present + for fmt in other_formatters: + with self.assertRaises(SystemExit): + parser.parse_args(["--log-%s=-" % fmt]) + with self.assertRaises(SystemExit): + parser.parse_args(["--log-%s-level=error" % fmt]) + # raw is still ok + args = parser.parse_args(["--log-raw=-"]) + logger = commandline.setup_logging("test_setup_logging2", args, {}) + self.assertEqual(len(logger.handlers), 1) + + def test_setup_logging_optparse_unicode(self): + parser = optparse.OptionParser() + commandline.add_logging_group(parser) + args, _ = parser.parse_args([u"--log-raw=-"]) + logger = commandline.setup_logging("test_optparse_unicode", args, {}) + self.assertEqual(len(logger.handlers), 1) + self.assertEqual(logger.handlers[0].stream, sys.stdout) + self.assertIsInstance(logger.handlers[0], handlers.StreamHandler) + + def test_logging_defaultlevel(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + + args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name]) + logger = commandline.setup_logging("test_fmtopts", args, {}) + logger.info("INFO message") + logger.debug("DEBUG message") + logger.error("ERROR message") + # The debug level is not logged by default. + self.assertEqual(["INFO message", + "ERROR message"], + self.loglines) + + def test_logging_errorlevel(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args( + ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=error"]) + logger = commandline.setup_logging("test_fmtopts", args, {}) + logger.info("INFO message") + logger.debug("DEBUG message") + logger.error("ERROR message") + + # Only the error level and above were requested. + self.assertEqual(["ERROR message"], + self.loglines) + + def test_logging_debuglevel(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args( + ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=debug"]) + logger = commandline.setup_logging("test_fmtopts", args, {}) + logger.info("INFO message") + logger.debug("DEBUG message") + logger.error("ERROR message") + # Requesting a lower log level than default works as expected. + self.assertEqual(["INFO message", + "DEBUG message", + "ERROR message"], + self.loglines) + + def test_unused_options(self): + parser = argparse.ArgumentParser() + commandline.add_logging_group(parser) + args = parser.parse_args(["--log-tbpl-level=error"]) + self.assertRaises(ValueError, commandline.setup_logging, + "test_fmtopts", args, {}) + + +class TestBuffer(BaseStructuredTest): + + def assert_log_equals(self, expected, actual=None): + if actual is None: + actual = self.pop_last_item() + + all_expected = {"pid": os.getpid(), + "thread": "MainThread", + "source": "testBuffer"} + specials = set(["time"]) + + all_expected.update(expected) + for key, value in all_expected.iteritems(): + self.assertEqual(actual[key], value) + + self.assertEquals(set(all_expected.keys()) | + specials, set(actual.keys())) + + def setUp(self): + self.logger = structuredlog.StructuredLogger("testBuffer") + self.handler = handlers.BufferHandler(TestHandler(), message_limit=4) + self.logger.add_handler(self.handler) + + def tearDown(self): + self.logger.remove_handler(self.handler) + + def pop_last_item(self): + return self.handler.inner.items.pop() + + def test_buffer_messages(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.send_message("buffer", "off") + self.logger.test_status("test1", "sub1", status="PASS") + # Even for buffered actions, the buffer does not interfere if + # buffering is turned off. + self.assert_log_equals({"action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub1"}) + self.logger.send_message("buffer", "on") + self.logger.test_status("test1", "sub2", status="PASS") + self.logger.test_status("test1", "sub3", status="PASS") + self.logger.test_status("test1", "sub4", status="PASS") + self.logger.test_status("test1", "sub5", status="PASS") + self.logger.test_status("test1", "sub6", status="PASS") + self.logger.test_status("test1", "sub7", status="PASS") + self.logger.test_end("test1", status="OK") + self.logger.send_message("buffer", "clear") + self.assert_log_equals({"action": "test_end", + "test": "test1", + "status": "OK"}) + self.logger.suite_end() + + def test_buffer_size(self): + self.logger.suite_start([]) + self.logger.test_start("test1") + self.logger.test_status("test1", "sub1", status="PASS") + self.logger.test_status("test1", "sub2", status="PASS") + self.logger.test_status("test1", "sub3", status="PASS") + self.logger.test_status("test1", "sub4", status="PASS") + self.logger.test_status("test1", "sub5", status="PASS") + self.logger.test_status("test1", "sub6", status="PASS") + self.logger.test_status("test1", "sub7", status="PASS") + + # No test status messages made it to the underlying handler. + self.assert_log_equals({"action": "test_start", + "test": "test1"}) + + # The buffer's actual size never grows beyond the specified limit. + self.assertEquals(len(self.handler._buffer), 4) + + self.logger.test_status("test1", "sub8", status="FAIL") + # The number of messages deleted comes back in a list. + self.assertEquals([4], self.logger.send_message("buffer", "flush")) + + # When the buffer is dumped, the failure is the last thing logged + self.assert_log_equals({"action": "test_status", + "test": "test1", + "subtest": "sub8", + "status": "FAIL", + "expected": "PASS"}) + # Three additional messages should have been retained for context + self.assert_log_equals({"action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub7"}) + self.assert_log_equals({"action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub6"}) + self.assert_log_equals({"action": "test_status", + "test": "test1", + "status": "PASS", + "subtest": "sub5"}) + self.assert_log_equals({"action": "suite_start", + "tests": []}) + + +class TestReader(unittest.TestCase): + + def to_file_like(self, obj): + data_str = "\n".join(json.dumps(item) for item in obj) + return StringIO.StringIO(data_str) + + def test_read(self): + data = [{"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}] + + f = self.to_file_like(data) + self.assertEquals(data, list(reader.read(f))) + + def test_imap_log(self): + data = [{"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}] + + f = self.to_file_like(data) + + def f_action_0(item): + return ("action_0", item["data"]) + + def f_action_1(item): + return ("action_1", item["data"]) + + res_iter = reader.imap_log(reader.read(f), + {"action_0": f_action_0, + "action_1": f_action_1}) + self.assertEquals([("action_0", "data_0"), ("action_1", "data_1")], + list(res_iter)) + + def test_each_log(self): + data = [{"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}] + + f = self.to_file_like(data) + + count = {"action_0": 0, + "action_1": 0} + + def f_action_0(item): + count[item["action"]] += 1 + + def f_action_1(item): + count[item["action"]] += 2 + + reader.each_log(reader.read(f), + {"action_0": f_action_0, + "action_1": f_action_1}) + + self.assertEquals({"action_0": 1, "action_1": 2}, count) + + def test_handler(self): + data = [{"action": "action_0", "data": "data_0"}, + {"action": "action_1", "data": "data_1"}] + + f = self.to_file_like(data) + + test = self + + class ReaderTestHandler(reader.LogHandler): + + def __init__(self): + self.action_0_count = 0 + self.action_1_count = 0 + + def action_0(self, item): + test.assertEquals(item["action"], "action_0") + self.action_0_count += 1 + + def action_1(self, item): + test.assertEquals(item["action"], "action_1") + self.action_1_count += 1 + + handler = ReaderTestHandler() + reader.handle_log(reader.read(f), handler) + + self.assertEquals(handler.action_0_count, 1) + self.assertEquals(handler.action_1_count, 1) + +if __name__ == "__main__": + unittest.main() |