""" terminal reporting of the full testing process.

This is a good source for looking at the various reporting hooks.
"""
from _pytest.main import EXIT_OK, EXIT_TESTSFAILED, EXIT_INTERRUPTED, \
    EXIT_USAGEERROR, EXIT_NOTESTSCOLLECTED
import pytest
import py
import sys
import time
import platform

import _pytest._pluggy as pluggy


def pytest_addoption(parser):
    group = parser.getgroup("terminal reporting", "reporting", after="general")
    group._addoption('-v', '--verbose', action="count",
               dest="verbose", default=0, help="increase verbosity."),
    group._addoption('-q', '--quiet', action="count",
               dest="quiet", default=0, help="decrease verbosity."),
    group._addoption('-r',
         action="store", dest="reportchars", default=None, metavar="chars",
         help="show extra test summary info as specified by chars (f)ailed, "
              "(E)error, (s)skipped, (x)failed, (X)passed (w)pytest-warnings "
              "(p)passed, (P)passed with output, (a)all except pP.")
    group._addoption('-l', '--showlocals',
         action="store_true", dest="showlocals", default=False,
         help="show locals in tracebacks (disabled by default).")
    group._addoption('--report',
         action="store", dest="report", default=None, metavar="opts",
         help="(deprecated, use -r)")
    group._addoption('--tb', metavar="style",
               action="store", dest="tbstyle", default='auto',
               choices=['auto', 'long', 'short', 'no', 'line', 'native'],
               help="traceback print mode (auto/long/short/line/native/no).")
    group._addoption('--fulltrace', '--full-trace',
               action="store_true", default=False,
               help="don't cut any tracebacks (default is to cut).")
    group._addoption('--color', metavar="color",
               action="store", dest="color", default='auto',
               choices=['yes', 'no', 'auto'],
               help="color terminal output (yes/no/auto).")

def pytest_configure(config):
    config.option.verbose -= config.option.quiet
    reporter = TerminalReporter(config, sys.stdout)
    config.pluginmanager.register(reporter, 'terminalreporter')
    if config.option.debug or config.option.traceconfig:
        def mywriter(tags, args):
            msg = " ".join(map(str, args))
            reporter.write_line("[traceconfig] " + msg)
        config.trace.root.setprocessor("pytest:config", mywriter)

def getreportopt(config):
    reportopts = ""
    optvalue = config.option.report
    if optvalue:
        py.builtin.print_("DEPRECATED: use -r instead of --report option.",
            file=sys.stderr)
        if optvalue:
            for setting in optvalue.split(","):
                setting = setting.strip()
                if setting == "skipped":
                    reportopts += "s"
                elif setting == "xfailed":
                    reportopts += "x"
    reportchars = config.option.reportchars
    if reportchars:
        for char in reportchars:
            if char not in reportopts and char != 'a':
                reportopts += char
            elif char == 'a':
                reportopts = 'fEsxXw'
    return reportopts

def pytest_report_teststatus(report):
    if report.passed:
        letter = "."
    elif report.skipped:
        letter = "s"
    elif report.failed:
        letter = "F"
        if report.when != "call":
            letter = "f"
    return report.outcome, letter, report.outcome.upper()

class WarningReport:
    def __init__(self, code, message, nodeid=None, fslocation=None):
        self.code = code
        self.message = message
        self.nodeid = nodeid
        self.fslocation = fslocation


class TerminalReporter:
    def __init__(self, config, file=None):
        import _pytest.config
        self.config = config
        self.verbosity = self.config.option.verbose
        self.showheader = self.verbosity >= 0
        self.showfspath = self.verbosity >= 0
        self.showlongtestinfo = self.verbosity > 0
        self._numcollected = 0

        self.stats = {}
        self.startdir = py.path.local()
        if file is None:
            file = sys.stdout
        self._tw = self.writer = _pytest.config.create_terminal_writer(config,
                                                                       file)
        self.currentfspath = None
        self.reportchars = getreportopt(config)
        self.hasmarkup = self._tw.hasmarkup
        self.isatty = file.isatty()

    def hasopt(self, char):
        char = {'xfailed': 'x', 'skipped': 's'}.get(char, char)
        return char in self.reportchars

    def write_fspath_result(self, nodeid, res):
        fspath = self.config.rootdir.join(nodeid.split("::")[0])
        if fspath != self.currentfspath:
            self.currentfspath = fspath
            fspath = self.startdir.bestrelpath(fspath)
            self._tw.line()
            self._tw.write(fspath + " ")
        self._tw.write(res)

    def write_ensure_prefix(self, prefix, extra="", **kwargs):
        if self.currentfspath != prefix:
            self._tw.line()
            self.currentfspath = prefix
            self._tw.write(prefix)
        if extra:
            self._tw.write(extra, **kwargs)
            self.currentfspath = -2

    def ensure_newline(self):
        if self.currentfspath:
            self._tw.line()
            self.currentfspath = None

    def write(self, content, **markup):
        self._tw.write(content, **markup)

    def write_line(self, line, **markup):
        if not py.builtin._istext(line):
            line = py.builtin.text(line, errors="replace")
        self.ensure_newline()
        self._tw.line(line, **markup)

    def rewrite(self, line, **markup):
        line = str(line)
        self._tw.write("\r" + line, **markup)

    def write_sep(self, sep, title=None, **markup):
        self.ensure_newline()
        self._tw.sep(sep, title, **markup)

    def section(self, title, sep="=", **kw):
        self._tw.sep(sep, title, **kw)

    def line(self, msg, **kw):
        self._tw.line(msg, **kw)

    def pytest_internalerror(self, excrepr):
        for line in py.builtin.text(excrepr).split("\n"):
            self.write_line("INTERNALERROR> " + line)
        return 1

    def pytest_logwarning(self, code, fslocation, message, nodeid):
        warnings = self.stats.setdefault("warnings", [])
        if isinstance(fslocation, tuple):
            fslocation = "%s:%d" % fslocation
        warning = WarningReport(code=code, fslocation=fslocation,
                                message=message, nodeid=nodeid)
        warnings.append(warning)

    def pytest_plugin_registered(self, plugin):
        if self.config.option.traceconfig:
            msg = "PLUGIN registered: %s" % (plugin,)
            # XXX this event may happen during setup/teardown time
            #     which unfortunately captures our output here
            #     which garbles our output if we use self.write_line
            self.write_line(msg)

    def pytest_deselected(self, items):
        self.stats.setdefault('deselected', []).extend(items)

    def pytest_runtest_logstart(self, nodeid, location):
        # ensure that the path is printed before the
        # 1st test of a module starts running
        if self.showlongtestinfo:
            line = self._locationline(nodeid, *location)
            self.write_ensure_prefix(line, "")
        elif self.showfspath:
            fsid = nodeid.split("::")[0]
            self.write_fspath_result(fsid, "")

    def pytest_runtest_logreport(self, report):
        rep = report
        res = self.config.hook.pytest_report_teststatus(report=rep)
        cat, letter, word = res
        self.stats.setdefault(cat, []).append(rep)
        self._tests_ran = True
        if not letter and not word:
            # probably passed setup/teardown
            return
        if self.verbosity <= 0:
            if not hasattr(rep, 'node') and self.showfspath:
                self.write_fspath_result(rep.nodeid, letter)
            else:
                self._tw.write(letter)
        else:
            if isinstance(word, tuple):
                word, markup = word
            else:
                if rep.passed:
                    markup = {'green':True}
                elif rep.failed:
                    markup = {'red':True}
                elif rep.skipped:
                    markup = {'yellow':True}
            line = self._locationline(rep.nodeid, *rep.location)
            if not hasattr(rep, 'node'):
                self.write_ensure_prefix(line, word, **markup)
                #self._tw.write(word, **markup)
            else:
                self.ensure_newline()
                if hasattr(rep, 'node'):
                    self._tw.write("[%s] " % rep.node.gateway.id)
                self._tw.write(word, **markup)
                self._tw.write(" " + line)
                self.currentfspath = -2

    def pytest_collection(self):
        if not self.isatty and self.config.option.verbose >= 1:
            self.write("collecting ... ", bold=True)

    def pytest_collectreport(self, report):
        if report.failed:
            self.stats.setdefault("error", []).append(report)
        elif report.skipped:
            self.stats.setdefault("skipped", []).append(report)
        items = [x for x in report.result if isinstance(x, pytest.Item)]
        self._numcollected += len(items)
        if self.isatty:
            #self.write_fspath_result(report.nodeid, 'E')
            self.report_collect()

    def report_collect(self, final=False):
        if self.config.option.verbose < 0:
            return

        errors = len(self.stats.get('error', []))
        skipped = len(self.stats.get('skipped', []))
        if final:
            line = "collected "
        else:
            line = "collecting "
        line += str(self._numcollected) + " items"
        if errors:
            line += " / %d errors" % errors
        if skipped:
            line += " / %d skipped" % skipped
        if self.isatty:
            if final:
                line += " \n"
            self.rewrite(line, bold=True)
        else:
            self.write_line(line)

    def pytest_collection_modifyitems(self):
        self.report_collect(True)

    @pytest.hookimpl(trylast=True)
    def pytest_sessionstart(self, session):
        self._sessionstarttime = time.time()
        if not self.showheader:
            return
        self.write_sep("=", "test session starts", bold=True)
        verinfo = platform.python_version()
        msg = "platform %s -- Python %s" % (sys.platform, verinfo)
        if hasattr(sys, 'pypy_version_info'):
            verinfo = ".".join(map(str, sys.pypy_version_info[:3]))
            msg += "[pypy-%s-%s]" % (verinfo, sys.pypy_version_info[3])
        msg += ", pytest-%s, py-%s, pluggy-%s" % (
               pytest.__version__, py.__version__, pluggy.__version__)
        if self.verbosity > 0 or self.config.option.debug or \
           getattr(self.config.option, 'pastebin', None):
            msg += " -- " + str(sys.executable)
        self.write_line(msg)
        lines = self.config.hook.pytest_report_header(
            config=self.config, startdir=self.startdir)
        lines.reverse()
        for line in flatten(lines):
            self.write_line(line)

    def pytest_report_header(self, config):
        inifile = ""
        if config.inifile:
            inifile = config.rootdir.bestrelpath(config.inifile)
        lines = ["rootdir: %s, inifile: %s" %(config.rootdir, inifile)]

        plugininfo = config.pluginmanager.list_plugin_distinfo()
        if plugininfo:

            lines.append(
                "plugins: %s" % ", ".join(_plugin_nameversions(plugininfo)))
        return lines

    def pytest_collection_finish(self, session):
        if self.config.option.collectonly:
            self._printcollecteditems(session.items)
            if self.stats.get('failed'):
                self._tw.sep("!", "collection failures")
                for rep in self.stats.get('failed'):
                    rep.toterminal(self._tw)
                return 1
            return 0
        if not self.showheader:
            return
        #for i, testarg in enumerate(self.config.args):
        #    self.write_line("test path %d: %s" %(i+1, testarg))

    def _printcollecteditems(self, items):
        # to print out items and their parent collectors
        # we take care to leave out Instances aka ()
        # because later versions are going to get rid of them anyway
        if self.config.option.verbose < 0:
            if self.config.option.verbose < -1:
                counts = {}
                for item in items:
                    name = item.nodeid.split('::', 1)[0]
                    counts[name] = counts.get(name, 0) + 1
                for name, count in sorted(counts.items()):
                    self._tw.line("%s: %d" % (name, count))
            else:
                for item in items:
                    nodeid = item.nodeid
                    nodeid = nodeid.replace("::()::", "::")
                    self._tw.line(nodeid)
            return
        stack = []
        indent = ""
        for item in items:
            needed_collectors = item.listchain()[1:] # strip root node
            while stack:
                if stack == needed_collectors[:len(stack)]:
                    break
                stack.pop()
            for col in needed_collectors[len(stack):]:
                stack.append(col)
                #if col.name == "()":
                #    continue
                indent = (len(stack) - 1) * "  "
                self._tw.line("%s%s" % (indent, col))

    @pytest.hookimpl(hookwrapper=True)
    def pytest_sessionfinish(self, exitstatus):
        outcome = yield
        outcome.get_result()
        self._tw.line("")
        summary_exit_codes = (
            EXIT_OK, EXIT_TESTSFAILED, EXIT_INTERRUPTED, EXIT_USAGEERROR,
            EXIT_NOTESTSCOLLECTED)
        if exitstatus in summary_exit_codes:
            self.config.hook.pytest_terminal_summary(terminalreporter=self)
            self.summary_errors()
            self.summary_failures()
            self.summary_warnings()
            self.summary_passes()
        if exitstatus == EXIT_INTERRUPTED:
            self._report_keyboardinterrupt()
            del self._keyboardinterrupt_memo
        self.summary_deselected()
        self.summary_stats()

    def pytest_keyboard_interrupt(self, excinfo):
        self._keyboardinterrupt_memo = excinfo.getrepr(funcargs=True)

    def pytest_unconfigure(self):
        if hasattr(self, '_keyboardinterrupt_memo'):
            self._report_keyboardinterrupt()

    def _report_keyboardinterrupt(self):
        excrepr = self._keyboardinterrupt_memo
        msg = excrepr.reprcrash.message
        self.write_sep("!", msg)
        if "KeyboardInterrupt" in msg:
            if self.config.option.fulltrace:
                excrepr.toterminal(self._tw)
            else:
                self._tw.line("to show a full traceback on KeyboardInterrupt use --fulltrace", yellow=True)
                excrepr.reprcrash.toterminal(self._tw)

    def _locationline(self, nodeid, fspath, lineno, domain):
        def mkrel(nodeid):
            line = self.config.cwd_relative_nodeid(nodeid)
            if domain and line.endswith(domain):
                line = line[:-len(domain)]
                l = domain.split("[")
                l[0] = l[0].replace('.', '::')  # don't replace '.' in params
                line += "[".join(l)
            return line
        # collect_fspath comes from testid which has a "/"-normalized path

        if fspath:
            res = mkrel(nodeid).replace("::()", "")  # parens-normalization
            if nodeid.split("::")[0] != fspath.replace("\\", "/"):
                res += " <- " + self.startdir.bestrelpath(fspath)
        else:
            res = "[location]"
        return res + " "

    def _getfailureheadline(self, rep):
        if hasattr(rep, 'location'):
            fspath, lineno, domain = rep.location
            return domain
        else:
            return "test session" # XXX?

    def _getcrashline(self, rep):
        try:
            return str(rep.longrepr.reprcrash)
        except AttributeError:
            try:
                return str(rep.longrepr)[:50]
            except AttributeError:
                return ""

    #
    # summaries for sessionfinish
    #
    def getreports(self, name):
        l = []
        for x in self.stats.get(name, []):
            if not hasattr(x, '_pdbshown'):
                l.append(x)
        return l

    def summary_warnings(self):
        if self.hasopt("w"):
            warnings = self.stats.get("warnings")
            if not warnings:
                return
            self.write_sep("=", "pytest-warning summary")
            for w in warnings:
                self._tw.line("W%s %s %s" % (w.code,
                              w.fslocation, w.message))

    def summary_passes(self):
        if self.config.option.tbstyle != "no":
            if self.hasopt("P"):
                reports = self.getreports('passed')
                if not reports:
                    return
                self.write_sep("=", "PASSES")
                for rep in reports:
                    msg = self._getfailureheadline(rep)
                    self.write_sep("_", msg)
                    self._outrep_summary(rep)

    def summary_failures(self):
        if self.config.option.tbstyle != "no":
            reports = self.getreports('failed')
            if not reports:
                return
            self.write_sep("=", "FAILURES")
            for rep in reports:
                if self.config.option.tbstyle == "line":
                    line = self._getcrashline(rep)
                    self.write_line(line)
                else:
                    msg = self._getfailureheadline(rep)
                    markup = {'red': True, 'bold': True}
                    self.write_sep("_", msg, **markup)
                    self._outrep_summary(rep)

    def summary_errors(self):
        if self.config.option.tbstyle != "no":
            reports = self.getreports('error')
            if not reports:
                return
            self.write_sep("=", "ERRORS")
            for rep in self.stats['error']:
                msg = self._getfailureheadline(rep)
                if not hasattr(rep, 'when'):
                    # collect
                    msg = "ERROR collecting " + msg
                elif rep.when == "setup":
                    msg = "ERROR at setup of " + msg
                elif rep.when == "teardown":
                    msg = "ERROR at teardown of " + msg
                self.write_sep("_", msg)
                self._outrep_summary(rep)

    def _outrep_summary(self, rep):
        rep.toterminal(self._tw)
        for secname, content in rep.sections:
            self._tw.sep("-", secname)
            if content[-1:] == "\n":
                content = content[:-1]
            self._tw.line(content)

    def summary_stats(self):
        session_duration = time.time() - self._sessionstarttime
        (line, color) = build_summary_stats_line(self.stats)
        msg = "%s in %.2f seconds" % (line, session_duration)
        markup = {color: True, 'bold': True}

        if self.verbosity >= 0:
            self.write_sep("=", msg, **markup)
        if self.verbosity == -1:
            self.write_line(msg, **markup)

    def summary_deselected(self):
        if 'deselected' in self.stats:
            l = []
            k = self.config.option.keyword
            if k:
                l.append("-k%s" % k)
            m = self.config.option.markexpr
            if m:
                l.append("-m %r" % m)
            if l:
                self.write_sep("=", "%d tests deselected by %r" % (
                    len(self.stats['deselected']), " ".join(l)), bold=True)

def repr_pythonversion(v=None):
    if v is None:
        v = sys.version_info
    try:
        return "%s.%s.%s-%s-%s" % v
    except (TypeError, ValueError):
        return str(v)

def flatten(l):
    for x in l:
        if isinstance(x, (list, tuple)):
            for y in flatten(x):
                yield y
        else:
            yield x

def build_summary_stats_line(stats):
    keys = ("failed passed skipped deselected "
           "xfailed xpassed warnings error").split()
    key_translation = {'warnings': 'pytest-warnings'}
    unknown_key_seen = False
    for key in stats.keys():
        if key not in keys:
            if key: # setup/teardown reports have an empty key, ignore them
                keys.append(key)
                unknown_key_seen = True
    parts = []
    for key in keys:
        val = stats.get(key, None)
        if val:
            key_name = key_translation.get(key, key)
            parts.append("%d %s" % (len(val), key_name))

    if parts:
        line = ", ".join(parts)
    else:
        line = "no tests ran"

    if 'failed' in stats or 'error' in stats:
        color = 'red'
    elif 'warnings' in stats or unknown_key_seen:
        color = 'yellow'
    elif 'passed' in stats:
        color = 'green'
    else:
        color = 'yellow'

    return (line, color)


def _plugin_nameversions(plugininfo):
    l = []
    for plugin, dist in plugininfo:
        # gets us name and version!
        name = '{dist.project_name}-{dist.version}'.format(dist=dist)
        # questionable convenience, but it keeps things short
        if name.startswith("pytest-"):
            name = name[7:]
        # we decided to print python package names
        # they can have more than one plugin
        if name not in l:
            l.append(name)
    return l