# -*- coding: utf-8 -*- import argparse import json import optparse import os import StringIO import sys import unittest import signal import xml.etree.ElementTree as ET import mozfile from mozlog import ( commandline, reader, structuredlog, stdadapter, handlers, formatters, ) class TestHandler(object): def __init__(self): self.items = [] def __call__(self, data): self.items.append(data) @property def last_item(self): return self.items[-1] @property def empty(self): return not self.items class BaseStructuredTest(unittest.TestCase): def setUp(self): self.logger = structuredlog.StructuredLogger("test") self.handler = TestHandler() self.logger.add_handler(self.handler) def pop_last_item(self): return self.handler.items.pop() def assert_log_equals(self, expected, actual=None): if actual is None: actual = self.pop_last_item() all_expected = {"pid": os.getpid(), "thread": "MainThread", "source": "test"} specials = set(["time"]) all_expected.update(expected) for key, value in all_expected.iteritems(): self.assertEqual(actual[key], value) self.assertEquals(set(all_expected.keys()) | specials, set(actual.keys())) class TestStatusHandler(BaseStructuredTest): def setUp(self): super(TestStatusHandler, self).setUp() self.handler = handlers.StatusHandler() self.logger.add_handler(self.handler) def test_failure_run(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "sub1", status='PASS') self.logger.test_status("test1", "sub2", status='TIMEOUT') self.logger.test_end("test1", status='OK') self.logger.suite_end() summary = self.handler.summarize() self.assertIn('TIMEOUT', summary.unexpected_statuses) self.assertEqual(1, summary.unexpected_statuses['TIMEOUT']) self.assertIn('PASS', summary.expected_statuses) self.assertEqual(1, summary.expected_statuses['PASS']) self.assertIn('OK', summary.expected_statuses) self.assertEqual(1, summary.expected_statuses['OK']) self.assertEqual(2, summary.action_counts['test_status']) self.assertEqual(1, summary.action_counts['test_end']) def test_error_run(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.error("ERRR!") self.logger.test_end("test1", status='OK') self.logger.test_start("test2") self.logger.test_end("test2", status='OK') self.logger.suite_end() summary = self.handler.summarize() self.assertIn('ERROR', summary.log_level_counts) self.assertEqual(1, summary.log_level_counts['ERROR']) self.assertIn('OK', summary.expected_statuses) self.assertEqual(2, summary.expected_statuses['OK']) class TestStructuredLog(BaseStructuredTest): def test_suite_start(self): self.logger.suite_start(["test"]) self.assert_log_equals({"action": "suite_start", "tests": ["test"]}) self.logger.suite_end() def test_suite_end(self): self.logger.suite_start([]) self.logger.suite_end() self.assert_log_equals({"action": "suite_end"}) def test_start(self): self.logger.suite_start([]) self.logger.test_start("test1") self.assert_log_equals({"action": "test_start", "test": "test1"}) self.logger.test_start( ("test1", "==", "test1-ref"), path="path/to/test") self.assert_log_equals({"action": "test_start", "test": ("test1", "==", "test1-ref"), "path": "path/to/test"}) self.logger.suite_end() def test_start_inprogress(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_start("test1") self.assert_log_equals({"action": "log", "message": "test_start for test1 logged while in progress.", "level": "ERROR"}) self.logger.suite_end() def test_status(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "subtest name", "fail", expected="FAIL", message="Test message") self.assert_log_equals({"action": "test_status", "subtest": "subtest name", "status": "FAIL", "message": "Test message", "test": "test1"}) self.logger.test_end("test1", "OK") self.logger.suite_end() def test_status_1(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "subtest name", "fail") self.assert_log_equals({"action": "test_status", "subtest": "subtest name", "status": "FAIL", "expected": "PASS", "test": "test1"}) self.logger.test_end("test1", "OK") self.logger.suite_end() def test_status_2(self): self.assertRaises(ValueError, self.logger.test_status, "test1", "subtest name", "XXXUNKNOWNXXX") def test_status_extra(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS", extra={"data": 42}) self.assert_log_equals({"action": "test_status", "subtest": "subtest name", "status": "FAIL", "expected": "PASS", "test": "test1", "extra": {"data": 42}}) self.logger.test_end("test1", "OK") self.logger.suite_end() def test_status_stack(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS", stack="many\nlines\nof\nstack") self.assert_log_equals({"action": "test_status", "subtest": "subtest name", "status": "FAIL", "expected": "PASS", "test": "test1", "stack": "many\nlines\nof\nstack"}) self.logger.test_end("test1", "OK") self.logger.suite_end() def test_status_not_started(self): self.logger.test_status("test_UNKNOWN", "subtest", "PASS") self.assertTrue(self.pop_last_item()["message"].startswith( "test_status for test_UNKNOWN logged while not in progress. Logged with data: {")) def test_end(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_end("test1", "fail", message="Test message") self.assert_log_equals({"action": "test_end", "status": "FAIL", "expected": "OK", "message": "Test message", "test": "test1"}) self.logger.suite_end() def test_end_1(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_end( "test1", "PASS", expected="PASS", extra={"data": 123}) self.assert_log_equals({"action": "test_end", "status": "PASS", "extra": {"data": 123}, "test": "test1"}) self.logger.suite_end() def test_end_2(self): self.assertRaises(ValueError, self.logger.test_end, "test1", "XXXUNKNOWNXXX") def test_end_stack(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_end("test1", "PASS", expected="PASS", stack="many\nlines\nof\nstack") self.assert_log_equals({"action": "test_end", "status": "PASS", "test": "test1", "stack": "many\nlines\nof\nstack"}) self.logger.suite_end() def test_end_no_start(self): self.logger.test_end("test1", "PASS", expected="PASS") self.assertTrue(self.pop_last_item()["message"].startswith( "test_end for test1 logged while not in progress. Logged with data: {")) self.logger.suite_end() def test_end_twice(self): self.logger.suite_start([]) self.logger.test_start("test2") self.logger.test_end("test2", "PASS", expected="PASS") self.assert_log_equals({"action": "test_end", "status": "PASS", "test": "test2"}) self.logger.test_end("test2", "PASS", expected="PASS") last_item = self.pop_last_item() self.assertEquals(last_item["action"], "log") self.assertEquals(last_item["level"], "ERROR") self.assertTrue(last_item["message"].startswith( "test_end for test2 logged while not in progress. Logged with data: {")) self.logger.suite_end() def test_suite_start_twice(self): self.logger.suite_start([]) self.assert_log_equals({"action": "suite_start", "tests": []}) self.logger.suite_start([]) last_item = self.pop_last_item() self.assertEquals(last_item["action"], "log") self.assertEquals(last_item["level"], "ERROR") self.logger.suite_end() def test_suite_end_no_start(self): self.logger.suite_start([]) self.assert_log_equals({"action": "suite_start", "tests": []}) self.logger.suite_end() self.assert_log_equals({"action": "suite_end"}) self.logger.suite_end() last_item = self.pop_last_item() self.assertEquals(last_item["action"], "log") self.assertEquals(last_item["level"], "ERROR") def test_multiple_loggers_suite_start(self): logger1 = structuredlog.StructuredLogger("test") self.logger.suite_start([]) logger1.suite_start([]) last_item = self.pop_last_item() self.assertEquals(last_item["action"], "log") self.assertEquals(last_item["level"], "ERROR") def test_multiple_loggers_test_start(self): logger1 = structuredlog.StructuredLogger("test") self.logger.suite_start([]) self.logger.test_start("test") logger1.test_start("test") last_item = self.pop_last_item() self.assertEquals(last_item["action"], "log") self.assertEquals(last_item["level"], "ERROR") def test_process(self): self.logger.process_output(1234, "test output") self.assert_log_equals({"action": "process_output", "process": "1234", "data": "test output"}) def test_process_start(self): self.logger.process_start(1234) self.assert_log_equals({"action": "process_start", "process": "1234"}) def test_process_exit(self): self.logger.process_exit(1234, 0) self.assert_log_equals({"action": "process_exit", "process": "1234", "exitcode": 0}) def test_log(self): for level in ["critical", "error", "warning", "info", "debug"]: getattr(self.logger, level)("message") self.assert_log_equals({"action": "log", "level": level.upper(), "message": "message"}) def test_logging_adapter(self): import logging logging.basicConfig(level="DEBUG") old_level = logging.root.getEffectiveLevel() logging.root.setLevel("DEBUG") std_logger = logging.getLogger("test") std_logger.setLevel("DEBUG") logger = stdadapter.std_logging_adapter(std_logger) try: for level in ["critical", "error", "warning", "info", "debug"]: getattr(logger, level)("message") self.assert_log_equals({"action": "log", "level": level.upper(), "message": "message"}) finally: logging.root.setLevel(old_level) def test_add_remove_handlers(self): handler = TestHandler() self.logger.add_handler(handler) self.logger.info("test1") self.assert_log_equals({"action": "log", "level": "INFO", "message": "test1"}) self.assert_log_equals({"action": "log", "level": "INFO", "message": "test1"}, actual=handler.last_item) self.logger.remove_handler(handler) self.logger.info("test2") self.assert_log_equals({"action": "log", "level": "INFO", "message": "test2"}) self.assert_log_equals({"action": "log", "level": "INFO", "message": "test1"}, actual=handler.last_item) def test_wrapper(self): file_like = structuredlog.StructuredLogFileLike(self.logger) file_like.write("line 1") self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 1"}) file_like.write("line 2\n") self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 2"}) file_like.write("line 3\r") self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 3"}) file_like.write("line 4\r\n") self.assert_log_equals({"action": "log", "level": "INFO", "message": "line 4"}) class TestTypeConversions(BaseStructuredTest): def test_raw(self): self.logger.log_raw({"action": "suite_start", "tests": [1], "time": "1234"}) self.assert_log_equals({"action": "suite_start", "tests": ["1"], "time": 1234}) self.logger.suite_end() def test_tuple(self): self.logger.suite_start([]) self.logger.test_start(("\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90\x8d\x83\xf0\x90\x8d\x84", 42, u"\u16a4")) self.assert_log_equals({"action": "test_start", "test": (u'\U00010344\U00010334\U00010343\U00010344', u"42", u"\u16a4")}) self.logger.suite_end() def test_non_string_messages(self): self.logger.suite_start([]) self.logger.info(1) self.assert_log_equals({"action": "log", "message": "1", "level": "INFO"}) self.logger.info([1, (2, '3'), "s", "s" + chr(255)]) self.assert_log_equals({"action": "log", "message": "[1, (2, '3'), 's', 's\\xff']", "level": "INFO"}) self.logger.suite_end() def test_utf8str_write(self): with mozfile.NamedTemporaryFile() as logfile: _fmt = formatters.TbplFormatter() _handler = handlers.StreamHandler(logfile, _fmt) self.logger.add_handler(_handler) self.logger.suite_start([]) self.logger.info("☺") logfile.seek(0) data = logfile.readlines()[-1].strip() self.assertEquals(data, "☺") self.logger.suite_end() def test_arguments(self): self.logger.info(message="test") self.assert_log_equals({"action": "log", "message": "test", "level": "INFO"}) self.logger.suite_start([], {}) self.assert_log_equals({"action": "suite_start", "tests": [], "run_info": {}}) self.logger.test_start(test="test1") self.logger.test_status( "subtest1", "FAIL", test="test1", status="PASS") self.assert_log_equals({"action": "test_status", "test": "test1", "subtest": "subtest1", "status": "PASS", "expected": "FAIL"}) self.logger.process_output(123, "data", "test") self.assert_log_equals({"action": "process_output", "process": "123", "command": "test", "data": "data"}) self.assertRaises(TypeError, self.logger.test_status, subtest="subtest2", status="FAIL", expected="PASS") self.assertRaises(TypeError, self.logger.test_status, "test1", "subtest1", "PASS", "FAIL", "message", "stack", {}, "unexpected") self.assertRaises(TypeError, self.logger.test_status, "test1", test="test2") self.logger.suite_end() class TestComponentFilter(BaseStructuredTest): def test_filter_component(self): component_logger = structuredlog.StructuredLogger(self.logger.name, "test_component") component_logger.component_filter = handlers.LogLevelFilter( lambda x: x, "info") self.logger.debug("Test") self.assertFalse(self.handler.empty) self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test"}) self.assertTrue(self.handler.empty) component_logger.info("Test 1") self.assertFalse(self.handler.empty) self.assert_log_equals({"action": "log", "level": "INFO", "message": "Test 1", "component": "test_component"}) component_logger.debug("Test 2") self.assertTrue(self.handler.empty) component_logger.component_filter = None component_logger.debug("Test 3") self.assertFalse(self.handler.empty) self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test 3", "component": "test_component"}) def test_filter_default_component(self): component_logger = structuredlog.StructuredLogger(self.logger.name, "test_component") self.logger.debug("Test") self.assertFalse(self.handler.empty) self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test"}) self.logger.component_filter = handlers.LogLevelFilter( lambda x: x, "info") self.logger.debug("Test 1") self.assertTrue(self.handler.empty) component_logger.debug("Test 2") self.assertFalse(self.handler.empty) self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test 2", "component": "test_component"}) self.logger.component_filter = None self.logger.debug("Test 3") self.assertFalse(self.handler.empty) self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "Test 3"}) def test_filter_message_mutuate(self): def filter_mutate(msg): if msg["action"] == "log": msg["message"] = "FILTERED! %s" % msg["message"] return msg self.logger.component_filter = filter_mutate self.logger.debug("Test") self.assert_log_equals({"action": "log", "level": "DEBUG", "message": "FILTERED! Test"}) self.logger.component_filter = None class FormatterTest(unittest.TestCase): def setUp(self): self.position = 0 self.logger = structuredlog.StructuredLogger( "test_%s" % type(self).__name__) self.output_file = StringIO.StringIO() self.handler = handlers.StreamHandler( self.output_file, self.get_formatter()) self.logger.add_handler(self.handler) def set_position(self, pos=None): if pos is None: pos = self.output_file.tell() self.position = pos def get_formatter(self): raise NotImplementedError( "FormatterTest subclasses must implement get_formatter") @property def loglines(self): self.output_file.seek(self.position) return [line.rstrip() for line in self.output_file.readlines()] class TestHTMLFormatter(FormatterTest): def get_formatter(self): return formatters.HTMLFormatter() def test_base64_string(self): self.logger.suite_start([]) self.logger.test_start("string_test") self.logger.test_end("string_test", "FAIL", extra={"data": "foobar"}) self.logger.suite_end() self.assertIn("data:text/html;charset=utf-8;base64,Zm9vYmFy", ''.join(self.loglines)) def test_base64_unicode(self): self.logger.suite_start([]) self.logger.test_start("unicode_test") self.logger.test_end("unicode_test", "FAIL", extra={"data": unichr(0x02A9)}) self.logger.suite_end() self.assertIn("data:text/html;charset=utf-8;base64,yqk=", ''.join(self.loglines)) def test_base64_other(self): self.logger.suite_start([]) self.logger.test_start("int_test") self.logger.test_end("int_test", "FAIL", extra={"data": {"foo": "bar"}}) self.logger.suite_end() self.assertIn("data:text/html;charset=utf-8;base64,eydmb28nOiAnYmFyJ30=", ''.join(self.loglines)) class TestTBPLFormatter(FormatterTest): def get_formatter(self): return formatters.TbplFormatter() def test_unexpected_message(self): self.logger.suite_start([]) self.logger.test_start("timeout_test") self.logger.test_end("timeout_test", "TIMEOUT", message="timed out") self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out", self.loglines) self.logger.suite_end() def test_default_unexpected_end_message(self): self.logger.suite_start([]) self.logger.test_start("timeout_test") self.logger.test_end("timeout_test", "TIMEOUT") self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK", self.loglines) self.logger.suite_end() def test_default_unexpected_status_message(self): self.logger.suite_start([]) self.logger.test_start("timeout_test") self.logger.test_status("timeout_test", "subtest", status="TIMEOUT") self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS", self.loglines) self.logger.test_end("timeout_test", "OK") self.logger.suite_end() def test_single_newline(self): self.logger.suite_start([]) self.logger.test_start("test1") self.set_position() self.logger.test_status("test1", "subtest", status="PASS", expected="FAIL") self.logger.test_end("test1", "OK") self.logger.suite_end() # This sequence should not produce blanklines for line in self.loglines: self.assertNotEqual("", line, "No blank line should be present in: %s" % self.loglines) def test_process_exit(self): self.logger.process_exit(1234, 0) self.assertIn('TEST-INFO | 1234: exit 0', self.loglines) @unittest.skipUnless(os.name == 'posix', 'posix only') def test_process_exit_with_sig(self): # subprocess return code is negative when process # has been killed by signal on posix. self.logger.process_exit(1234, -signal.SIGTERM) self.assertIn, ('TEST-INFO | 1234: killed by SIGTERM', self.loglines) class TestMachFormatter(FormatterTest): def get_formatter(self): return formatters.MachFormatter(disable_colors=True) def test_summary(self): self.logger.suite_start([]) # Some tests that pass self.logger.test_start("test1") self.logger.test_end("test1", status="PASS", expected="PASS") self.logger.test_start("test2") self.logger.test_end("test2", status="PASS", expected="TIMEOUT") self.logger.test_start("test3") self.logger.test_end("test3", status="FAIL", expected="PASS") self.set_position() self.logger.suite_end() self.assertIn("Ran 3 tests", self.loglines) self.assertIn("Expected results: 1", self.loglines) self.assertIn( "Unexpected results: 2 (FAIL: 1, PASS: 1)", self.loglines) self.assertNotIn("test1", self.loglines) self.assertIn("PASS expected TIMEOUT test2", self.loglines) self.assertIn("FAIL test3", self.loglines) def test_summary_subtests(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "subtest1", status="PASS") self.logger.test_status("test1", "subtest2", status="FAIL") self.logger.test_end("test1", status="OK", expected="OK") self.logger.test_start("test2") self.logger.test_status("test2", "subtest1", status="TIMEOUT", expected="PASS") self.logger.test_end("test2", status="TIMEOUT", expected="OK") self.set_position() self.logger.suite_end() self.assertIn("Ran 5 tests (2 parents, 3 subtests)", self.loglines) self.assertIn("Expected results: 2", self.loglines) self.assertIn( "Unexpected results: 3 (FAIL: 1, TIMEOUT: 2)", self.loglines) def test_summary_ok(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "subtest1", status="PASS") self.logger.test_status("test1", "subtest2", status="PASS") self.logger.test_end("test1", status="OK", expected="OK") self.logger.test_start("test2") self.logger.test_status("test2", "subtest1", status="PASS", expected="PASS") self.logger.test_end("test2", status="OK", expected="OK") self.set_position() self.logger.suite_end() self.assertIn("OK", self.loglines) self.assertIn("Expected results: 5", self.loglines) self.assertIn("Unexpected results: 0", self.loglines) def test_process_start(self): self.logger.process_start(1234) self.assertIn("Started process `1234`", self.loglines[0]) def test_process_start_with_command(self): self.logger.process_start(1234, command='test cmd') self.assertIn("Started process `1234` (test cmd)", self.loglines[0]) def test_process_exit(self): self.logger.process_exit(1234, 0) self.assertIn('1234: exit 0', self.loglines[0]) @unittest.skipUnless(os.name == 'posix', 'posix only') def test_process_exit_with_sig(self): # subprocess return code is negative when process # has been killed by signal on posix. self.logger.process_exit(1234, -signal.SIGTERM) self.assertIn('1234: killed by SIGTERM', self.loglines[0]) class TestXUnitFormatter(FormatterTest): def get_formatter(self): return formatters.XUnitFormatter() def log_as_xml(self): return ET.fromstring('\n'.join(self.loglines)) def test_stacktrace_is_present(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_end( "test1", "fail", message="Test message", stack='this\nis\na\nstack') self.logger.suite_end() root = self.log_as_xml() self.assertIn('this\nis\na\nstack', root.find('testcase/failure').text) def test_failure_message(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_end("test1", "fail", message="Test message") self.logger.suite_end() root = self.log_as_xml() self.assertEquals('Expected OK, got FAIL', root.find( 'testcase/failure').get('message')) def test_suite_attrs(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_end("test1", "ok", message="Test message") self.logger.suite_end() root = self.log_as_xml() self.assertEqual(root.get('skips'), '0') self.assertEqual(root.get('failures'), '0') self.assertEqual(root.get('errors'), '0') self.assertEqual(root.get('tests'), '1') self.assertEqual(root.get('time'), '0.00') def test_time_is_not_rounded(self): # call formatter directly, it is easier here formatter = self.get_formatter() formatter.suite_start(dict(time=55000)) formatter.test_start(dict(time=55100)) formatter.test_end( dict(time=55558, test='id', message='message', status='PASS')) xml_string = formatter.suite_end(dict(time=55559)) root = ET.fromstring(xml_string) self.assertEqual(root.get('time'), '0.56') self.assertEqual(root.find('testcase').get('time'), '0.46') class TestCommandline(unittest.TestCase): def setUp(self): self.logfile = mozfile.NamedTemporaryFile() @property def loglines(self): self.logfile.seek(0) return [line.rstrip() for line in self.logfile.readlines()] def test_setup_logging(self): parser = argparse.ArgumentParser() commandline.add_logging_group(parser) args = parser.parse_args(["--log-raw=-"]) logger = commandline.setup_logging("test_setup_logging", args, {}) self.assertEqual(len(logger.handlers), 1) def test_setup_logging_optparse(self): parser = optparse.OptionParser() commandline.add_logging_group(parser) args, _ = parser.parse_args(["--log-raw=-"]) logger = commandline.setup_logging("test_optparse", args, {}) self.assertEqual(len(logger.handlers), 1) self.assertIsInstance(logger.handlers[0], handlers.StreamHandler) def test_limit_formatters(self): parser = argparse.ArgumentParser() commandline.add_logging_group(parser, include_formatters=['raw']) other_formatters = [fmt for fmt in commandline.log_formatters if fmt != 'raw'] # check that every formatter except raw is not present for fmt in other_formatters: with self.assertRaises(SystemExit): parser.parse_args(["--log-%s=-" % fmt]) with self.assertRaises(SystemExit): parser.parse_args(["--log-%s-level=error" % fmt]) # raw is still ok args = parser.parse_args(["--log-raw=-"]) logger = commandline.setup_logging("test_setup_logging2", args, {}) self.assertEqual(len(logger.handlers), 1) def test_setup_logging_optparse_unicode(self): parser = optparse.OptionParser() commandline.add_logging_group(parser) args, _ = parser.parse_args([u"--log-raw=-"]) logger = commandline.setup_logging("test_optparse_unicode", args, {}) self.assertEqual(len(logger.handlers), 1) self.assertEqual(logger.handlers[0].stream, sys.stdout) self.assertIsInstance(logger.handlers[0], handlers.StreamHandler) def test_logging_defaultlevel(self): parser = argparse.ArgumentParser() commandline.add_logging_group(parser) args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name]) logger = commandline.setup_logging("test_fmtopts", args, {}) logger.info("INFO message") logger.debug("DEBUG message") logger.error("ERROR message") # The debug level is not logged by default. self.assertEqual(["INFO message", "ERROR message"], self.loglines) def test_logging_errorlevel(self): parser = argparse.ArgumentParser() commandline.add_logging_group(parser) args = parser.parse_args( ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=error"]) logger = commandline.setup_logging("test_fmtopts", args, {}) logger.info("INFO message") logger.debug("DEBUG message") logger.error("ERROR message") # Only the error level and above were requested. self.assertEqual(["ERROR message"], self.loglines) def test_logging_debuglevel(self): parser = argparse.ArgumentParser() commandline.add_logging_group(parser) args = parser.parse_args( ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=debug"]) logger = commandline.setup_logging("test_fmtopts", args, {}) logger.info("INFO message") logger.debug("DEBUG message") logger.error("ERROR message") # Requesting a lower log level than default works as expected. self.assertEqual(["INFO message", "DEBUG message", "ERROR message"], self.loglines) def test_unused_options(self): parser = argparse.ArgumentParser() commandline.add_logging_group(parser) args = parser.parse_args(["--log-tbpl-level=error"]) self.assertRaises(ValueError, commandline.setup_logging, "test_fmtopts", args, {}) class TestBuffer(BaseStructuredTest): def assert_log_equals(self, expected, actual=None): if actual is None: actual = self.pop_last_item() all_expected = {"pid": os.getpid(), "thread": "MainThread", "source": "testBuffer"} specials = set(["time"]) all_expected.update(expected) for key, value in all_expected.iteritems(): self.assertEqual(actual[key], value) self.assertEquals(set(all_expected.keys()) | specials, set(actual.keys())) def setUp(self): self.logger = structuredlog.StructuredLogger("testBuffer") self.handler = handlers.BufferHandler(TestHandler(), message_limit=4) self.logger.add_handler(self.handler) def tearDown(self): self.logger.remove_handler(self.handler) def pop_last_item(self): return self.handler.inner.items.pop() def test_buffer_messages(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.send_message("buffer", "off") self.logger.test_status("test1", "sub1", status="PASS") # Even for buffered actions, the buffer does not interfere if # buffering is turned off. self.assert_log_equals({"action": "test_status", "test": "test1", "status": "PASS", "subtest": "sub1"}) self.logger.send_message("buffer", "on") self.logger.test_status("test1", "sub2", status="PASS") self.logger.test_status("test1", "sub3", status="PASS") self.logger.test_status("test1", "sub4", status="PASS") self.logger.test_status("test1", "sub5", status="PASS") self.logger.test_status("test1", "sub6", status="PASS") self.logger.test_status("test1", "sub7", status="PASS") self.logger.test_end("test1", status="OK") self.logger.send_message("buffer", "clear") self.assert_log_equals({"action": "test_end", "test": "test1", "status": "OK"}) self.logger.suite_end() def test_buffer_size(self): self.logger.suite_start([]) self.logger.test_start("test1") self.logger.test_status("test1", "sub1", status="PASS") self.logger.test_status("test1", "sub2", status="PASS") self.logger.test_status("test1", "sub3", status="PASS") self.logger.test_status("test1", "sub4", status="PASS") self.logger.test_status("test1", "sub5", status="PASS") self.logger.test_status("test1", "sub6", status="PASS") self.logger.test_status("test1", "sub7", status="PASS") # No test status messages made it to the underlying handler. self.assert_log_equals({"action": "test_start", "test": "test1"}) # The buffer's actual size never grows beyond the specified limit. self.assertEquals(len(self.handler._buffer), 4) self.logger.test_status("test1", "sub8", status="FAIL") # The number of messages deleted comes back in a list. self.assertEquals([4], self.logger.send_message("buffer", "flush")) # When the buffer is dumped, the failure is the last thing logged self.assert_log_equals({"action": "test_status", "test": "test1", "subtest": "sub8", "status": "FAIL", "expected": "PASS"}) # Three additional messages should have been retained for context self.assert_log_equals({"action": "test_status", "test": "test1", "status": "PASS", "subtest": "sub7"}) self.assert_log_equals({"action": "test_status", "test": "test1", "status": "PASS", "subtest": "sub6"}) self.assert_log_equals({"action": "test_status", "test": "test1", "status": "PASS", "subtest": "sub5"}) self.assert_log_equals({"action": "suite_start", "tests": []}) class TestReader(unittest.TestCase): def to_file_like(self, obj): data_str = "\n".join(json.dumps(item) for item in obj) return StringIO.StringIO(data_str) def test_read(self): data = [{"action": "action_0", "data": "data_0"}, {"action": "action_1", "data": "data_1"}] f = self.to_file_like(data) self.assertEquals(data, list(reader.read(f))) def test_imap_log(self): data = [{"action": "action_0", "data": "data_0"}, {"action": "action_1", "data": "data_1"}] f = self.to_file_like(data) def f_action_0(item): return ("action_0", item["data"]) def f_action_1(item): return ("action_1", item["data"]) res_iter = reader.imap_log(reader.read(f), {"action_0": f_action_0, "action_1": f_action_1}) self.assertEquals([("action_0", "data_0"), ("action_1", "data_1")], list(res_iter)) def test_each_log(self): data = [{"action": "action_0", "data": "data_0"}, {"action": "action_1", "data": "data_1"}] f = self.to_file_like(data) count = {"action_0": 0, "action_1": 0} def f_action_0(item): count[item["action"]] += 1 def f_action_1(item): count[item["action"]] += 2 reader.each_log(reader.read(f), {"action_0": f_action_0, "action_1": f_action_1}) self.assertEquals({"action_0": 1, "action_1": 2}, count) def test_handler(self): data = [{"action": "action_0", "data": "data_0"}, {"action": "action_1", "data": "data_1"}] f = self.to_file_like(data) test = self class ReaderTestHandler(reader.LogHandler): def __init__(self): self.action_0_count = 0 self.action_1_count = 0 def action_0(self, item): test.assertEquals(item["action"], "action_0") self.action_0_count += 1 def action_1(self, item): test.assertEquals(item["action"], "action_1") self.action_1_count += 1 handler = ReaderTestHandler() reader.handle_log(reader.read(f), handler) self.assertEquals(handler.action_0_count, 1) self.assertEquals(handler.action_1_count, 1) if __name__ == "__main__": unittest.main()