# -*- coding: utf-8 -*-
import argparse
import json
import optparse
import os
import StringIO
import sys
import unittest
import signal
import xml.etree.ElementTree as ET

import mozfile

from mozlog import (
    commandline,
    reader,
    structuredlog,
    stdadapter,
    handlers,
    formatters,
)


class TestHandler(object):
    """Callable log handler that records every message it receives.

    Messages are kept, in arrival order, in the ``items`` list so tests can
    inspect or pop them afterwards.
    """

    def __init__(self):
        self.items = list()

    def __call__(self, data):
        """Record ``data`` as the newest message."""
        self.items += [data]

    @property
    def last_item(self):
        """The newest recorded message (IndexError when nothing is recorded)."""
        return self.items[len(self.items) - 1]

    @property
    def empty(self):
        """True while no recorded message is left in ``items``."""
        return len(self.items) == 0


class BaseStructuredTest(unittest.TestCase):
    """Shared fixture for tests that inspect raw structured log messages.

    ``setUp`` creates a structured logger named "test" and attaches a
    ``TestHandler`` to it, so every message the logger emits can be examined
    through ``self.handler``.
    """

    def setUp(self):
        self.logger = structuredlog.StructuredLogger("test")
        self.handler = TestHandler()
        self.logger.add_handler(self.handler)

    def pop_last_item(self):
        """Remove and return the newest message captured by the handler."""
        return self.handler.items.pop()

    def assert_log_equals(self, expected, actual=None):
        """Assert that a log message carries exactly the expected fields.

        :param expected: dict of the fields specific to the message under
            test. The fields common to every message ("pid", "thread" and
            "source") are supplied automatically and may be overridden.
        :param actual: the message to check. When omitted, the newest
            message is popped from the handler and checked instead.

        The "time" key must be present in the message, but its value is only
        compared when ``expected`` supplies one. Any other key that is
        missing from, or additional to, the expected set fails the test.
        """
        if actual is None:
            actual = self.pop_last_item()

        all_expected = {"pid": os.getpid(),
                        "thread": "MainThread",
                        "source": "test"}
        all_expected.update(expected)
        # Keys that must exist but whose value is not checked by default.
        specials = {"time"}

        # items() rather than iteritems(): valid on both Python 2 and 3.
        for key, value in all_expected.items():
            # Report a missing key as a test failure, not a bare KeyError.
            self.assertIn(key, actual)
            self.assertEqual(actual[key], value)

        self.assertEqual(set(all_expected.keys()) | specials,
                         set(actual.keys()))


class TestStatusHandler(BaseStructuredTest):
    """Checks the run summary produced by ``handlers.StatusHandler``."""

    def setUp(self):
        super(TestStatusHandler, self).setUp()
        # Replace the recording handler reference with a status handler and
        # register it on the logger as well.
        status_handler = handlers.StatusHandler()
        self.handler = status_handler
        self.logger.add_handler(status_handler)

    def test_failure_run(self):
        """One unexpected subtest result is counted separately from expected ones."""
        log = self.logger
        log.suite_start([])
        log.test_start("test1")
        log.test_status("test1", "sub1", status='PASS')
        log.test_status("test1", "sub2", status='TIMEOUT')
        log.test_end("test1", status='OK')
        log.suite_end()

        summary = self.handler.summarize()

        unexpected = summary.unexpected_statuses
        self.assertIn('TIMEOUT', unexpected)
        self.assertEqual(1, unexpected['TIMEOUT'])

        expected = summary.expected_statuses
        for status in ('PASS', 'OK'):
            self.assertIn(status, expected)
            self.assertEqual(1, expected[status])

        counts = summary.action_counts
        self.assertEqual(2, counts['test_status'])
        self.assertEqual(1, counts['test_end'])

    def test_error_run(self):
        """An ERROR log message is counted alongside the expected test results."""
        log = self.logger
        log.suite_start([])
        log.test_start("test1")
        log.error("ERRR!")
        log.test_end("test1", status='OK')
        log.test_start("test2")
        log.test_end("test2", status='OK')
        log.suite_end()

        summary = self.handler.summarize()

        levels = summary.log_level_counts
        self.assertIn('ERROR', levels)
        self.assertEqual(1, levels['ERROR'])

        expected = summary.expected_statuses
        self.assertIn('OK', expected)
        self.assertEqual(2, expected['OK'])


class TestStructuredLog(BaseStructuredTest):
    """Tests for the messages emitted by ``structuredlog.StructuredLogger``.

    Each test drives the logger created in ``setUp`` and compares the
    messages captured by the handler with the expected field dictionaries.
    """

    def test_suite_start(self):
        """suite_start emits a "suite_start" message listing the tests."""
        self.logger.suite_start(["test"])
        self.assert_log_equals({"action": "suite_start",
                                "tests": ["test"]})
        self.logger.suite_end()

    def test_suite_end(self):
        """suite_end emits a "suite_end" message."""
        self.logger.suite_start([])
        self.logger.suite_end()
        self.assert_log_equals({"action": "suite_end"})

    def test_start(self):
        """test_start accepts string and tuple test ids and an optional path."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.assert_log_equals({"action": "test_start",
                                "test": "test1"})

        self.logger.test_start(
            ("test1", "==", "test1-ref"), path="path/to/test")
        self.assert_log_equals({"action": "test_start",
                                "test": ("test1", "==", "test1-ref"),
                                "path": "path/to/test"})
        self.logger.suite_end()

    def test_start_inprogress(self):
        """Starting a test that is already running logs an ERROR message."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_start("test1")
        self.assert_log_equals({"action": "log",
                                "message": "test_start for test1 logged while in progress.",
                                "level": "ERROR"})
        self.logger.suite_end()

    def test_status(self):
        """The status is upper-cased and "expected" is dropped when it matches."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "fail", expected="FAIL",
                                message="Test message")
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "message": "Test message",
                                "test": "test1"})
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_1(self):
        """A subtest status other than PASS is reported with expected "PASS"."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "fail")
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "expected": "PASS",
                                "test": "test1"})
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_2(self):
        """An unknown subtest status raises ValueError."""
        self.assertRaises(ValueError, self.logger.test_status, "test1", "subtest name",
                          "XXXUNKNOWNXXX")

    def test_status_extra(self):
        """The ``extra`` dict is passed through on test_status messages."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS",
                                extra={"data": 42})
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "expected": "PASS",
                                "test": "test1",
                                "extra": {"data": 42}})
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_stack(self):
        """A multi-line ``stack`` is passed through on test_status messages."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "subtest name", "FAIL", expected="PASS",
                                stack="many\nlines\nof\nstack")
        self.assert_log_equals({"action": "test_status",
                                "subtest": "subtest name",
                                "status": "FAIL",
                                "expected": "PASS",
                                "test": "test1",
                                "stack": "many\nlines\nof\nstack"})
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

    def test_status_not_started(self):
        """test_status for a test that was never started logs a diagnostic message."""
        self.logger.test_status("test_UNKNOWN", "subtest", "PASS")
        self.assertTrue(self.pop_last_item()["message"].startswith(
            "test_status for test_UNKNOWN logged while not in progress. Logged with data: {"))

    def test_end(self):
        """A test status other than OK is reported with expected "OK"."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_end("test1", "fail", message="Test message")
        self.assert_log_equals({"action": "test_end",
                                "status": "FAIL",
                                "expected": "OK",
                                "message": "Test message",
                                "test": "test1"})
        self.logger.suite_end()

    def test_end_1(self):
        """"expected" is dropped when it matches the status; ``extra`` is kept."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_end(
            "test1", "PASS", expected="PASS", extra={"data": 123})
        self.assert_log_equals({"action": "test_end",
                                "status": "PASS",
                                "extra": {"data": 123},
                                "test": "test1"})
        self.logger.suite_end()

    def test_end_2(self):
        """An unknown test status raises ValueError."""
        self.assertRaises(ValueError, self.logger.test_end,
                          "test1", "XXXUNKNOWNXXX")

    def test_end_stack(self):
        """A multi-line ``stack`` is passed through on test_end messages."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_end("test1", "PASS", expected="PASS",
                             stack="many\nlines\nof\nstack")
        self.assert_log_equals({"action": "test_end",
                                "status": "PASS",
                                "test": "test1",
                                "stack": "many\nlines\nof\nstack"})
        self.logger.suite_end()

    def test_end_no_start(self):
        """test_end for a test that was never started logs a diagnostic message."""
        self.logger.test_end("test1", "PASS", expected="PASS")
        self.assertTrue(self.pop_last_item()["message"].startswith(
            "test_end for test1 logged while not in progress. Logged with data: {"))
        self.logger.suite_end()

    def test_end_twice(self):
        """A second test_end for the same test is reported as an ERROR log."""
        self.logger.suite_start([])
        self.logger.test_start("test2")
        self.logger.test_end("test2", "PASS", expected="PASS")
        self.assert_log_equals({"action": "test_end",
                                "status": "PASS",
                                "test": "test2"})
        self.logger.test_end("test2", "PASS", expected="PASS")
        last_item = self.pop_last_item()
        self.assertEquals(last_item["action"], "log")
        self.assertEquals(last_item["level"], "ERROR")
        self.assertTrue(last_item["message"].startswith(
            "test_end for test2 logged while not in progress. Logged with data: {"))
        self.logger.suite_end()

    def test_suite_start_twice(self):
        """A second suite_start while a suite is running logs an ERROR."""
        self.logger.suite_start([])
        self.assert_log_equals({"action": "suite_start",
                                "tests": []})
        self.logger.suite_start([])
        last_item = self.pop_last_item()
        self.assertEquals(last_item["action"], "log")
        self.assertEquals(last_item["level"], "ERROR")
        self.logger.suite_end()

    def test_suite_end_no_start(self):
        """A suite_end with no suite running logs an ERROR."""
        self.logger.suite_start([])
        self.assert_log_equals({"action": "suite_start",
                                "tests": []})
        self.logger.suite_end()
        self.assert_log_equals({"action": "suite_end"})
        self.logger.suite_end()
        last_item = self.pop_last_item()
        self.assertEquals(last_item["action"], "log")
        self.assertEquals(last_item["level"], "ERROR")

    def test_multiple_loggers_suite_start(self):
        """suite_start on a second logger with the same name logs an ERROR."""
        logger1 = structuredlog.StructuredLogger("test")
        self.logger.suite_start([])
        logger1.suite_start([])
        last_item = self.pop_last_item()
        self.assertEquals(last_item["action"], "log")
        self.assertEquals(last_item["level"], "ERROR")

    def test_multiple_loggers_test_start(self):
        """test_start for a running test via a same-named logger logs an ERROR."""
        logger1 = structuredlog.StructuredLogger("test")
        self.logger.suite_start([])
        self.logger.test_start("test")
        logger1.test_start("test")
        last_item = self.pop_last_item()
        self.assertEquals(last_item["action"], "log")
        self.assertEquals(last_item["level"], "ERROR")

    def test_process(self):
        """process_output reports the process id as a string with its data."""
        self.logger.process_output(1234, "test output")
        self.assert_log_equals({"action": "process_output",
                                "process": "1234",
                                "data": "test output"})

    def test_process_start(self):
        """process_start reports the process id as a string."""
        self.logger.process_start(1234)
        self.assert_log_equals({"action": "process_start",
                                "process": "1234"})

    def test_process_exit(self):
        """process_exit reports the process id as a string and the exit code."""
        self.logger.process_exit(1234, 0)
        self.assert_log_equals({"action": "process_exit",
                                "process": "1234",
                                "exitcode": 0})

    def test_log(self):
        """Each level method emits a "log" message with the upper-cased level."""
        for level in ["critical", "error", "warning", "info", "debug"]:
            getattr(self.logger, level)("message")
            self.assert_log_equals({"action": "log",
                                    "level": level.upper(),
                                    "message": "message"})

    def test_logging_adapter(self):
        """Messages sent through the stdlib logging adapter reach the handler."""
        import logging
        # Record the root level before basicConfig()/setLevel() can change
        # it, so that the ``finally`` clause restores the pre-test level.
        old_level = logging.root.getEffectiveLevel()
        logging.basicConfig(level="DEBUG")
        logging.root.setLevel("DEBUG")

        std_logger = logging.getLogger("test")
        std_logger.setLevel("DEBUG")

        logger = stdadapter.std_logging_adapter(std_logger)

        try:
            for level in ["critical", "error", "warning", "info", "debug"]:
                getattr(logger, level)("message")
                self.assert_log_equals({"action": "log",
                                        "level": level.upper(),
                                        "message": "message"})
        finally:
            logging.root.setLevel(old_level)

    def test_add_remove_handlers(self):
        """A removed handler stops receiving messages; remaining ones still do."""
        handler = TestHandler()
        self.logger.add_handler(handler)
        self.logger.info("test1")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test1"})

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test1"}, actual=handler.last_item)

        self.logger.remove_handler(handler)
        self.logger.info("test2")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test2"})

        # The removed handler must still hold "test1" as its newest message.
        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "test1"}, actual=handler.last_item)

    def test_wrapper(self):
        """Writes to the file-like wrapper become INFO logs without line endings."""
        file_like = structuredlog.StructuredLogFileLike(self.logger)

        file_like.write("line 1")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "line 1"})

        file_like.write("line 2\n")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "line 2"})

        file_like.write("line 3\r")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "line 3"})

        file_like.write("line 4\r\n")

        self.assert_log_equals({"action": "log",
                                "level": "INFO",
                                "message": "line 4"})


class TestTypeConversions(BaseStructuredTest):
    """Tests for how the logger converts and validates message field values."""

    def test_raw(self):
        """log_raw converts test ids to strings and a string time to an int."""
        self.logger.log_raw({"action": "suite_start",
                             "tests": [1],
                             "time": "1234"})
        self.assert_log_equals({"action": "suite_start",
                                "tests": ["1"],
                                "time": 1234})
        self.logger.suite_end()

    def test_tuple(self):
        """Every element of a tuple test id is converted to a unicode string."""
        self.logger.suite_start([])
        self.logger.test_start(("\xf0\x90\x8d\x84\xf0\x90\x8c\xb4\xf0\x90\x8d\x83\xf0\x90\x8d\x84",
                                42, u"\u16a4"))
        self.assert_log_equals({"action": "test_start",
                                "test": (u'\U00010344\U00010334\U00010343\U00010344',
                                         u"42", u"\u16a4")})
        self.logger.suite_end()

    def test_non_string_messages(self):
        """Non-string log messages are converted to their string form."""
        self.logger.suite_start([])
        self.logger.info(1)
        self.assert_log_equals({"action": "log",
                                "message": "1",
                                "level": "INFO"})
        self.logger.info([1, (2, '3'), "s", "s" + chr(255)])
        self.assert_log_equals({"action": "log",
                                "message": "[1, (2, '3'), 's', 's\\xff']",
                                "level": "INFO"})
        self.logger.suite_end()

    def test_utf8str_write(self):
        """A UTF-8 encoded message is written unchanged by a stream handler."""
        with mozfile.NamedTemporaryFile() as logfile:
            _fmt = formatters.TbplFormatter()
            _handler = handlers.StreamHandler(logfile, _fmt)
            self.logger.add_handler(_handler)
            try:
                self.logger.suite_start([])
                self.logger.info("☺")
                logfile.seek(0)
                data = logfile.readlines()[-1].strip()
                self.assertEquals(data, "☺")
                self.logger.suite_end()
            finally:
                # Detach the handler before the temporary file is closed so
                # that later messages are never written to a closed stream.
                self.logger.remove_handler(_handler)

    def test_arguments(self):
        """Logger methods accept mixed positional/keyword arguments and reject bad ones."""
        self.logger.info(message="test")
        self.assert_log_equals({"action": "log",
                                "message": "test",
                                "level": "INFO"})

        self.logger.suite_start([], {})
        self.assert_log_equals({"action": "suite_start",
                                "tests": [],
                                "run_info": {}})
        self.logger.test_start(test="test1")
        # Positional values fill the arguments not already given by keyword.
        self.logger.test_status(
            "subtest1", "FAIL", test="test1", status="PASS")
        self.assert_log_equals({"action": "test_status",
                                "test": "test1",
                                "subtest": "subtest1",
                                "status": "PASS",
                                "expected": "FAIL"})
        self.logger.process_output(123, "data", "test")
        self.assert_log_equals({"action": "process_output",
                                "process": "123",
                                "command": "test",
                                "data": "data"})
        # The "test" argument is missing.
        self.assertRaises(TypeError, self.logger.test_status, subtest="subtest2",
                          status="FAIL", expected="PASS")
        # Too many positional arguments.
        self.assertRaises(TypeError, self.logger.test_status, "test1", "subtest1",
                          "PASS", "FAIL", "message", "stack", {}, "unexpected")
        # "test" given both positionally and by keyword.
        self.assertRaises(TypeError, self.logger.test_status,
                          "test1", test="test2")
        self.logger.suite_end()


class TestComponentFilter(BaseStructuredTest):
    """Exercises message filters attached to a logger or to one of its components."""

    def test_filter_component(self):
        """A filter set on a component logger leaves the parent logger alone."""
        child = structuredlog.StructuredLogger(self.logger.name,
                                               "test_component")
        child.component_filter = handlers.LogLevelFilter(lambda x: x, "info")

        # The parent logger carries no filter, so DEBUG still gets through.
        self.logger.debug("Test")
        self.assertFalse(self.handler.empty)
        self.assert_log_equals(
            dict(action="log", level="DEBUG", message="Test"))
        self.assertTrue(self.handler.empty)

        child.info("Test 1")
        self.assertFalse(self.handler.empty)
        self.assert_log_equals(
            dict(action="log", level="INFO", message="Test 1",
                 component="test_component"))

        # Below the filter level: nothing should be recorded.
        child.debug("Test 2")
        self.assertTrue(self.handler.empty)

        child.component_filter = None

        child.debug("Test 3")
        self.assertFalse(self.handler.empty)
        self.assert_log_equals(
            dict(action="log", level="DEBUG", message="Test 3",
                 component="test_component"))

    def test_filter_default_component(self):
        """A filter set on the parent logger leaves a component logger alone."""
        child = structuredlog.StructuredLogger(self.logger.name,
                                               "test_component")

        self.logger.debug("Test")
        self.assertFalse(self.handler.empty)
        self.assert_log_equals(
            dict(action="log", level="DEBUG", message="Test"))

        self.logger.component_filter = handlers.LogLevelFilter(
            lambda x: x, "info")

        # Filtered on the parent: nothing should be recorded.
        self.logger.debug("Test 1")
        self.assertTrue(self.handler.empty)

        child.debug("Test 2")
        self.assertFalse(self.handler.empty)
        self.assert_log_equals(
            dict(action="log", level="DEBUG", message="Test 2",
                 component="test_component"))

        self.logger.component_filter = None

        self.logger.debug("Test 3")
        self.assertFalse(self.handler.empty)
        self.assert_log_equals(
            dict(action="log", level="DEBUG", message="Test 3"))

    def test_filter_message_mutuate(self):
        """A filter may rewrite a message before it reaches the handlers."""
        def tag_log_messages(msg):
            # Only "log" actions get the prefix; anything else passes as is.
            if msg["action"] != "log":
                return msg
            msg["message"] = "FILTERED! %s" % msg["message"]
            return msg

        self.logger.component_filter = tag_log_messages
        self.logger.debug("Test")
        self.assert_log_equals(
            dict(action="log", level="DEBUG", message="FILTERED! Test"))
        self.logger.component_filter = None


class FormatterTest(unittest.TestCase):
    """Base fixture for formatter tests.

    ``setUp`` wires a structured logger to an in-memory stream through the
    formatter returned by ``get_formatter``, which subclasses must provide.
    """

    def setUp(self):
        logger_name = "test_%s" % type(self).__name__
        self.position = 0
        self.logger = structuredlog.StructuredLogger(logger_name)
        self.output_file = StringIO.StringIO()
        formatter = self.get_formatter()
        self.handler = handlers.StreamHandler(self.output_file, formatter)
        self.logger.add_handler(self.handler)

    def set_position(self, pos=None):
        """Set the offset ``loglines`` reads from.

        With no argument the current offset of the output stream is used.
        """
        self.position = self.output_file.tell() if pos is None else pos

    def get_formatter(self):
        """Return the formatter under test; must be overridden by subclasses."""
        raise NotImplementedError(
            "FormatterTest subclasses must implement get_formatter")

    @property
    def loglines(self):
        """Output lines from ``position`` onwards, stripped of trailing whitespace."""
        stream = self.output_file
        stream.seek(self.position)
        stripped = []
        for raw_line in stream.readlines():
            stripped.append(raw_line.rstrip())
        return stripped


class TestHTMLFormatter(FormatterTest):
    """Checks the base64 data URLs ``HTMLFormatter`` emits for ``extra`` data."""

    def get_formatter(self):
        html_formatter = formatters.HTMLFormatter()
        return html_formatter

    def test_base64_string(self):
        """A byte-string payload is embedded base64-encoded."""
        log = self.logger
        log.suite_start([])
        log.test_start("string_test")
        log.test_end("string_test", "FAIL", extra={"data": "foobar"})
        log.suite_end()

        report = ''.join(self.loglines)
        self.assertIn("data:text/html;charset=utf-8;base64,Zm9vYmFy", report)

    def test_base64_unicode(self):
        """A unicode payload is embedded base64-encoded."""
        log = self.logger
        log.suite_start([])
        log.test_start("unicode_test")
        log.test_end("unicode_test", "FAIL", extra={"data": unichr(0x02A9)})
        log.suite_end()

        report = ''.join(self.loglines)
        self.assertIn("data:text/html;charset=utf-8;base64,yqk=", report)

    def test_base64_other(self):
        """A non-string payload (here a dict) is embedded base64-encoded."""
        log = self.logger
        log.suite_start([])
        log.test_start("int_test")
        log.test_end("int_test", "FAIL", extra={"data": {"foo": "bar"}})
        log.suite_end()

        report = ''.join(self.loglines)
        self.assertIn(
            "data:text/html;charset=utf-8;base64,eydmb28nOiAnYmFyJ30=",
            report)


class TestTBPLFormatter(FormatterTest):
    """Tests for the lines produced by ``formatters.TbplFormatter``."""

    def get_formatter(self):
        return formatters.TbplFormatter()

    def test_unexpected_message(self):
        """An unexpected test result line carries the supplied message."""
        self.logger.suite_start([])
        self.logger.test_start("timeout_test")
        self.logger.test_end("timeout_test",
                             "TIMEOUT",
                             message="timed out")
        self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | timed out",
                      self.loglines)
        self.logger.suite_end()

    def test_default_unexpected_end_message(self):
        """Without a message, an unexpected test result names the expected status."""
        self.logger.suite_start([])
        self.logger.test_start("timeout_test")
        self.logger.test_end("timeout_test",
                             "TIMEOUT")
        self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | expected OK",
                      self.loglines)
        self.logger.suite_end()

    def test_default_unexpected_status_message(self):
        """Without a message, an unexpected subtest result names the expected status."""
        self.logger.suite_start([])
        self.logger.test_start("timeout_test")
        self.logger.test_status("timeout_test",
                                "subtest",
                                status="TIMEOUT")
        self.assertIn("TEST-UNEXPECTED-TIMEOUT | timeout_test | subtest - expected PASS",
                      self.loglines)
        self.logger.test_end("timeout_test", "OK")
        self.logger.suite_end()

    def test_single_newline(self):
        """An unexpected subtest result must not introduce blank output lines."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        # Only inspect output written from this point on.
        self.set_position()
        self.logger.test_status("test1", "subtest",
                                status="PASS",
                                expected="FAIL")
        self.logger.test_end("test1", "OK")
        self.logger.suite_end()

        # This sequence should not produce blanklines
        for line in self.loglines:
            self.assertNotEqual("", line, "No blank line should be present in: %s" %
                                self.loglines)

    def test_process_exit(self):
        """A process exit is reported with its exit code."""
        self.logger.process_exit(1234, 0)
        self.assertIn('TEST-INFO | 1234: exit 0', self.loglines)

    @unittest.skipUnless(os.name == 'posix', 'posix only')
    def test_process_exit_with_sig(self):
        """A negative exit code is reported as the signal that killed the process."""
        # subprocess return code is negative when process
        # has been killed by signal on posix.
        self.logger.process_exit(1234, -signal.SIGTERM)
        # Call assertIn: the previous "self.assertIn, (...)" only built a
        # tuple, so this test never checked anything.
        self.assertIn('TEST-INFO | 1234: killed by SIGTERM', self.loglines)


class TestMachFormatter(FormatterTest):
    """Checks the summary and process lines written by ``MachFormatter``."""

    def get_formatter(self):
        mach_formatter = formatters.MachFormatter(disable_colors=True)
        return mach_formatter

    def test_summary(self):
        """The suite summary counts results and lists only the unexpected tests."""
        log = self.logger
        log.suite_start([])

        # (test id, reported status, expected status); only test1 is expected.
        outcomes = (
            ("test1", "PASS", "PASS"),
            ("test2", "PASS", "TIMEOUT"),
            ("test3", "FAIL", "PASS"),
        )
        for test_id, status, expected in outcomes:
            log.test_start(test_id)
            log.test_end(test_id, status=status, expected=expected)

        # Only the output of suite_end is of interest.
        self.set_position()
        log.suite_end()

        summary = self.loglines
        self.assertIn("Ran 3 tests", summary)
        self.assertIn("Expected results: 1", summary)
        self.assertIn("Unexpected results: 2 (FAIL: 1, PASS: 1)", summary)
        self.assertNotIn("test1", summary)
        self.assertIn("PASS expected TIMEOUT test2", summary)
        self.assertIn("FAIL test3", summary)

    def test_summary_subtests(self):
        """Subtests are counted separately from their parent tests."""
        log = self.logger
        log.suite_start([])

        log.test_start("test1")
        log.test_status("test1", "subtest1", status="PASS")
        log.test_status("test1", "subtest2", status="FAIL")
        log.test_end("test1", status="OK", expected="OK")

        log.test_start("test2")
        log.test_status("test2", "subtest1", status="TIMEOUT", expected="PASS")
        log.test_end("test2", status="TIMEOUT", expected="OK")

        self.set_position()
        log.suite_end()

        summary = self.loglines
        self.assertIn("Ran 5 tests (2 parents, 3 subtests)", summary)
        self.assertIn("Expected results: 2", summary)
        self.assertIn("Unexpected results: 3 (FAIL: 1, TIMEOUT: 2)", summary)

    def test_summary_ok(self):
        """A run with only expected results is summarised as OK."""
        log = self.logger
        log.suite_start([])

        log.test_start("test1")
        log.test_status("test1", "subtest1", status="PASS")
        log.test_status("test1", "subtest2", status="PASS")
        log.test_end("test1", status="OK", expected="OK")

        log.test_start("test2")
        log.test_status("test2", "subtest1", status="PASS", expected="PASS")
        log.test_end("test2", status="OK", expected="OK")

        self.set_position()
        log.suite_end()

        summary = self.loglines
        self.assertIn("OK", summary)
        self.assertIn("Expected results: 5", summary)
        self.assertIn("Unexpected results: 0", summary)

    def test_process_start(self):
        """process_start reports the process id."""
        self.logger.process_start(1234)
        first_line = self.loglines[0]
        self.assertIn("Started process `1234`", first_line)

    def test_process_start_with_command(self):
        """process_start appends the command when one is given."""
        self.logger.process_start(1234, command='test cmd')
        first_line = self.loglines[0]
        self.assertIn("Started process `1234` (test cmd)", first_line)

    def test_process_exit(self):
        """process_exit reports the exit code."""
        self.logger.process_exit(1234, 0)
        first_line = self.loglines[0]
        self.assertIn('1234: exit 0', first_line)

    @unittest.skipUnless(os.name == 'posix', 'posix only')
    def test_process_exit_with_sig(self):
        """A negative exit code is reported as the signal that killed the process."""
        # On posix, subprocess reports a process killed by a signal
        # through a negative return code.
        self.logger.process_exit(1234, -signal.SIGTERM)
        first_line = self.loglines[0]
        self.assertIn('1234: killed by SIGTERM', first_line)


class TestXUnitFormatter(FormatterTest):
    """Checks the XML document produced by ``XUnitFormatter``."""

    def get_formatter(self):
        xunit_formatter = formatters.XUnitFormatter()
        return xunit_formatter

    def log_as_xml(self):
        """Parse the captured output and return the root XML element."""
        document = '\n'.join(self.loglines)
        return ET.fromstring(document)

    def test_stacktrace_is_present(self):
        """The stack of a failing test appears in the failure element text."""
        log = self.logger
        log.suite_start([])
        log.test_start("test1")
        log.test_end("test1", "fail", message="Test message",
                     stack='this\nis\na\nstack')
        log.suite_end()

        failure = self.log_as_xml().find('testcase/failure')
        self.assertIn('this\nis\na\nstack', failure.text)

    def test_failure_message(self):
        """The failure element's message states the expected and actual status."""
        log = self.logger
        log.suite_start([])
        log.test_start("test1")
        log.test_end("test1", "fail", message="Test message")
        log.suite_end()

        failure = self.log_as_xml().find('testcase/failure')
        self.assertEquals('Expected OK, got FAIL', failure.get('message'))

    def test_suite_attrs(self):
        """The root element carries the per-suite counters and the run time."""
        log = self.logger
        log.suite_start([])
        log.test_start("test1")
        log.test_end("test1", "ok", message="Test message")
        log.suite_end()

        root = self.log_as_xml()
        expected_attrs = (
            ('skips', '0'),
            ('failures', '0'),
            ('errors', '0'),
            ('tests', '1'),
            ('time', '0.00'),
        )
        for attr_name, attr_value in expected_attrs:
            self.assertEqual(root.get(attr_name), attr_value)

    def test_time_is_not_rounded(self):
        """Durations are derived from the raw timestamps, not from rounded ones."""
        # Driving the formatter by hand is simpler than going through the logger.
        formatter = self.get_formatter()
        formatter.suite_start({"time": 55000})
        formatter.test_start({"time": 55100})
        formatter.test_end({"time": 55558,
                            "test": 'id',
                            "message": 'message',
                            "status": 'PASS'})
        xml_string = formatter.suite_end({"time": 55559})

        root = ET.fromstring(xml_string)
        self.assertEqual(root.get('time'), '0.56')
        testcase = root.find('testcase')
        self.assertEqual(testcase.get('time'), '0.46')


class TestCommandline(unittest.TestCase):
    """Exercise ``commandline.add_logging_group`` and ``setup_logging``."""

    def setUp(self):
        # Scratch file that the file-backed (tbpl) tests log into.
        self.logfile = mozfile.NamedTemporaryFile()

    @property
    def loglines(self):
        """Rewind the scratch log file and return its right-stripped lines."""
        scratch = self.logfile
        scratch.seek(0)
        stripped = []
        for raw_line in scratch.readlines():
            stripped.append(raw_line.rstrip())
        return stripped

    def _tbpl_logger(self, *extra_argv):
        """Build a logger writing tbpl output to the scratch file.

        Any *extra_argv* entries are appended after the ``--log-tbpl``
        option before the command line is parsed with argparse.
        """
        arg_parser = argparse.ArgumentParser()
        commandline.add_logging_group(arg_parser)
        argv = ["--log-tbpl=%s" % self.logfile.name]
        argv.extend(extra_argv)
        namespace = arg_parser.parse_args(argv)
        return commandline.setup_logging("test_fmtopts", namespace, {})

    @staticmethod
    def _emit_info_debug_error(target):
        """Log one info, one debug and one error message, in that order."""
        target.info("INFO message")
        target.debug("DEBUG message")
        target.error("ERROR message")

    def test_setup_logging(self):
        """argparse options with ``--log-raw=-`` attach exactly one handler."""
        arg_parser = argparse.ArgumentParser()
        commandline.add_logging_group(arg_parser)
        namespace = arg_parser.parse_args(["--log-raw=-"])
        configured = commandline.setup_logging(
            "test_setup_logging", namespace, {})
        handler_count = len(configured.handlers)
        self.assertEqual(handler_count, 1)

    def test_setup_logging_optparse(self):
        """optparse options are accepted and give a single StreamHandler."""
        opt_parser = optparse.OptionParser()
        commandline.add_logging_group(opt_parser)
        options = opt_parser.parse_args(["--log-raw=-"])[0]
        configured = commandline.setup_logging("test_optparse", options, {})
        self.assertEqual(len(configured.handlers), 1)
        first_handler = configured.handlers[0]
        self.assertIsInstance(first_handler, handlers.StreamHandler)

    def test_limit_formatters(self):
        """``include_formatters`` restricts which --log-* options exist."""
        arg_parser = argparse.ArgumentParser()
        commandline.add_logging_group(arg_parser, include_formatters=['raw'])

        excluded = []
        for name in commandline.log_formatters:
            if name != 'raw':
                excluded.append(name)

        # Neither the destination option nor the level option may be
        # recognised for any formatter other than raw.
        for name in excluded:
            for template in ("--log-%s=-", "--log-%s-level=error"):
                with self.assertRaises(SystemExit):
                    arg_parser.parse_args([template % name])

        # The raw formatter itself must still be usable.
        namespace = arg_parser.parse_args(["--log-raw=-"])
        configured = commandline.setup_logging(
            "test_setup_logging2", namespace, {})
        self.assertEqual(len(configured.handlers), 1)

    def test_setup_logging_optparse_unicode(self):
        """A unicode option string still maps ``-`` onto ``sys.stdout``."""
        opt_parser = optparse.OptionParser()
        commandline.add_logging_group(opt_parser)
        options = opt_parser.parse_args([u"--log-raw=-"])[0]
        configured = commandline.setup_logging(
            "test_optparse_unicode", options, {})
        self.assertEqual(len(configured.handlers), 1)
        first_handler = configured.handlers[0]
        self.assertEqual(first_handler.stream, sys.stdout)
        self.assertIsInstance(first_handler, handlers.StreamHandler)

    def test_logging_defaultlevel(self):
        """Without an explicit level, debug messages are dropped."""
        self._emit_info_debug_error(self._tbpl_logger())
        wanted = ["INFO message",
                  "ERROR message"]
        self.assertEqual(wanted, self.loglines)

    def test_logging_errorlevel(self):
        """``--log-tbpl-level=error`` keeps only error-and-above messages."""
        self._emit_info_debug_error(
            self._tbpl_logger("--log-tbpl-level=error"))
        wanted = ["ERROR message"]
        self.assertEqual(wanted, self.loglines)

    def test_logging_debuglevel(self):
        """``--log-tbpl-level=debug`` lets messages below the default through."""
        self._emit_info_debug_error(
            self._tbpl_logger("--log-tbpl-level=debug"))
        wanted = ["INFO message",
                  "DEBUG message",
                  "ERROR message"]
        self.assertEqual(wanted, self.loglines)

    def test_unused_options(self):
        """A level option without its matching log destination is an error."""
        arg_parser = argparse.ArgumentParser()
        commandline.add_logging_group(arg_parser)
        namespace = arg_parser.parse_args(["--log-tbpl-level=error"])
        with self.assertRaises(ValueError):
            commandline.setup_logging("test_fmtopts", namespace, {})


class TestBuffer(BaseStructuredTest):
    """Tests for ``handlers.BufferHandler`` wrapped around a ``TestHandler``.

    The fixture uses its own logger named ``"testBuffer"`` and a buffer
    limited to four messages.
    """

    def assert_log_equals(self, expected, actual=None):
        """Check a logged message, defaulting ``source`` to ``"testBuffer"``.

        :param expected: dict of fields the message must contain; it may
                         override ``source`` like any other field.
        :param actual: message to check; when omitted the most recent
                       message is popped via ``pop_last_item``.
        """
        # The base-class helper already performs the comparison and only
        # differs in its default "source", which *expected* may override, so
        # delegate instead of duplicating it here.
        with_source = {"source": "testBuffer"}
        with_source.update(expected)
        super(TestBuffer, self).assert_log_equals(with_source, actual)

    def setUp(self):
        # Does not chain to BaseStructuredTest.setUp: the only handler this
        # fixture attaches is the BufferHandler wrapping a recording handler.
        self.logger = structuredlog.StructuredLogger("testBuffer")
        self.handler = handlers.BufferHandler(TestHandler(), message_limit=4)
        self.logger.add_handler(self.handler)

    def tearDown(self):
        # Detach the handler added in setUp.
        # NOTE(review): presumably needed because loggers sharing a name
        # share their handlers across tests -- confirm.
        self.logger.remove_handler(self.handler)

    def pop_last_item(self):
        """Pop the newest message that reached the wrapped handler."""
        return self.handler.inner.items.pop()

    def test_buffer_messages(self):
        """Buffering can be switched off, on and cleared via messages."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.send_message("buffer", "off")
        self.logger.test_status("test1", "sub1", status="PASS")
        # Even for buffered actions, the buffer does not interfere if
        # buffering is turned off.
        self.assert_log_equals({"action": "test_status",
                                "test": "test1",
                                "status": "PASS",
                                "subtest": "sub1"})
        self.logger.send_message("buffer", "on")
        self.logger.test_status("test1", "sub2", status="PASS")
        self.logger.test_status("test1", "sub3", status="PASS")
        self.logger.test_status("test1", "sub4", status="PASS")
        self.logger.test_status("test1", "sub5", status="PASS")
        self.logger.test_status("test1", "sub6", status="PASS")
        self.logger.test_status("test1", "sub7", status="PASS")
        self.logger.test_end("test1", status="OK")
        self.logger.send_message("buffer", "clear")
        # The newest message on the wrapped handler is test_end, i.e. none
        # of the statuses logged while buffering was on were passed along.
        self.assert_log_equals({"action": "test_end",
                                "test": "test1",
                                "status": "OK"})
        self.logger.suite_end()

    def test_buffer_size(self):
        """The buffer is capped at ``message_limit`` and flushes in order."""
        self.logger.suite_start([])
        self.logger.test_start("test1")
        self.logger.test_status("test1", "sub1", status="PASS")
        self.logger.test_status("test1", "sub2", status="PASS")
        self.logger.test_status("test1", "sub3", status="PASS")
        self.logger.test_status("test1", "sub4", status="PASS")
        self.logger.test_status("test1", "sub5", status="PASS")
        self.logger.test_status("test1", "sub6", status="PASS")
        self.logger.test_status("test1", "sub7", status="PASS")

        # No test status messages made it to the underlying handler.
        self.assert_log_equals({"action": "test_start",
                                "test": "test1"})

        # The buffer's actual size never grows beyond the specified limit.
        self.assertEqual(len(self.handler._buffer), 4)

        self.logger.test_status("test1", "sub8", status="FAIL")
        # The number of messages deleted comes back in a list.
        self.assertEqual([4], self.logger.send_message("buffer", "flush"))

        # When the buffer is dumped, the failure is the last thing logged
        self.assert_log_equals({"action": "test_status",
                                "test": "test1",
                                "subtest": "sub8",
                                "status": "FAIL",
                                "expected": "PASS"})
        # Three additional messages should have been retained for context
        self.assert_log_equals({"action": "test_status",
                                "test": "test1",
                                "status": "PASS",
                                "subtest": "sub7"})
        self.assert_log_equals({"action": "test_status",
                                "test": "test1",
                                "status": "PASS",
                                "subtest": "sub6"})
        self.assert_log_equals({"action": "test_status",
                                "test": "test1",
                                "status": "PASS",
                                "subtest": "sub5"})
        # Older statuses were dropped, so the next message down is the
        # suite_start that was logged first.
        self.assert_log_equals({"action": "suite_start",
                                "tests": []})


class TestReader(unittest.TestCase):
    """Tests for the log-consuming helpers in ``mozlog.reader``."""

    def to_file_like(self, obj):
        """Serialise *obj* as newline-separated JSON in a file-like object.

        :param obj: iterable of JSON-serialisable items, one per line.
        :returns: a ``StringIO`` positioned at the start of the data.
        """
        data_str = "\n".join(json.dumps(item) for item in obj)
        return StringIO.StringIO(data_str)

    def test_read(self):
        """``reader.read`` yields back the items that were serialised."""
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)
        self.assertEqual(data, list(reader.read(f)))

    def test_imap_log(self):
        """``reader.imap_log`` yields each per-action callback's result."""
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)

        def f_action_0(item):
            return ("action_0", item["data"])

        def f_action_1(item):
            return ("action_1", item["data"])

        res_iter = reader.imap_log(reader.read(f),
                                   {"action_0": f_action_0,
                                    "action_1": f_action_1})
        self.assertEqual([("action_0", "data_0"), ("action_1", "data_1")],
                         list(res_iter))

    def test_each_log(self):
        """``reader.each_log`` calls the callback registered per action."""
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)

        count = {"action_0": 0,
                 "action_1": 0}

        # The callbacks use different increments so the final counts show
        # which callback handled which action.
        def f_action_0(item):
            count[item["action"]] += 1

        def f_action_1(item):
            count[item["action"]] += 2

        reader.each_log(reader.read(f),
                        {"action_0": f_action_0,
                         "action_1": f_action_1})

        self.assertEqual({"action_0": 1, "action_1": 2}, count)

    def test_handler(self):
        """``reader.handle_log`` routes items to same-named handler methods."""
        data = [{"action": "action_0", "data": "data_0"},
                {"action": "action_1", "data": "data_1"}]

        f = self.to_file_like(data)

        # Alias so the nested handler can use this test case's assertions
        # without clashing with its own ``self``.
        test = self

        class ReaderTestHandler(reader.LogHandler):

            def __init__(self):
                self.action_0_count = 0
                self.action_1_count = 0

            def action_0(self, item):
                test.assertEqual(item["action"], "action_0")
                self.action_0_count += 1

            def action_1(self, item):
                test.assertEqual(item["action"], "action_1")
                self.action_1_count += 1

        handler = ReaderTestHandler()
        reader.handle_log(reader.read(f), handler)

        self.assertEqual(handler.action_0_count, 1)
        self.assertEqual(handler.action_1_count, 1)

# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()