summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/mozlog/tests
diff options
context:
space:
mode:
Diffstat (limited to 'testing/mozbase/mozlog/tests')
-rw-r--r--testing/mozbase/mozlog/tests/manifest.ini2
-rw-r--r--testing/mozbase/mozlog/tests/test_logger.py264
-rw-r--r--testing/mozbase/mozlog/tests/test_structured.py1098
3 files changed, 1364 insertions, 0 deletions
diff --git a/testing/mozbase/mozlog/tests/manifest.ini b/testing/mozbase/mozlog/tests/manifest.ini
new file mode 100644
index 000000000..62331ee30
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/manifest.ini
@@ -0,0 +1,2 @@
+[test_logger.py]
+[test_structured.py]
diff --git a/testing/mozbase/mozlog/tests/test_logger.py b/testing/mozbase/mozlog/tests/test_logger.py
new file mode 100644
index 000000000..30f4eb569
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_logger.py
@@ -0,0 +1,264 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import datetime
+import json
+import socket
+import threading
+import time
+import unittest
+
+import mozfile
+
+import mozlog.unstructured as mozlog
+
+
+class ListHandler(mozlog.Handler):
+ """Mock handler appends messages to a list for later inspection."""
+
+ def __init__(self):
+ mozlog.Handler.__init__(self)
+ self.messages = []
+
+ def emit(self, record):
+ self.messages.append(self.format(record))
+
+
+class TestLogging(unittest.TestCase):
+ """Tests behavior of basic mozlog api."""
+
+ def test_logger_defaults(self):
+ """Tests the default logging format and behavior."""
+
+ default_logger = mozlog.getLogger('default.logger')
+ self.assertEqual(default_logger.name, 'default.logger')
+ self.assertEqual(len(default_logger.handlers), 1)
+ self.assertTrue(isinstance(default_logger.handlers[0],
+ mozlog.StreamHandler))
+
+ f = mozfile.NamedTemporaryFile()
+ list_logger = mozlog.getLogger('file.logger',
+ handler=mozlog.FileHandler(f.name))
+ self.assertEqual(len(list_logger.handlers), 1)
+ self.assertTrue(isinstance(list_logger.handlers[0],
+ mozlog.FileHandler))
+ f.close()
+
+ self.assertRaises(ValueError, mozlog.getLogger,
+ 'file.logger', handler=ListHandler())
+
+ def test_timestamps(self):
+ """Verifies that timestamps are included when asked for."""
+ log_name = 'test'
+ handler = ListHandler()
+ handler.setFormatter(mozlog.MozFormatter())
+ log = mozlog.getLogger(log_name, handler=handler)
+ log.info('no timestamp')
+ self.assertTrue(handler.messages[-1].startswith('%s ' % log_name))
+ handler.setFormatter(mozlog.MozFormatter(include_timestamp=True))
+ log.info('timestamp')
+ # Just verify that this raises no exceptions.
+ datetime.datetime.strptime(handler.messages[-1][:23],
+ '%Y-%m-%d %H:%M:%S,%f')
+
+
+class TestStructuredLogging(unittest.TestCase):
+ """Tests structured output in mozlog."""
+
+ def setUp(self):
+ self.handler = ListHandler()
+ self.handler.setFormatter(mozlog.JSONFormatter())
+ self.logger = mozlog.MozLogger('test.Logger')
+ self.logger.addHandler(self.handler)
+ self.logger.setLevel(mozlog.DEBUG)
+
+ def check_messages(self, expected, actual):
+ """Checks actual for equality with corresponding fields in actual.
+ The actual message should contain all fields in expected, and
+ should be identical, with the exception of the timestamp field.
+ The actual message should contain no fields other than the timestamp
+ field and those present in expected."""
+
+ self.assertTrue(isinstance(actual['_time'], (int, long)))
+
+ for k, v in expected.items():
+ self.assertEqual(v, actual[k])
+
+ for k in actual.keys():
+ if k != '_time':
+ self.assertTrue(expected.get(k) is not None)
+
+ def test_structured_output(self):
+ self.logger.log_structured('test_message',
+ {'_level': mozlog.INFO,
+ '_message': 'message one'})
+ self.logger.log_structured('test_message',
+ {'_level': mozlog.INFO,
+ '_message': 'message two'})
+ self.logger.log_structured('error_message',
+ {'_level': mozlog.ERROR,
+ 'diagnostic': 'unexpected error'})
+
+ message_one_expected = {'_namespace': 'test.Logger',
+ '_level': 'INFO',
+ '_message': 'message one',
+ 'action': 'test_message'}
+ message_two_expected = {'_namespace': 'test.Logger',
+ '_level': 'INFO',
+ '_message': 'message two',
+ 'action': 'test_message'}
+ message_three_expected = {'_namespace': 'test.Logger',
+ '_level': 'ERROR',
+ 'diagnostic': 'unexpected error',
+ 'action': 'error_message'}
+
+ message_one_actual = json.loads(self.handler.messages[0])
+ message_two_actual = json.loads(self.handler.messages[1])
+ message_three_actual = json.loads(self.handler.messages[2])
+
+ self.check_messages(message_one_expected, message_one_actual)
+ self.check_messages(message_two_expected, message_two_actual)
+ self.check_messages(message_three_expected, message_three_actual)
+
+ def test_unstructured_conversion(self):
+ """ Tests that logging to a logger with a structured formatter
+ via the traditional logging interface works as expected. """
+ self.logger.info('%s %s %d', 'Message', 'number', 1)
+ self.logger.error('Message number 2')
+ self.logger.debug('Message with %s', 'some extras',
+ extra={'params': {'action': 'mozlog_test_output',
+ 'is_failure': False}})
+ message_one_expected = {'_namespace': 'test.Logger',
+ '_level': 'INFO',
+ '_message': 'Message number 1'}
+ message_two_expected = {'_namespace': 'test.Logger',
+ '_level': 'ERROR',
+ '_message': 'Message number 2'}
+ message_three_expected = {'_namespace': 'test.Logger',
+ '_level': 'DEBUG',
+ '_message': 'Message with some extras',
+ 'action': 'mozlog_test_output',
+ 'is_failure': False}
+
+ message_one_actual = json.loads(self.handler.messages[0])
+ message_two_actual = json.loads(self.handler.messages[1])
+ message_three_actual = json.loads(self.handler.messages[2])
+
+ self.check_messages(message_one_expected, message_one_actual)
+ self.check_messages(message_two_expected, message_two_actual)
+ self.check_messages(message_three_expected, message_three_actual)
+
+ def message_callback(self):
+ if len(self.handler.messages) == 3:
+ message_one_expected = {'_namespace': 'test.Logger',
+ '_level': 'DEBUG',
+ '_message': 'socket message one',
+ 'action': 'test_message'}
+ message_two_expected = {'_namespace': 'test.Logger',
+ '_level': 'DEBUG',
+ '_message': 'socket message two',
+ 'action': 'test_message'}
+ message_three_expected = {'_namespace': 'test.Logger',
+ '_level': 'DEBUG',
+ '_message': 'socket message three',
+ 'action': 'test_message'}
+
+ message_one_actual = json.loads(self.handler.messages[0])
+
+ message_two_actual = json.loads(self.handler.messages[1])
+
+ message_three_actual = json.loads(self.handler.messages[2])
+
+ self.check_messages(message_one_expected, message_one_actual)
+ self.check_messages(message_two_expected, message_two_actual)
+ self.check_messages(message_three_expected, message_three_actual)
+
+ def test_log_listener(self):
+ connection = '127.0.0.1', 0
+ self.log_server = mozlog.LogMessageServer(connection,
+ self.logger,
+ message_callback=self.message_callback,
+ timeout=0.5)
+
+ message_string_one = json.dumps({'_message': 'socket message one',
+ 'action': 'test_message',
+ '_level': 'DEBUG'})
+ message_string_two = json.dumps({'_message': 'socket message two',
+ 'action': 'test_message',
+ '_level': 'DEBUG'})
+
+ message_string_three = json.dumps({'_message': 'socket message three',
+ 'action': 'test_message',
+ '_level': 'DEBUG'})
+
+ message_string = message_string_one + '\n' + \
+ message_string_two + '\n' + \
+ message_string_three + '\n'
+
+ server_thread = threading.Thread(target=self.log_server.handle_request)
+ server_thread.start()
+
+ host, port = self.log_server.server_address
+
+ sock = socket.socket()
+ sock.connect((host, port))
+
+ # Sleeps prevent listener from receiving entire message in a single call
+ # to recv in order to test reconstruction of partial messages.
+ sock.sendall(message_string[:8])
+ time.sleep(.01)
+ sock.sendall(message_string[8:32])
+ time.sleep(.01)
+ sock.sendall(message_string[32:64])
+ time.sleep(.01)
+ sock.sendall(message_string[64:128])
+ time.sleep(.01)
+ sock.sendall(message_string[128:])
+
+ server_thread.join()
+
+
+class Loggable(mozlog.LoggingMixin):
+ """Trivial class inheriting from LoggingMixin"""
+ pass
+
+
+class TestLoggingMixin(unittest.TestCase):
+ """Tests basic use of LoggingMixin"""
+
+ def test_mixin(self):
+ loggable = Loggable()
+ self.assertTrue(not hasattr(loggable, "_logger"))
+ loggable.log(mozlog.INFO, "This will instantiate the logger")
+ self.assertTrue(hasattr(loggable, "_logger"))
+ self.assertEqual(loggable._logger.name, "test_logger.Loggable")
+
+ self.assertRaises(ValueError, loggable.set_logger,
+ "not a logger")
+
+ logger = mozlog.MozLogger('test.mixin')
+ handler = ListHandler()
+ logger.addHandler(handler)
+ loggable.set_logger(logger)
+ self.assertTrue(isinstance(loggable._logger.handlers[0],
+ ListHandler))
+ self.assertEqual(loggable._logger.name, "test.mixin")
+
+ loggable.log(mozlog.WARN, 'message for "log" method')
+ loggable.info('message for "info" method')
+ loggable.error('message for "error" method')
+ loggable.log_structured('test_message',
+ params={'_message': 'message for ' +
+ '"log_structured" method'})
+
+ expected_messages = ['message for "log" method',
+ 'message for "info" method',
+ 'message for "error" method',
+ 'message for "log_structured" method']
+
+ actual_messages = loggable._logger.handlers[0].messages
+ self.assertEqual(expected_messages, actual_messages)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testing/mozbase/mozlog/tests/test_structured.py b/testing/mozbase/mozlog/tests/test_structured.py
new file mode 100644
index 000000000..ab4b5f582
--- /dev/null
+++ b/testing/mozbase/mozlog/tests/test_structured.py
@@ -0,0 +1,1098 @@
+# -*- coding: utf-8 -*-
+import argparse
+import json
+import optparse
+import os
+import StringIO
+import sys
+import unittest
+import signal
+import xml.etree.ElementTree as ET
+
+import mozfile
+
+from mozlog import (
+ commandline,
+ reader,
+ structuredlog,
+ stdadapter,
+ handlers,
+ formatters,
+)
+
+
+class TestHandler(object):
+
+ def __init__(self):
+ self.items = []
+
+ def __call__(self, data):
+ self.items.append(data)
+
+ @property
+ def last_item(self):
+ return self.items[-1]
+
+ @property
+ def empty(self):
+ return not self.items
+
+
+class BaseStructuredTest(unittest.TestCase):
+
+ def setUp(self):
+ self.logger = structuredlog.StructuredLogger("test")
+ self.handler = TestHandler()
+ self.logger.add_handler(self.handler)
+
+ def pop_last_item(self):
+ return self.handler.items.pop()
+
+ def assert_log_equals(self, expected, actual=None):
+ if actual is None:
+ actual = self.pop_last_item()
+
+ all_expected = {"pid": os.getpid(),
+ "thread": "MainThread",
+ "source": "test"}
+ specials = set(["time"])
+
+ all_expected.update(expected)
+ for key, value in all_expected.iteritems():
+ self.assertEqual(actual[key], value)
+
+ self.assertEquals(set(all_expected.keys()) |
+ specials, set(actual.keys()))
+
+
+class TestStatusHandler(BaseStructuredTest):
+
+ def setUp(self):
+ super(TestStatusHandler, self).setUp()
+ self.handler = handlers.StatusHandler()
+ self.logger.add_handler(self.handler)
+
+ def test_failure_run(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "sub1", status='PASS')
+ self.logger.test_status("test1", "sub2", status='TIMEOUT')
+ self.logger.test_end("test1", status='OK')
+ self.logger.suite_end()
+ summary = self.handler.summarize()
+ self.assertIn('TIMEOUT', summary.unexpected_statuses)
+ self.assertEqual(1, summary.unexpected_statuses['TIMEOUT'])
+ self.assertIn('PASS', summary.expected_statuses)
+ self.assertEqual(1, summary.expected_statuses['PASS'])
+ self.assertIn('OK', summary.expected_statuses)
+ self.assertEqual(1, summary.expected_statuses['OK'])
+ self.assertEqual(2, summary.action_counts['test_status'])
+ self.assertEqual(1, summary.action_counts['test_end'])
+
+ def test_error_run(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.error("ERRR!")
+ self.logger.test_end("test1", status='OK')
+ self.logger.test_start("test2")
+ self.logger.test_end("test2", status='OK')
+ self.logger.suite_end()
+ summary = self.handler.summarize()
+ self.assertIn('ERROR', summary.log_level_counts)
+ self.assertEqual(1, summary.log_level_counts['ERROR'])
+ self.assertIn('OK', summary.expected_statuses)
+ self.assertEqual(2, summary.expected_statuses['OK'])
+
+
+class TestStructuredLog(BaseStructuredTest):
+
+ def test_suite_start(self):
+ self.logger.suite_start(["test"])
+ self.assert_log_equals({"action": "suite_start",
+ "tests": ["test"]})
+ self.logger.suite_end()
+
+ def test_suite_end(self):
+ self.logger.suite_start([])
+ self.logger.suite_end()
+ self.assert_log_equals({"action": "suite_end"})
+
+ def test_start(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.assert_log_equals({"action": "test_start",
+ "test": "test1"})
+
+ self.logger.test_start(
+ ("test1", "==", "test1-ref"), path="path/to/test")
+ self.assert_log_equals({"action": "test_start",
+ "test": ("test1", "==", "test1-ref"),
+ "path": "path/to/test"})
+ self.logger.suite_end()
+
+ def test_start_inprogress(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_start("test1")
+ self.assert_log_equals({"action": "log",
+ "message": "test_start for test1 logged while in progress.",
+ "level": "ERROR"})
+ self.logger.suite_end()
+
+ def test_status(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest name", "fail", expected="FAIL",
+ message="Test message")
+ self.assert_log_equals({"action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "message": "Test message",
+ "test": "test1"})
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_1(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest name", "fail")
+ self.assert_log_equals({"action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "test": "test1"})
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_2(self):
+ self.assertRaises(ValueError, self.logger.test_status, "test1", "subtest name",
+ "XXXUNKNOWNXXX")
+
+ def test_status_extra(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS",
+ extra={"data": 42})
+ self.assert_log_equals({"action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "test": "test1",
+ "extra": {"data": 42}})
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_stack(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS",
+ stack="many\nlines\nof\nstack")
+ self.assert_log_equals({"action": "test_status",
+ "subtest": "subtest name",
+ "status": "FAIL",
+ "expected": "PASS",
+ "test": "test1",
+ "stack": "many\nlines\nof\nstack"})
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ def test_status_not_started(self):
+ self.logger.test_status("test_UNKNOWN", "subtest", "PASS")
+ self.assertTrue(self.pop_last_item()["message"].startswith(
+ "test_status for test_UNKNOWN logged while not in progress. Logged with data: {"))
+
+ def test_end(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "fail", message="Test message")
+ self.assert_log_equals({"action": "test_end",
+ "status": "FAIL",
+ "expected": "OK",
+ "message": "Test message",
+ "test": "test1"})
+ self.logger.suite_end()
+
+ def test_end_1(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end(
+ "test1", "PASS", expected="PASS", extra={"data": 123})
+ self.assert_log_equals({"action": "test_end",
+ "status": "PASS",
+ "extra": {"data": 123},
+ "test": "test1"})
+ self.logger.suite_end()
+
+ def test_end_2(self):
+ self.assertRaises(ValueError, self.logger.test_end,
+ "test1", "XXXUNKNOWNXXX")
+
+ def test_end_stack(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "PASS", expected="PASS",
+ stack="many\nlines\nof\nstack")
+ self.assert_log_equals({"action": "test_end",
+ "status": "PASS",
+ "test": "test1",
+ "stack": "many\nlines\nof\nstack"})
+ self.logger.suite_end()
+
+ def test_end_no_start(self):
+ self.logger.test_end("test1", "PASS", expected="PASS")
+ self.assertTrue(self.pop_last_item()["message"].startswith(
+ "test_end for test1 logged while not in progress. Logged with data: {"))
+ self.logger.suite_end()
+
+ def test_end_twice(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test2")
+ self.logger.test_end("test2", "PASS", expected="PASS")
+ self.assert_log_equals({"action": "test_end",
+ "status": "PASS",
+ "test": "test2"})
+ self.logger.test_end("test2", "PASS", expected="PASS")
+ last_item = self.pop_last_item()
+ self.assertEquals(last_item["action"], "log")
+ self.assertEquals(last_item["level"], "ERROR")
+ self.assertTrue(last_item["message"].startswith(
+ "test_end for test2 logged while not in progress. Logged with data: {"))
+ self.logger.suite_end()
+
+ def test_suite_start_twice(self):
+ self.logger.suite_start([])
+ self.assert_log_equals({"action": "suite_start",
+ "tests": []})
+ self.logger.suite_start([])
+ last_item = self.pop_last_item()
+ self.assertEquals(last_item["action"], "log")
+ self.assertEquals(last_item["level"], "ERROR")
+ self.logger.suite_end()
+
+ def test_suite_end_no_start(self):
+ self.logger.suite_start([])
+ self.assert_log_equals({"action": "suite_start",
+ "tests": []})
+ self.logger.suite_end()
+ self.assert_log_equals({"action": "suite_end"})
+ self.logger.suite_end()
+ last_item = self.pop_last_item()
+ self.assertEquals(last_item["action"], "log")
+ self.assertEquals(last_item["level"], "ERROR")
+
+ def test_multiple_loggers_suite_start(self):
+ logger1 = structuredlog.StructuredLogger("test")
+ self.logger.suite_start([])
+ logger1.suite_start([])
+ last_item = self.pop_last_item()
+ self.assertEquals(last_item["action"], "log")
+ self.assertEquals(last_item["level"], "ERROR")
+
+ def test_multiple_loggers_test_start(self):
+ logger1 = structuredlog.StructuredLogger("test")
+ self.logger.suite_start([])
+ self.logger.test_start("test")
+ logger1.test_start("test")
+ last_item = self.pop_last_item()
+ self.assertEquals(last_item["action"], "log")
+ self.assertEquals(last_item["level"], "ERROR")
+
+ def test_process(self):
+ self.logger.process_output(1234, "test output")
+ self.assert_log_equals({"action": "process_output",
+ "process": "1234",
+ "data": "test output"})
+
+ def test_process_start(self):
+ self.logger.process_start(1234)
+ self.assert_log_equals({"action": "process_start",
+ "process": "1234"})
+
+ def test_process_exit(self):
+ self.logger.process_exit(1234, 0)
+ self.assert_log_equals({"action": "process_exit",
+ "process": "1234",
+ "exitcode": 0})
+
+ def test_log(self):
+ for level in ["critical", "error", "warning", "info", "debug"]:
+ getattr(self.logger, level)("message")
+ self.assert_log_equals({"action": "log",
+ "level": level.upper(),
+ "message": "message"})
+
+ def test_logging_adapter(self):
+ import logging
+ logging.basicConfig(level="DEBUG")
+ old_level = logging.root.getEffectiveLevel()
+ logging.root.setLevel("DEBUG")
+
+ std_logger = logging.getLogger("test")
+ std_logger.setLevel("DEBUG")
+
+ logger = stdadapter.std_logging_adapter(std_logger)
+
+ try:
+ for level in ["critical", "error", "warning", "info", "debug"]:
+ getattr(logger, level)("message")
+ self.assert_log_equals({"action": "log",
+ "level": level.upper(),
+ "message": "message"})
+ finally:
+ logging.root.setLevel(old_level)
+
+ def test_add_remove_handlers(self):
+ handler = TestHandler()
+ self.logger.add_handler(handler)
+ self.logger.info("test1")
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "test1"})
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "test1"}, actual=handler.last_item)
+
+ self.logger.remove_handler(handler)
+ self.logger.info("test2")
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "test2"})
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "test1"}, actual=handler.last_item)
+
+ def test_wrapper(self):
+ file_like = structuredlog.StructuredLogFileLike(self.logger)
+
+ file_like.write("line 1")
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "line 1"})
+
+ file_like.write("line 2\n")
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "line 2"})
+
+ file_like.write("line 3\r")
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "line 3"})
+
+ file_like.write("line 4\r\n")
+
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "line 4"})
+
+
+class TestTypeConversions(BaseStructuredTest):
+
+ def test_raw(self):
+ self.logger.log_raw({"action": "suite_start",
+ "tests": [1],
+ "time": "1234"})
+ self.assert_log_equals({"action": "suite_start",
+ "tests": ["1"],
+ "time": 1234})
+ self.logger.suite_end()
+
+ def test_tuple(self):
+ self.logger.suite_start([])
+ self.logger.test_start(("\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90\x8d\x83\xf0\x90\x8d\x84",
+ 42, u"\u16a4"))
+ self.assert_log_equals({"action": "test_start",
+ "test": (u'\U00010344\U00010334\U00010343\U00010344',
+ u"42", u"\u16a4")})
+ self.logger.suite_end()
+
+ def test_non_string_messages(self):
+ self.logger.suite_start([])
+ self.logger.info(1)
+ self.assert_log_equals({"action": "log",
+ "message": "1",
+ "level": "INFO"})
+ self.logger.info([1, (2, '3'), "s", "s" + chr(255)])
+ self.assert_log_equals({"action": "log",
+ "message": "[1, (2, '3'), 's', 's\\xff']",
+ "level": "INFO"})
+ self.logger.suite_end()
+
+ def test_utf8str_write(self):
+ with mozfile.NamedTemporaryFile() as logfile:
+ _fmt = formatters.TbplFormatter()
+ _handler = handlers.StreamHandler(logfile, _fmt)
+ self.logger.add_handler(_handler)
+ self.logger.suite_start([])
+ self.logger.info("☺")
+ logfile.seek(0)
+ data = logfile.readlines()[-1].strip()
+ self.assertEquals(data, "☺")
+ self.logger.suite_end()
+
+ def test_arguments(self):
+ self.logger.info(message="test")
+ self.assert_log_equals({"action": "log",
+ "message": "test",
+ "level": "INFO"})
+
+ self.logger.suite_start([], {})
+ self.assert_log_equals({"action": "suite_start",
+ "tests": [],
+ "run_info": {}})
+ self.logger.test_start(test="test1")
+ self.logger.test_status(
+ "subtest1", "FAIL", test="test1", status="PASS")
+ self.assert_log_equals({"action": "test_status",
+ "test": "test1",
+ "subtest": "subtest1",
+ "status": "PASS",
+ "expected": "FAIL"})
+ self.logger.process_output(123, "data", "test")
+ self.assert_log_equals({"action": "process_output",
+ "process": "123",
+ "command": "test",
+ "data": "data"})
+ self.assertRaises(TypeError, self.logger.test_status, subtest="subtest2",
+ status="FAIL", expected="PASS")
+ self.assertRaises(TypeError, self.logger.test_status, "test1", "subtest1",
+ "PASS", "FAIL", "message", "stack", {}, "unexpected")
+ self.assertRaises(TypeError, self.logger.test_status,
+ "test1", test="test2")
+ self.logger.suite_end()
+
+
+class TestComponentFilter(BaseStructuredTest):
+
+ def test_filter_component(self):
+ component_logger = structuredlog.StructuredLogger(self.logger.name,
+ "test_component")
+ component_logger.component_filter = handlers.LogLevelFilter(
+ lambda x: x, "info")
+
+ self.logger.debug("Test")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log",
+ "level": "DEBUG",
+ "message": "Test"})
+ self.assertTrue(self.handler.empty)
+
+ component_logger.info("Test 1")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log",
+ "level": "INFO",
+ "message": "Test 1",
+ "component": "test_component"})
+
+ component_logger.debug("Test 2")
+ self.assertTrue(self.handler.empty)
+
+ component_logger.component_filter = None
+
+ component_logger.debug("Test 3")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log",
+ "level": "DEBUG",
+ "message": "Test 3",
+ "component": "test_component"})
+
+ def test_filter_default_component(self):
+ component_logger = structuredlog.StructuredLogger(self.logger.name,
+ "test_component")
+
+ self.logger.debug("Test")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log",
+ "level": "DEBUG",
+ "message": "Test"})
+
+ self.logger.component_filter = handlers.LogLevelFilter(
+ lambda x: x, "info")
+
+ self.logger.debug("Test 1")
+ self.assertTrue(self.handler.empty)
+
+ component_logger.debug("Test 2")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log",
+ "level": "DEBUG",
+ "message": "Test 2",
+ "component": "test_component"})
+
+ self.logger.component_filter = None
+
+ self.logger.debug("Test 3")
+ self.assertFalse(self.handler.empty)
+ self.assert_log_equals({"action": "log",
+ "level": "DEBUG",
+ "message": "Test 3"})
+
+ def test_filter_message_mutuate(self):
+ def filter_mutate(msg):
+ if msg["action"] == "log":
+ msg["message"] = "FILTERED! %s" % msg["message"]
+ return msg
+
+ self.logger.component_filter = filter_mutate
+ self.logger.debug("Test")
+ self.assert_log_equals({"action": "log",
+ "level": "DEBUG",
+ "message": "FILTERED! Test"})
+ self.logger.component_filter = None
+
+
+class FormatterTest(unittest.TestCase):
+
+ def setUp(self):
+ self.position = 0
+ self.logger = structuredlog.StructuredLogger(
+ "test_%s" % type(self).__name__)
+ self.output_file = StringIO.StringIO()
+ self.handler = handlers.StreamHandler(
+ self.output_file, self.get_formatter())
+ self.logger.add_handler(self.handler)
+
+ def set_position(self, pos=None):
+ if pos is None:
+ pos = self.output_file.tell()
+ self.position = pos
+
+ def get_formatter(self):
+ raise NotImplementedError(
+ "FormatterTest subclasses must implement get_formatter")
+
+ @property
+ def loglines(self):
+ self.output_file.seek(self.position)
+ return [line.rstrip() for line in self.output_file.readlines()]
+
+
+class TestHTMLFormatter(FormatterTest):
+
+ def get_formatter(self):
+ return formatters.HTMLFormatter()
+
+ def test_base64_string(self):
+ self.logger.suite_start([])
+ self.logger.test_start("string_test")
+ self.logger.test_end("string_test", "FAIL",
+ extra={"data": "foobar"})
+ self.logger.suite_end()
+ self.assertIn("data:text/html;charset=utf-8;base64,Zm9vYmFy",
+ ''.join(self.loglines))
+
+ def test_base64_unicode(self):
+ self.logger.suite_start([])
+ self.logger.test_start("unicode_test")
+ self.logger.test_end("unicode_test", "FAIL",
+ extra={"data": unichr(0x02A9)})
+ self.logger.suite_end()
+ self.assertIn("data:text/html;charset=utf-8;base64,yqk=",
+ ''.join(self.loglines))
+
+ def test_base64_other(self):
+ self.logger.suite_start([])
+ self.logger.test_start("int_test")
+ self.logger.test_end("int_test", "FAIL",
+ extra={"data": {"foo": "bar"}})
+ self.logger.suite_end()
+ self.assertIn("data:text/html;charset=utf-8;base64,eydmb28nOiAnYmFyJ30=",
+ ''.join(self.loglines))
+
+
+class TestTBPLFormatter(FormatterTest):
+
+ def get_formatter(self):
+ return formatters.TbplFormatter()
+
+ def test_unexpected_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("timeout_test")
+ self.logger.test_end("timeout_test",
+ "TIMEOUT",
+ message="timed out")
+ self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out",
+ self.loglines)
+ self.logger.suite_end()
+
+ def test_default_unexpected_end_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("timeout_test")
+ self.logger.test_end("timeout_test",
+ "TIMEOUT")
+ self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK",
+ self.loglines)
+ self.logger.suite_end()
+
+ def test_default_unexpected_status_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("timeout_test")
+ self.logger.test_status("timeout_test",
+ "subtest",
+ status="TIMEOUT")
+ self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS",
+ self.loglines)
+ self.logger.test_end("timeout_test", "OK")
+ self.logger.suite_end()
+
+ def test_single_newline(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.set_position()
+ self.logger.test_status("test1", "subtest",
+ status="PASS",
+ expected="FAIL")
+ self.logger.test_end("test1", "OK")
+ self.logger.suite_end()
+
+ # This sequence should not produce blanklines
+ for line in self.loglines:
+ self.assertNotEqual("", line, "No blank line should be present in: %s" %
+ self.loglines)
+
+ def test_process_exit(self):
+ self.logger.process_exit(1234, 0)
+ self.assertIn('TEST-INFO | 1234: exit 0', self.loglines)
+
+ @unittest.skipUnless(os.name == 'posix', 'posix only')
+ def test_process_exit_with_sig(self):
+ # subprocess return code is negative when process
+ # has been killed by signal on posix.
+ self.logger.process_exit(1234, -signal.SIGTERM)
+ self.assertIn, ('TEST-INFO | 1234: killed by SIGTERM', self.loglines)
+
+
+class TestMachFormatter(FormatterTest):
+
+ def get_formatter(self):
+ return formatters.MachFormatter(disable_colors=True)
+
+ def test_summary(self):
+ self.logger.suite_start([])
+
+ # Some tests that pass
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", status="PASS", expected="PASS")
+
+ self.logger.test_start("test2")
+ self.logger.test_end("test2", status="PASS", expected="TIMEOUT")
+
+ self.logger.test_start("test3")
+ self.logger.test_end("test3", status="FAIL", expected="PASS")
+
+ self.set_position()
+ self.logger.suite_end()
+
+ self.assertIn("Ran 3 tests", self.loglines)
+ self.assertIn("Expected results: 1", self.loglines)
+ self.assertIn(
+ "Unexpected results: 2 (FAIL: 1, PASS: 1)", self.loglines)
+ self.assertNotIn("test1", self.loglines)
+ self.assertIn("PASS expected TIMEOUT test2", self.loglines)
+ self.assertIn("FAIL test3", self.loglines)
+
+ def test_summary_subtests(self):
+ self.logger.suite_start([])
+
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest1", status="PASS")
+ self.logger.test_status("test1", "subtest2", status="FAIL")
+ self.logger.test_end("test1", status="OK", expected="OK")
+
+ self.logger.test_start("test2")
+ self.logger.test_status("test2", "subtest1",
+ status="TIMEOUT", expected="PASS")
+ self.logger.test_end("test2", status="TIMEOUT", expected="OK")
+
+ self.set_position()
+ self.logger.suite_end()
+
+ self.assertIn("Ran 5 tests (2 parents, 3 subtests)", self.loglines)
+ self.assertIn("Expected results: 2", self.loglines)
+ self.assertIn(
+ "Unexpected results: 3 (FAIL: 1, TIMEOUT: 2)", self.loglines)
+
+ def test_summary_ok(self):
+ self.logger.suite_start([])
+
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "subtest1", status="PASS")
+ self.logger.test_status("test1", "subtest2", status="PASS")
+ self.logger.test_end("test1", status="OK", expected="OK")
+
+ self.logger.test_start("test2")
+ self.logger.test_status("test2", "subtest1",
+ status="PASS", expected="PASS")
+ self.logger.test_end("test2", status="OK", expected="OK")
+
+ self.set_position()
+ self.logger.suite_end()
+
+ self.assertIn("OK", self.loglines)
+ self.assertIn("Expected results: 5", self.loglines)
+ self.assertIn("Unexpected results: 0", self.loglines)
+
+ def test_process_start(self):
+ self.logger.process_start(1234)
+ self.assertIn("Started process `1234`", self.loglines[0])
+
+ def test_process_start_with_command(self):
+ self.logger.process_start(1234, command='test cmd')
+ self.assertIn("Started process `1234` (test cmd)", self.loglines[0])
+
+ def test_process_exit(self):
+ self.logger.process_exit(1234, 0)
+ self.assertIn('1234: exit 0', self.loglines[0])
+
+ @unittest.skipUnless(os.name == 'posix', 'posix only')
+ def test_process_exit_with_sig(self):
+ # subprocess return code is negative when process
+ # has been killed by signal on posix.
+ self.logger.process_exit(1234, -signal.SIGTERM)
+ self.assertIn('1234: killed by SIGTERM', self.loglines[0])
+
+
+class TestXUnitFormatter(FormatterTest):
+
+ def get_formatter(self):
+ return formatters.XUnitFormatter()
+
+ def log_as_xml(self):
+ return ET.fromstring('\n'.join(self.loglines))
+
+ def test_stacktrace_is_present(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end(
+ "test1", "fail", message="Test message", stack='this\nis\na\nstack')
+ self.logger.suite_end()
+
+ root = self.log_as_xml()
+ self.assertIn('this\nis\na\nstack', root.find('testcase/failure').text)
+
+ def test_failure_message(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "fail", message="Test message")
+ self.logger.suite_end()
+
+ root = self.log_as_xml()
+ self.assertEquals('Expected OK, got FAIL', root.find(
+ 'testcase/failure').get('message'))
+
+ def test_suite_attrs(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_end("test1", "ok", message="Test message")
+ self.logger.suite_end()
+
+ root = self.log_as_xml()
+ self.assertEqual(root.get('skips'), '0')
+ self.assertEqual(root.get('failures'), '0')
+ self.assertEqual(root.get('errors'), '0')
+ self.assertEqual(root.get('tests'), '1')
+ self.assertEqual(root.get('time'), '0.00')
+
+ def test_time_is_not_rounded(self):
+ # call formatter directly, it is easier here
+ formatter = self.get_formatter()
+ formatter.suite_start(dict(time=55000))
+ formatter.test_start(dict(time=55100))
+ formatter.test_end(
+ dict(time=55558, test='id', message='message', status='PASS'))
+ xml_string = formatter.suite_end(dict(time=55559))
+
+ root = ET.fromstring(xml_string)
+ self.assertEqual(root.get('time'), '0.56')
+ self.assertEqual(root.find('testcase').get('time'), '0.46')
+
+
+class TestCommandline(unittest.TestCase):
+
+ def setUp(self):
+ self.logfile = mozfile.NamedTemporaryFile()
+
+ @property
+ def loglines(self):
+ self.logfile.seek(0)
+ return [line.rstrip() for line in self.logfile.readlines()]
+
+ def test_setup_logging(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(["--log-raw=-"])
+ logger = commandline.setup_logging("test_setup_logging", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+
+ def test_setup_logging_optparse(self):
+ parser = optparse.OptionParser()
+ commandline.add_logging_group(parser)
+ args, _ = parser.parse_args(["--log-raw=-"])
+ logger = commandline.setup_logging("test_optparse", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+ self.assertIsInstance(logger.handlers[0], handlers.StreamHandler)
+
+ def test_limit_formatters(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser, include_formatters=['raw'])
+ other_formatters = [fmt for fmt in commandline.log_formatters
+ if fmt != 'raw']
+ # check that every formatter except raw is not present
+ for fmt in other_formatters:
+ with self.assertRaises(SystemExit):
+ parser.parse_args(["--log-%s=-" % fmt])
+ with self.assertRaises(SystemExit):
+ parser.parse_args(["--log-%s-level=error" % fmt])
+ # raw is still ok
+ args = parser.parse_args(["--log-raw=-"])
+ logger = commandline.setup_logging("test_setup_logging2", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+
+ def test_setup_logging_optparse_unicode(self):
+ parser = optparse.OptionParser()
+ commandline.add_logging_group(parser)
+ args, _ = parser.parse_args([u"--log-raw=-"])
+ logger = commandline.setup_logging("test_optparse_unicode", args, {})
+ self.assertEqual(len(logger.handlers), 1)
+ self.assertEqual(logger.handlers[0].stream, sys.stdout)
+ self.assertIsInstance(logger.handlers[0], handlers.StreamHandler)
+
+ def test_logging_defaultlevel(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+
+ args = parser.parse_args(["--log-tbpl=%s" % self.logfile.name])
+ logger = commandline.setup_logging("test_fmtopts", args, {})
+ logger.info("INFO message")
+ logger.debug("DEBUG message")
+ logger.error("ERROR message")
+ # The debug level is not logged by default.
+ self.assertEqual(["INFO message",
+ "ERROR message"],
+ self.loglines)
+
+ def test_logging_errorlevel(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(
+ ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=error"])
+ logger = commandline.setup_logging("test_fmtopts", args, {})
+ logger.info("INFO message")
+ logger.debug("DEBUG message")
+ logger.error("ERROR message")
+
+ # Only the error level and above were requested.
+ self.assertEqual(["ERROR message"],
+ self.loglines)
+
+ def test_logging_debuglevel(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(
+ ["--log-tbpl=%s" % self.logfile.name, "--log-tbpl-level=debug"])
+ logger = commandline.setup_logging("test_fmtopts", args, {})
+ logger.info("INFO message")
+ logger.debug("DEBUG message")
+ logger.error("ERROR message")
+ # Requesting a lower log level than default works as expected.
+ self.assertEqual(["INFO message",
+ "DEBUG message",
+ "ERROR message"],
+ self.loglines)
+
+ def test_unused_options(self):
+ parser = argparse.ArgumentParser()
+ commandline.add_logging_group(parser)
+ args = parser.parse_args(["--log-tbpl-level=error"])
+ self.assertRaises(ValueError, commandline.setup_logging,
+ "test_fmtopts", args, {})
+
+
+class TestBuffer(BaseStructuredTest):
+
+ def assert_log_equals(self, expected, actual=None):
+ if actual is None:
+ actual = self.pop_last_item()
+
+ all_expected = {"pid": os.getpid(),
+ "thread": "MainThread",
+ "source": "testBuffer"}
+ specials = set(["time"])
+
+ all_expected.update(expected)
+ for key, value in all_expected.iteritems():
+ self.assertEqual(actual[key], value)
+
+ self.assertEquals(set(all_expected.keys()) |
+ specials, set(actual.keys()))
+
+ def setUp(self):
+ self.logger = structuredlog.StructuredLogger("testBuffer")
+ self.handler = handlers.BufferHandler(TestHandler(), message_limit=4)
+ self.logger.add_handler(self.handler)
+
+ def tearDown(self):
+ self.logger.remove_handler(self.handler)
+
+ def pop_last_item(self):
+ return self.handler.inner.items.pop()
+
+ def test_buffer_messages(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.send_message("buffer", "off")
+ self.logger.test_status("test1", "sub1", status="PASS")
+ # Even for buffered actions, the buffer does not interfere if
+ # buffering is turned off.
+ self.assert_log_equals({"action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub1"})
+ self.logger.send_message("buffer", "on")
+ self.logger.test_status("test1", "sub2", status="PASS")
+ self.logger.test_status("test1", "sub3", status="PASS")
+ self.logger.test_status("test1", "sub4", status="PASS")
+ self.logger.test_status("test1", "sub5", status="PASS")
+ self.logger.test_status("test1", "sub6", status="PASS")
+ self.logger.test_status("test1", "sub7", status="PASS")
+ self.logger.test_end("test1", status="OK")
+ self.logger.send_message("buffer", "clear")
+ self.assert_log_equals({"action": "test_end",
+ "test": "test1",
+ "status": "OK"})
+ self.logger.suite_end()
+
+ def test_buffer_size(self):
+ self.logger.suite_start([])
+ self.logger.test_start("test1")
+ self.logger.test_status("test1", "sub1", status="PASS")
+ self.logger.test_status("test1", "sub2", status="PASS")
+ self.logger.test_status("test1", "sub3", status="PASS")
+ self.logger.test_status("test1", "sub4", status="PASS")
+ self.logger.test_status("test1", "sub5", status="PASS")
+ self.logger.test_status("test1", "sub6", status="PASS")
+ self.logger.test_status("test1", "sub7", status="PASS")
+
+ # No test status messages made it to the underlying handler.
+ self.assert_log_equals({"action": "test_start",
+ "test": "test1"})
+
+ # The buffer's actual size never grows beyond the specified limit.
+ self.assertEquals(len(self.handler._buffer), 4)
+
+ self.logger.test_status("test1", "sub8", status="FAIL")
+ # The number of messages deleted comes back in a list.
+ self.assertEquals([4], self.logger.send_message("buffer", "flush"))
+
+ # When the buffer is dumped, the failure is the last thing logged
+ self.assert_log_equals({"action": "test_status",
+ "test": "test1",
+ "subtest": "sub8",
+ "status": "FAIL",
+ "expected": "PASS"})
+ # Three additional messages should have been retained for context
+ self.assert_log_equals({"action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub7"})
+ self.assert_log_equals({"action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub6"})
+ self.assert_log_equals({"action": "test_status",
+ "test": "test1",
+ "status": "PASS",
+ "subtest": "sub5"})
+ self.assert_log_equals({"action": "suite_start",
+ "tests": []})
+
+
+class TestReader(unittest.TestCase):
+
+ def to_file_like(self, obj):
+ data_str = "\n".join(json.dumps(item) for item in obj)
+ return StringIO.StringIO(data_str)
+
+ def test_read(self):
+ data = [{"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"}]
+
+ f = self.to_file_like(data)
+ self.assertEquals(data, list(reader.read(f)))
+
+ def test_imap_log(self):
+ data = [{"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"}]
+
+ f = self.to_file_like(data)
+
+ def f_action_0(item):
+ return ("action_0", item["data"])
+
+ def f_action_1(item):
+ return ("action_1", item["data"])
+
+ res_iter = reader.imap_log(reader.read(f),
+ {"action_0": f_action_0,
+ "action_1": f_action_1})
+ self.assertEquals([("action_0", "data_0"), ("action_1", "data_1")],
+ list(res_iter))
+
+ def test_each_log(self):
+ data = [{"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"}]
+
+ f = self.to_file_like(data)
+
+ count = {"action_0": 0,
+ "action_1": 0}
+
+ def f_action_0(item):
+ count[item["action"]] += 1
+
+ def f_action_1(item):
+ count[item["action"]] += 2
+
+ reader.each_log(reader.read(f),
+ {"action_0": f_action_0,
+ "action_1": f_action_1})
+
+ self.assertEquals({"action_0": 1, "action_1": 2}, count)
+
+ def test_handler(self):
+ data = [{"action": "action_0", "data": "data_0"},
+ {"action": "action_1", "data": "data_1"}]
+
+ f = self.to_file_like(data)
+
+ test = self
+
+ class ReaderTestHandler(reader.LogHandler):
+
+ def __init__(self):
+ self.action_0_count = 0
+ self.action_1_count = 0
+
+ def action_0(self, item):
+ test.assertEquals(item["action"], "action_0")
+ self.action_0_count += 1
+
+ def action_1(self, item):
+ test.assertEquals(item["action"], "action_1")
+ self.action_1_count += 1
+
+ handler = ReaderTestHandler()
+ reader.handle_log(reader.read(f), handler)
+
+ self.assertEquals(handler.action_0_count, 1)
+ self.assertEquals(handler.action_1_count, 1)
+
+if __name__ == "__main__":
+ unittest.main()