diff options
Diffstat (limited to 'python/pytest/_pytest/assertion')
-rw-r--r-- | python/pytest/_pytest/assertion/__init__.py | 176 | ||||
-rw-r--r-- | python/pytest/_pytest/assertion/reinterpret.py | 407 | ||||
-rw-r--r-- | python/pytest/_pytest/assertion/rewrite.py | 885 | ||||
-rw-r--r-- | python/pytest/_pytest/assertion/util.py | 332 |
4 files changed, 1800 insertions, 0 deletions
diff --git a/python/pytest/_pytest/assertion/__init__.py b/python/pytest/_pytest/assertion/__init__.py new file mode 100644 index 000000000..6921deb2a --- /dev/null +++ b/python/pytest/_pytest/assertion/__init__.py @@ -0,0 +1,176 @@ +""" +support for presenting detailed information in failing assertions. +""" +import py +import os +import sys +from _pytest.monkeypatch import monkeypatch +from _pytest.assertion import util + + +def pytest_addoption(parser): + group = parser.getgroup("debugconfig") + group.addoption('--assert', + action="store", + dest="assertmode", + choices=("rewrite", "reinterp", "plain",), + default="rewrite", + metavar="MODE", + help="""control assertion debugging tools. 'plain' + performs no assertion debugging. 'reinterp' + reinterprets assert statements after they failed + to provide assertion expression information. + 'rewrite' (the default) rewrites assert + statements in test modules on import to + provide assert expression information. """) + group.addoption('--no-assert', + action="store_true", + default=False, + dest="noassert", + help="DEPRECATED equivalent to --assert=plain") + group.addoption('--nomagic', '--no-magic', + action="store_true", + default=False, + help="DEPRECATED equivalent to --assert=plain") + + +class AssertionState: + """State for the assertion plugin.""" + + def __init__(self, config, mode): + self.mode = mode + self.trace = config.trace.root.get("assertion") + + +def pytest_configure(config): + mode = config.getvalue("assertmode") + if config.getvalue("noassert") or config.getvalue("nomagic"): + mode = "plain" + if mode == "rewrite": + try: + import ast # noqa + except ImportError: + mode = "reinterp" + else: + # Both Jython and CPython 2.6.0 have AST bugs that make the + # assertion rewriting hook malfunction. + if (sys.platform.startswith('java') or + sys.version_info[:3] == (2, 6, 0)): + mode = "reinterp" + if mode != "plain": + _load_modules(mode) + m = monkeypatch() + config._cleanup.append(m.undo) + m.setattr(py.builtin.builtins, 'AssertionError', + reinterpret.AssertionError) # noqa + hook = None + if mode == "rewrite": + hook = rewrite.AssertionRewritingHook() # noqa + sys.meta_path.insert(0, hook) + warn_about_missing_assertion(mode) + config._assertstate = AssertionState(config, mode) + config._assertstate.hook = hook + config._assertstate.trace("configured with mode set to %r" % (mode,)) + def undo(): + hook = config._assertstate.hook + if hook is not None and hook in sys.meta_path: + sys.meta_path.remove(hook) + config.add_cleanup(undo) + + +def pytest_collection(session): + # this hook is only called when test modules are collected + # so for example not in the master process of pytest-xdist + # (which does not collect test modules) + hook = session.config._assertstate.hook + if hook is not None: + hook.set_session(session) + + +def _running_on_ci(): + """Check if we're currently running on a CI system.""" + env_vars = ['CI', 'BUILD_NUMBER'] + return any(var in os.environ for var in env_vars) + + +def pytest_runtest_setup(item): + """Setup the pytest_assertrepr_compare hook + + The newinterpret and rewrite modules will use util._reprcompare if + it exists to use custom reporting via the + pytest_assertrepr_compare hook. This sets up this custom + comparison for the test. + """ + def callbinrepr(op, left, right): + """Call the pytest_assertrepr_compare hook and prepare the result + + This uses the first result from the hook and then ensures the + following: + * Overly verbose explanations are dropped unless -vv was used or + running on a CI. + * Embedded newlines are escaped to help util.format_explanation() + later. + * If the rewrite mode is used embedded %-characters are replaced + to protect later % formatting. + + The result can be formatted by util.format_explanation() for + pretty printing. + """ + hook_result = item.ihook.pytest_assertrepr_compare( + config=item.config, op=op, left=left, right=right) + for new_expl in hook_result: + if new_expl: + if (sum(len(p) for p in new_expl[1:]) > 80*8 and + item.config.option.verbose < 2 and + not _running_on_ci()): + show_max = 10 + truncated_lines = len(new_expl) - show_max + new_expl[show_max:] = [py.builtin._totext( + 'Detailed information truncated (%d more lines)' + ', use "-vv" to show' % truncated_lines)] + new_expl = [line.replace("\n", "\\n") for line in new_expl] + res = py.builtin._totext("\n~").join(new_expl) + if item.config.getvalue("assertmode") == "rewrite": + res = res.replace("%", "%%") + return res + util._reprcompare = callbinrepr + + +def pytest_runtest_teardown(item): + util._reprcompare = None + + +def pytest_sessionfinish(session): + hook = session.config._assertstate.hook + if hook is not None: + hook.session = None + + +def _load_modules(mode): + """Lazily import assertion related code.""" + global rewrite, reinterpret + from _pytest.assertion import reinterpret # noqa + if mode == "rewrite": + from _pytest.assertion import rewrite # noqa + + +def warn_about_missing_assertion(mode): + try: + assert False + except AssertionError: + pass + else: + if mode == "rewrite": + specifically = ("assertions which are not in test modules " + "will be ignored") + else: + specifically = "failing tests may report as passing" + + sys.stderr.write("WARNING: " + specifically + + " because assert statements are not executed " + "by the underlying Python interpreter " + "(are you using python -O?)\n") + + +# Expose this plugin's implementation for the pytest_assertrepr_compare hook +pytest_assertrepr_compare = util.assertrepr_compare diff --git a/python/pytest/_pytest/assertion/reinterpret.py b/python/pytest/_pytest/assertion/reinterpret.py new file mode 100644 index 000000000..f4262c3ac --- /dev/null +++ b/python/pytest/_pytest/assertion/reinterpret.py @@ -0,0 +1,407 @@ +""" +Find intermediate evalutation results in assert statements through builtin AST. +""" +import ast +import sys + +import _pytest._code +import py +from _pytest.assertion import util +u = py.builtin._totext + + +class AssertionError(util.BuiltinAssertionError): + def __init__(self, *args): + util.BuiltinAssertionError.__init__(self, *args) + if args: + # on Python2.6 we get len(args)==2 for: assert 0, (x,y) + # on Python2.7 and above we always get len(args) == 1 + # with args[0] being the (x,y) tuple. + if len(args) > 1: + toprint = args + else: + toprint = args[0] + try: + self.msg = u(toprint) + except Exception: + self.msg = u( + "<[broken __repr__] %s at %0xd>" + % (toprint.__class__, id(toprint))) + else: + f = _pytest._code.Frame(sys._getframe(1)) + try: + source = f.code.fullsource + if source is not None: + try: + source = source.getstatement(f.lineno, assertion=True) + except IndexError: + source = None + else: + source = str(source.deindent()).strip() + except py.error.ENOENT: + source = None + # this can also occur during reinterpretation, when the + # co_filename is set to "<run>". + if source: + self.msg = reinterpret(source, f, should_fail=True) + else: + self.msg = "<could not determine information>" + if not self.args: + self.args = (self.msg,) + +if sys.version_info > (3, 0): + AssertionError.__module__ = "builtins" + +if sys.platform.startswith("java"): + # See http://bugs.jython.org/issue1497 + _exprs = ("BoolOp", "BinOp", "UnaryOp", "Lambda", "IfExp", "Dict", + "ListComp", "GeneratorExp", "Yield", "Compare", "Call", + "Repr", "Num", "Str", "Attribute", "Subscript", "Name", + "List", "Tuple") + _stmts = ("FunctionDef", "ClassDef", "Return", "Delete", "Assign", + "AugAssign", "Print", "For", "While", "If", "With", "Raise", + "TryExcept", "TryFinally", "Assert", "Import", "ImportFrom", + "Exec", "Global", "Expr", "Pass", "Break", "Continue") + _expr_nodes = set(getattr(ast, name) for name in _exprs) + _stmt_nodes = set(getattr(ast, name) for name in _stmts) + def _is_ast_expr(node): + return node.__class__ in _expr_nodes + def _is_ast_stmt(node): + return node.__class__ in _stmt_nodes +else: + def _is_ast_expr(node): + return isinstance(node, ast.expr) + def _is_ast_stmt(node): + return isinstance(node, ast.stmt) + +try: + _Starred = ast.Starred +except AttributeError: + # Python 2. Define a dummy class so isinstance() will always be False. + class _Starred(object): pass + + +class Failure(Exception): + """Error found while interpreting AST.""" + + def __init__(self, explanation=""): + self.cause = sys.exc_info() + self.explanation = explanation + + +def reinterpret(source, frame, should_fail=False): + mod = ast.parse(source) + visitor = DebugInterpreter(frame) + try: + visitor.visit(mod) + except Failure: + failure = sys.exc_info()[1] + return getfailure(failure) + if should_fail: + return ("(assertion failed, but when it was re-run for " + "printing intermediate values, it did not fail. Suggestions: " + "compute assert expression before the assert or use --assert=plain)") + +def run(offending_line, frame=None): + if frame is None: + frame = _pytest._code.Frame(sys._getframe(1)) + return reinterpret(offending_line, frame) + +def getfailure(e): + explanation = util.format_explanation(e.explanation) + value = e.cause[1] + if str(value): + lines = explanation.split('\n') + lines[0] += " << %s" % (value,) + explanation = '\n'.join(lines) + text = "%s: %s" % (e.cause[0].__name__, explanation) + if text.startswith('AssertionError: assert '): + text = text[16:] + return text + +operator_map = { + ast.BitOr : "|", + ast.BitXor : "^", + ast.BitAnd : "&", + ast.LShift : "<<", + ast.RShift : ">>", + ast.Add : "+", + ast.Sub : "-", + ast.Mult : "*", + ast.Div : "/", + ast.FloorDiv : "//", + ast.Mod : "%", + ast.Eq : "==", + ast.NotEq : "!=", + ast.Lt : "<", + ast.LtE : "<=", + ast.Gt : ">", + ast.GtE : ">=", + ast.Pow : "**", + ast.Is : "is", + ast.IsNot : "is not", + ast.In : "in", + ast.NotIn : "not in" +} + +unary_map = { + ast.Not : "not %s", + ast.Invert : "~%s", + ast.USub : "-%s", + ast.UAdd : "+%s" +} + + +class DebugInterpreter(ast.NodeVisitor): + """Interpret AST nodes to gleam useful debugging information. """ + + def __init__(self, frame): + self.frame = frame + + def generic_visit(self, node): + # Fallback when we don't have a special implementation. + if _is_ast_expr(node): + mod = ast.Expression(node) + co = self._compile(mod) + try: + result = self.frame.eval(co) + except Exception: + raise Failure() + explanation = self.frame.repr(result) + return explanation, result + elif _is_ast_stmt(node): + mod = ast.Module([node]) + co = self._compile(mod, "exec") + try: + self.frame.exec_(co) + except Exception: + raise Failure() + return None, None + else: + raise AssertionError("can't handle %s" %(node,)) + + def _compile(self, source, mode="eval"): + return compile(source, "<assertion interpretation>", mode) + + def visit_Expr(self, expr): + return self.visit(expr.value) + + def visit_Module(self, mod): + for stmt in mod.body: + self.visit(stmt) + + def visit_Name(self, name): + explanation, result = self.generic_visit(name) + # See if the name is local. + source = "%r in locals() is not globals()" % (name.id,) + co = self._compile(source) + try: + local = self.frame.eval(co) + except Exception: + # have to assume it isn't + local = None + if local is None or not self.frame.is_true(local): + return name.id, result + return explanation, result + + def visit_Compare(self, comp): + left = comp.left + left_explanation, left_result = self.visit(left) + for op, next_op in zip(comp.ops, comp.comparators): + next_explanation, next_result = self.visit(next_op) + op_symbol = operator_map[op.__class__] + explanation = "%s %s %s" % (left_explanation, op_symbol, + next_explanation) + source = "__exprinfo_left %s __exprinfo_right" % (op_symbol,) + co = self._compile(source) + try: + result = self.frame.eval(co, __exprinfo_left=left_result, + __exprinfo_right=next_result) + except Exception: + raise Failure(explanation) + try: + if not self.frame.is_true(result): + break + except KeyboardInterrupt: + raise + except: + break + left_explanation, left_result = next_explanation, next_result + + if util._reprcompare is not None: + res = util._reprcompare(op_symbol, left_result, next_result) + if res: + explanation = res + return explanation, result + + def visit_BoolOp(self, boolop): + is_or = isinstance(boolop.op, ast.Or) + explanations = [] + for operand in boolop.values: + explanation, result = self.visit(operand) + explanations.append(explanation) + if result == is_or: + break + name = is_or and " or " or " and " + explanation = "(" + name.join(explanations) + ")" + return explanation, result + + def visit_UnaryOp(self, unary): + pattern = unary_map[unary.op.__class__] + operand_explanation, operand_result = self.visit(unary.operand) + explanation = pattern % (operand_explanation,) + co = self._compile(pattern % ("__exprinfo_expr",)) + try: + result = self.frame.eval(co, __exprinfo_expr=operand_result) + except Exception: + raise Failure(explanation) + return explanation, result + + def visit_BinOp(self, binop): + left_explanation, left_result = self.visit(binop.left) + right_explanation, right_result = self.visit(binop.right) + symbol = operator_map[binop.op.__class__] + explanation = "(%s %s %s)" % (left_explanation, symbol, + right_explanation) + source = "__exprinfo_left %s __exprinfo_right" % (symbol,) + co = self._compile(source) + try: + result = self.frame.eval(co, __exprinfo_left=left_result, + __exprinfo_right=right_result) + except Exception: + raise Failure(explanation) + return explanation, result + + def visit_Call(self, call): + func_explanation, func = self.visit(call.func) + arg_explanations = [] + ns = {"__exprinfo_func" : func} + arguments = [] + for arg in call.args: + arg_explanation, arg_result = self.visit(arg) + if isinstance(arg, _Starred): + arg_name = "__exprinfo_star" + ns[arg_name] = arg_result + arguments.append("*%s" % (arg_name,)) + arg_explanations.append("*%s" % (arg_explanation,)) + else: + arg_name = "__exprinfo_%s" % (len(ns),) + ns[arg_name] = arg_result + arguments.append(arg_name) + arg_explanations.append(arg_explanation) + for keyword in call.keywords: + arg_explanation, arg_result = self.visit(keyword.value) + if keyword.arg: + arg_name = "__exprinfo_%s" % (len(ns),) + keyword_source = "%s=%%s" % (keyword.arg) + arguments.append(keyword_source % (arg_name,)) + arg_explanations.append(keyword_source % (arg_explanation,)) + else: + arg_name = "__exprinfo_kwds" + arguments.append("**%s" % (arg_name,)) + arg_explanations.append("**%s" % (arg_explanation,)) + + ns[arg_name] = arg_result + + if getattr(call, 'starargs', None): + arg_explanation, arg_result = self.visit(call.starargs) + arg_name = "__exprinfo_star" + ns[arg_name] = arg_result + arguments.append("*%s" % (arg_name,)) + arg_explanations.append("*%s" % (arg_explanation,)) + + if getattr(call, 'kwargs', None): + arg_explanation, arg_result = self.visit(call.kwargs) + arg_name = "__exprinfo_kwds" + ns[arg_name] = arg_result + arguments.append("**%s" % (arg_name,)) + arg_explanations.append("**%s" % (arg_explanation,)) + args_explained = ", ".join(arg_explanations) + explanation = "%s(%s)" % (func_explanation, args_explained) + args = ", ".join(arguments) + source = "__exprinfo_func(%s)" % (args,) + co = self._compile(source) + try: + result = self.frame.eval(co, **ns) + except Exception: + raise Failure(explanation) + pattern = "%s\n{%s = %s\n}" + rep = self.frame.repr(result) + explanation = pattern % (rep, rep, explanation) + return explanation, result + + def _is_builtin_name(self, name): + pattern = "%r not in globals() and %r not in locals()" + source = pattern % (name.id, name.id) + co = self._compile(source) + try: + return self.frame.eval(co) + except Exception: + return False + + def visit_Attribute(self, attr): + if not isinstance(attr.ctx, ast.Load): + return self.generic_visit(attr) + source_explanation, source_result = self.visit(attr.value) + explanation = "%s.%s" % (source_explanation, attr.attr) + source = "__exprinfo_expr.%s" % (attr.attr,) + co = self._compile(source) + try: + try: + result = self.frame.eval(co, __exprinfo_expr=source_result) + except AttributeError: + # Maybe the attribute name needs to be mangled? + if not attr.attr.startswith("__") or attr.attr.endswith("__"): + raise + source = "getattr(__exprinfo_expr.__class__, '__name__', '')" + co = self._compile(source) + class_name = self.frame.eval(co, __exprinfo_expr=source_result) + mangled_attr = "_" + class_name + attr.attr + source = "__exprinfo_expr.%s" % (mangled_attr,) + co = self._compile(source) + result = self.frame.eval(co, __exprinfo_expr=source_result) + except Exception: + raise Failure(explanation) + explanation = "%s\n{%s = %s.%s\n}" % (self.frame.repr(result), + self.frame.repr(result), + source_explanation, attr.attr) + # Check if the attr is from an instance. + source = "%r in getattr(__exprinfo_expr, '__dict__', {})" + source = source % (attr.attr,) + co = self._compile(source) + try: + from_instance = self.frame.eval(co, __exprinfo_expr=source_result) + except Exception: + from_instance = None + if from_instance is None or self.frame.is_true(from_instance): + rep = self.frame.repr(result) + pattern = "%s\n{%s = %s\n}" + explanation = pattern % (rep, rep, explanation) + return explanation, result + + def visit_Assert(self, assrt): + test_explanation, test_result = self.visit(assrt.test) + explanation = "assert %s" % (test_explanation,) + if not self.frame.is_true(test_result): + try: + raise util.BuiltinAssertionError + except Exception: + raise Failure(explanation) + return explanation, test_result + + def visit_Assign(self, assign): + value_explanation, value_result = self.visit(assign.value) + explanation = "... = %s" % (value_explanation,) + name = ast.Name("__exprinfo_expr", ast.Load(), + lineno=assign.value.lineno, + col_offset=assign.value.col_offset) + new_assign = ast.Assign(assign.targets, name, lineno=assign.lineno, + col_offset=assign.col_offset) + mod = ast.Module([new_assign]) + co = self._compile(mod, "exec") + try: + self.frame.exec_(co, __exprinfo_expr=value_result) + except Exception: + raise Failure(explanation) + return explanation, value_result + diff --git a/python/pytest/_pytest/assertion/rewrite.py b/python/pytest/_pytest/assertion/rewrite.py new file mode 100644 index 000000000..14b8e49db --- /dev/null +++ b/python/pytest/_pytest/assertion/rewrite.py @@ -0,0 +1,885 @@ +"""Rewrite assertion AST to produce nice error messages""" + +import ast +import errno +import itertools +import imp +import marshal +import os +import re +import struct +import sys +import types + +import py +from _pytest.assertion import util + + +# pytest caches rewritten pycs in __pycache__. +if hasattr(imp, "get_tag"): + PYTEST_TAG = imp.get_tag() + "-PYTEST" +else: + if hasattr(sys, "pypy_version_info"): + impl = "pypy" + elif sys.platform == "java": + impl = "jython" + else: + impl = "cpython" + ver = sys.version_info + PYTEST_TAG = "%s-%s%s-PYTEST" % (impl, ver[0], ver[1]) + del ver, impl + +PYC_EXT = ".py" + (__debug__ and "c" or "o") +PYC_TAIL = "." + PYTEST_TAG + PYC_EXT + +REWRITE_NEWLINES = sys.version_info[:2] != (2, 7) and sys.version_info < (3, 2) +ASCII_IS_DEFAULT_ENCODING = sys.version_info[0] < 3 + +if sys.version_info >= (3,5): + ast_Call = ast.Call +else: + ast_Call = lambda a,b,c: ast.Call(a, b, c, None, None) + + +class AssertionRewritingHook(object): + """PEP302 Import hook which rewrites asserts.""" + + def __init__(self): + self.session = None + self.modules = {} + self._register_with_pkg_resources() + + def set_session(self, session): + self.fnpats = session.config.getini("python_files") + self.session = session + + def find_module(self, name, path=None): + if self.session is None: + return None + sess = self.session + state = sess.config._assertstate + state.trace("find_module called for: %s" % name) + names = name.rsplit(".", 1) + lastname = names[-1] + pth = None + if path is not None: + # Starting with Python 3.3, path is a _NamespacePath(), which + # causes problems if not converted to list. + path = list(path) + if len(path) == 1: + pth = path[0] + if pth is None: + try: + fd, fn, desc = imp.find_module(lastname, path) + except ImportError: + return None + if fd is not None: + fd.close() + tp = desc[2] + if tp == imp.PY_COMPILED: + if hasattr(imp, "source_from_cache"): + fn = imp.source_from_cache(fn) + else: + fn = fn[:-1] + elif tp != imp.PY_SOURCE: + # Don't know what this is. + return None + else: + fn = os.path.join(pth, name.rpartition(".")[2] + ".py") + fn_pypath = py.path.local(fn) + # Is this a test file? + if not sess.isinitpath(fn): + # We have to be very careful here because imports in this code can + # trigger a cycle. + self.session = None + try: + for pat in self.fnpats: + if fn_pypath.fnmatch(pat): + state.trace("matched test file %r" % (fn,)) + break + else: + return None + finally: + self.session = sess + else: + state.trace("matched test file (was specified on cmdline): %r" % + (fn,)) + # The requested module looks like a test file, so rewrite it. This is + # the most magical part of the process: load the source, rewrite the + # asserts, and load the rewritten source. We also cache the rewritten + # module code in a special pyc. We must be aware of the possibility of + # concurrent pytest processes rewriting and loading pycs. To avoid + # tricky race conditions, we maintain the following invariant: The + # cached pyc is always a complete, valid pyc. Operations on it must be + # atomic. POSIX's atomic rename comes in handy. + write = not sys.dont_write_bytecode + cache_dir = os.path.join(fn_pypath.dirname, "__pycache__") + if write: + try: + os.mkdir(cache_dir) + except OSError: + e = sys.exc_info()[1].errno + if e == errno.EEXIST: + # Either the __pycache__ directory already exists (the + # common case) or it's blocked by a non-dir node. In the + # latter case, we'll ignore it in _write_pyc. + pass + elif e in [errno.ENOENT, errno.ENOTDIR]: + # One of the path components was not a directory, likely + # because we're in a zip file. + write = False + elif e in [errno.EACCES, errno.EROFS, errno.EPERM]: + state.trace("read only directory: %r" % fn_pypath.dirname) + write = False + else: + raise + cache_name = fn_pypath.basename[:-3] + PYC_TAIL + pyc = os.path.join(cache_dir, cache_name) + # Notice that even if we're in a read-only directory, I'm going + # to check for a cached pyc. This may not be optimal... + co = _read_pyc(fn_pypath, pyc, state.trace) + if co is None: + state.trace("rewriting %r" % (fn,)) + source_stat, co = _rewrite_test(state, fn_pypath) + if co is None: + # Probably a SyntaxError in the test. + return None + if write: + _make_rewritten_pyc(state, source_stat, pyc, co) + else: + state.trace("found cached rewritten pyc for %r" % (fn,)) + self.modules[name] = co, pyc + return self + + def load_module(self, name): + # If there is an existing module object named 'fullname' in + # sys.modules, the loader must use that existing module. (Otherwise, + # the reload() builtin will not work correctly.) + if name in sys.modules: + return sys.modules[name] + + co, pyc = self.modules.pop(name) + # I wish I could just call imp.load_compiled here, but __file__ has to + # be set properly. In Python 3.2+, this all would be handled correctly + # by load_compiled. + mod = sys.modules[name] = imp.new_module(name) + try: + mod.__file__ = co.co_filename + # Normally, this attribute is 3.2+. + mod.__cached__ = pyc + mod.__loader__ = self + py.builtin.exec_(co, mod.__dict__) + except: + del sys.modules[name] + raise + return sys.modules[name] + + + + def is_package(self, name): + try: + fd, fn, desc = imp.find_module(name) + except ImportError: + return False + if fd is not None: + fd.close() + tp = desc[2] + return tp == imp.PKG_DIRECTORY + + @classmethod + def _register_with_pkg_resources(cls): + """ + Ensure package resources can be loaded from this loader. May be called + multiple times, as the operation is idempotent. + """ + try: + import pkg_resources + # access an attribute in case a deferred importer is present + pkg_resources.__name__ + except ImportError: + return + + # Since pytest tests are always located in the file system, the + # DefaultProvider is appropriate. + pkg_resources.register_loader_type(cls, pkg_resources.DefaultProvider) + + def get_data(self, pathname): + """Optional PEP302 get_data API. + """ + with open(pathname, 'rb') as f: + return f.read() + + +def _write_pyc(state, co, source_stat, pyc): + # Technically, we don't have to have the same pyc format as + # (C)Python, since these "pycs" should never be seen by builtin + # import. However, there's little reason deviate, and I hope + # sometime to be able to use imp.load_compiled to load them. (See + # the comment in load_module above.) + try: + fp = open(pyc, "wb") + except IOError: + err = sys.exc_info()[1].errno + state.trace("error writing pyc file at %s: errno=%s" %(pyc, err)) + # we ignore any failure to write the cache file + # there are many reasons, permission-denied, __pycache__ being a + # file etc. + return False + try: + fp.write(imp.get_magic()) + mtime = int(source_stat.mtime) + size = source_stat.size & 0xFFFFFFFF + fp.write(struct.pack("<ll", mtime, size)) + marshal.dump(co, fp) + finally: + fp.close() + return True + +RN = "\r\n".encode("utf-8") +N = "\n".encode("utf-8") + +cookie_re = re.compile(r"^[ \t\f]*#.*coding[:=][ \t]*[-\w.]+") +BOM_UTF8 = '\xef\xbb\xbf' + +def _rewrite_test(state, fn): + """Try to read and rewrite *fn* and return the code object.""" + try: + stat = fn.stat() + source = fn.read("rb") + except EnvironmentError: + return None, None + if ASCII_IS_DEFAULT_ENCODING: + # ASCII is the default encoding in Python 2. Without a coding + # declaration, Python 2 will complain about any bytes in the file + # outside the ASCII range. Sadly, this behavior does not extend to + # compile() or ast.parse(), which prefer to interpret the bytes as + # latin-1. (At least they properly handle explicit coding cookies.) To + # preserve this error behavior, we could force ast.parse() to use ASCII + # as the encoding by inserting a coding cookie. Unfortunately, that + # messes up line numbers. Thus, we have to check ourselves if anything + # is outside the ASCII range in the case no encoding is explicitly + # declared. For more context, see issue #269. Yay for Python 3 which + # gets this right. + end1 = source.find("\n") + end2 = source.find("\n", end1 + 1) + if (not source.startswith(BOM_UTF8) and + cookie_re.match(source[0:end1]) is None and + cookie_re.match(source[end1 + 1:end2]) is None): + if hasattr(state, "_indecode"): + # encodings imported us again, so don't rewrite. + return None, None + state._indecode = True + try: + try: + source.decode("ascii") + except UnicodeDecodeError: + # Let it fail in real import. + return None, None + finally: + del state._indecode + # On Python versions which are not 2.7 and less than or equal to 3.1, the + # parser expects *nix newlines. + if REWRITE_NEWLINES: + source = source.replace(RN, N) + N + try: + tree = ast.parse(source) + except SyntaxError: + # Let this pop up again in the real import. + state.trace("failed to parse: %r" % (fn,)) + return None, None + rewrite_asserts(tree) + try: + co = compile(tree, fn.strpath, "exec") + except SyntaxError: + # It's possible that this error is from some bug in the + # assertion rewriting, but I don't know of a fast way to tell. + state.trace("failed to compile: %r" % (fn,)) + return None, None + return stat, co + +def _make_rewritten_pyc(state, source_stat, pyc, co): + """Try to dump rewritten code to *pyc*.""" + if sys.platform.startswith("win"): + # Windows grants exclusive access to open files and doesn't have atomic + # rename, so just write into the final file. + _write_pyc(state, co, source_stat, pyc) + else: + # When not on windows, assume rename is atomic. Dump the code object + # into a file specific to this process and atomically replace it. + proc_pyc = pyc + "." + str(os.getpid()) + if _write_pyc(state, co, source_stat, proc_pyc): + os.rename(proc_pyc, pyc) + +def _read_pyc(source, pyc, trace=lambda x: None): + """Possibly read a pytest pyc containing rewritten code. + + Return rewritten code if successful or None if not. + """ + try: + fp = open(pyc, "rb") + except IOError: + return None + with fp: + try: + mtime = int(source.mtime()) + size = source.size() + data = fp.read(12) + except EnvironmentError as e: + trace('_read_pyc(%s): EnvironmentError %s' % (source, e)) + return None + # Check for invalid or out of date pyc file. + if (len(data) != 12 or data[:4] != imp.get_magic() or + struct.unpack("<ll", data[4:]) != (mtime, size)): + trace('_read_pyc(%s): invalid or out of date pyc' % source) + return None + try: + co = marshal.load(fp) + except Exception as e: + trace('_read_pyc(%s): marshal.load error %s' % (source, e)) + return None + if not isinstance(co, types.CodeType): + trace('_read_pyc(%s): not a code object' % source) + return None + return co + + +def rewrite_asserts(mod): + """Rewrite the assert statements in mod.""" + AssertionRewriter().run(mod) + + +def _saferepr(obj): + """Get a safe repr of an object for assertion error messages. + + The assertion formatting (util.format_explanation()) requires + newlines to be escaped since they are a special character for it. + Normally assertion.util.format_explanation() does this but for a + custom repr it is possible to contain one of the special escape + sequences, especially '\n{' and '\n}' are likely to be present in + JSON reprs. + + """ + repr = py.io.saferepr(obj) + if py.builtin._istext(repr): + t = py.builtin.text + else: + t = py.builtin.bytes + return repr.replace(t("\n"), t("\\n")) + + +from _pytest.assertion.util import format_explanation as _format_explanation # noqa + +def _format_assertmsg(obj): + """Format the custom assertion message given. + + For strings this simply replaces newlines with '\n~' so that + util.format_explanation() will preserve them instead of escaping + newlines. For other objects py.io.saferepr() is used first. + + """ + # reprlib appears to have a bug which means that if a string + # contains a newline it gets escaped, however if an object has a + # .__repr__() which contains newlines it does not get escaped. + # However in either case we want to preserve the newline. + if py.builtin._istext(obj) or py.builtin._isbytes(obj): + s = obj + is_repr = False + else: + s = py.io.saferepr(obj) + is_repr = True + if py.builtin._istext(s): + t = py.builtin.text + else: + t = py.builtin.bytes + s = s.replace(t("\n"), t("\n~")).replace(t("%"), t("%%")) + if is_repr: + s = s.replace(t("\\n"), t("\n~")) + return s + +def _should_repr_global_name(obj): + return not hasattr(obj, "__name__") and not py.builtin.callable(obj) + +def _format_boolop(explanations, is_or): + explanation = "(" + (is_or and " or " or " and ").join(explanations) + ")" + if py.builtin._istext(explanation): + t = py.builtin.text + else: + t = py.builtin.bytes + return explanation.replace(t('%'), t('%%')) + +def _call_reprcompare(ops, results, expls, each_obj): + for i, res, expl in zip(range(len(ops)), results, expls): + try: + done = not res + except Exception: + done = True + if done: + break + if util._reprcompare is not None: + custom = util._reprcompare(ops[i], each_obj[i], each_obj[i + 1]) + if custom is not None: + return custom + return expl + + +unary_map = { + ast.Not: "not %s", + ast.Invert: "~%s", + ast.USub: "-%s", + ast.UAdd: "+%s" +} + +binop_map = { + ast.BitOr: "|", + ast.BitXor: "^", + ast.BitAnd: "&", + ast.LShift: "<<", + ast.RShift: ">>", + ast.Add: "+", + ast.Sub: "-", + ast.Mult: "*", + ast.Div: "/", + ast.FloorDiv: "//", + ast.Mod: "%%", # escaped for string formatting + ast.Eq: "==", + ast.NotEq: "!=", + ast.Lt: "<", + ast.LtE: "<=", + ast.Gt: ">", + ast.GtE: ">=", + ast.Pow: "**", + ast.Is: "is", + ast.IsNot: "is not", + ast.In: "in", + ast.NotIn: "not in" +} +# Python 3.5+ compatibility +try: + binop_map[ast.MatMult] = "@" +except AttributeError: + pass + +# Python 3.4+ compatibility +if hasattr(ast, "NameConstant"): + _NameConstant = ast.NameConstant +else: + def _NameConstant(c): + return ast.Name(str(c), ast.Load()) + + +def set_location(node, lineno, col_offset): + """Set node location information recursively.""" + def _fix(node, lineno, col_offset): + if "lineno" in node._attributes: + node.lineno = lineno + if "col_offset" in node._attributes: + node.col_offset = col_offset + for child in ast.iter_child_nodes(node): + _fix(child, lineno, col_offset) + _fix(node, lineno, col_offset) + return node + + +class AssertionRewriter(ast.NodeVisitor): + """Assertion rewriting implementation. + + The main entrypoint is to call .run() with an ast.Module instance, + this will then find all the assert statements and re-write them to + provide intermediate values and a detailed assertion error. See + http://pybites.blogspot.be/2011/07/behind-scenes-of-pytests-new-assertion.html + for an overview of how this works. + + The entry point here is .run() which will iterate over all the + statements in an ast.Module and for each ast.Assert statement it + finds call .visit() with it. Then .visit_Assert() takes over and + is responsible for creating new ast statements to replace the + original assert statement: it re-writes the test of an assertion + to provide intermediate values and replace it with an if statement + which raises an assertion error with a detailed explanation in + case the expression is false. + + For this .visit_Assert() uses the visitor pattern to visit all the + AST nodes of the ast.Assert.test field, each visit call returning + an AST node and the corresponding explanation string. During this + state is kept in several instance attributes: + + :statements: All the AST statements which will replace the assert + statement. + + :variables: This is populated by .variable() with each variable + used by the statements so that they can all be set to None at + the end of the statements. + + :variable_counter: Counter to create new unique variables needed + by statements. Variables are created using .variable() and + have the form of "@py_assert0". + + :on_failure: The AST statements which will be executed if the + assertion test fails. This is the code which will construct + the failure message and raises the AssertionError. + + :explanation_specifiers: A dict filled by .explanation_param() + with %-formatting placeholders and their corresponding + expressions to use in the building of an assertion message. + This is used by .pop_format_context() to build a message. + + :stack: A stack of the explanation_specifiers dicts maintained by + .push_format_context() and .pop_format_context() which allows + to build another %-formatted string while already building one. + + This state is reset on every new assert statement visited and used + by the other visitors. + + """ + + def run(self, mod): + """Find all assert statements in *mod* and rewrite them.""" + if not mod.body: + # Nothing to do. + return + # Insert some special imports at the top of the module but after any + # docstrings and __future__ imports. + aliases = [ast.alias(py.builtin.builtins.__name__, "@py_builtins"), + ast.alias("_pytest.assertion.rewrite", "@pytest_ar")] + expect_docstring = True + pos = 0 + lineno = 0 + for item in mod.body: + if (expect_docstring and isinstance(item, ast.Expr) and + isinstance(item.value, ast.Str)): + doc = item.value.s + if "PYTEST_DONT_REWRITE" in doc: + # The module has disabled assertion rewriting. + return + lineno += len(doc) - 1 + expect_docstring = False + elif (not isinstance(item, ast.ImportFrom) or item.level > 0 or + item.module != "__future__"): + lineno = item.lineno + break + pos += 1 + imports = [ast.Import([alias], lineno=lineno, col_offset=0) + for alias in aliases] + mod.body[pos:pos] = imports + # Collect asserts. + nodes = [mod] + while nodes: + node = nodes.pop() + for name, field in ast.iter_fields(node): + if isinstance(field, list): + new = [] + for i, child in enumerate(field): + if isinstance(child, ast.Assert): + # Transform assert. + new.extend(self.visit(child)) + else: + new.append(child) + if isinstance(child, ast.AST): + nodes.append(child) + setattr(node, name, new) + elif (isinstance(field, ast.AST) and + # Don't recurse into expressions as they can't contain + # asserts. + not isinstance(field, ast.expr)): + nodes.append(field) + + def variable(self): + """Get a new variable.""" + # Use a character invalid in python identifiers to avoid clashing. + name = "@py_assert" + str(next(self.variable_counter)) + self.variables.append(name) + return name + + def assign(self, expr): + """Give *expr* a name.""" + name = self.variable() + self.statements.append(ast.Assign([ast.Name(name, ast.Store())], expr)) + return ast.Name(name, ast.Load()) + + def display(self, expr): + """Call py.io.saferepr on the expression.""" + return self.helper("saferepr", expr) + + def helper(self, name, *args): + """Call a helper in this module.""" + py_name = ast.Name("@pytest_ar", ast.Load()) + attr = ast.Attribute(py_name, "_" + name, ast.Load()) + return ast_Call(attr, list(args), []) + + def builtin(self, name): + """Return the builtin called *name*.""" + builtin_name = ast.Name("@py_builtins", ast.Load()) + return ast.Attribute(builtin_name, name, ast.Load()) + + def explanation_param(self, expr): + """Return a new named %-formatting placeholder for expr. + + This creates a %-formatting placeholder for expr in the + current formatting context, e.g. ``%(py0)s``. The placeholder + and expr are placed in the current format context so that it + can be used on the next call to .pop_format_context(). + + """ + specifier = "py" + str(next(self.variable_counter)) + self.explanation_specifiers[specifier] = expr + return "%(" + specifier + ")s" + + def push_format_context(self): + """Create a new formatting context. + + The format context is used for when an explanation wants to + have a variable value formatted in the assertion message. In + this case the value required can be added using + .explanation_param(). Finally .pop_format_context() is used + to format a string of %-formatted values as added by + .explanation_param(). + + """ + self.explanation_specifiers = {} + self.stack.append(self.explanation_specifiers) + + def pop_format_context(self, expl_expr): + """Format the %-formatted string with current format context. + + The expl_expr should be an ast.Str instance constructed from + the %-placeholders created by .explanation_param(). This will + add the required code to format said string to .on_failure and + return the ast.Name instance of the formatted string. + + """ + current = self.stack.pop() + if self.stack: + self.explanation_specifiers = self.stack[-1] + keys = [ast.Str(key) for key in current.keys()] + format_dict = ast.Dict(keys, list(current.values())) + form = ast.BinOp(expl_expr, ast.Mod(), format_dict) + name = "@py_format" + str(next(self.variable_counter)) + self.on_failure.append(ast.Assign([ast.Name(name, ast.Store())], form)) + return ast.Name(name, ast.Load()) + + def generic_visit(self, node): + """Handle expressions we don't have custom code for.""" + assert isinstance(node, ast.expr) + res = self.assign(node) + return res, self.explanation_param(self.display(res)) + + def visit_Assert(self, assert_): + """Return the AST statements to replace the ast.Assert instance. + + This re-writes the test of an assertion to provide + intermediate values and replace it with an if statement which + raises an assertion error with a detailed explanation in case + the expression is false. + + """ + self.statements = [] + self.variables = [] + self.variable_counter = itertools.count() + self.stack = [] + self.on_failure = [] + self.push_format_context() + # Rewrite assert into a bunch of statements. + top_condition, explanation = self.visit(assert_.test) + # Create failure message. + body = self.on_failure + negation = ast.UnaryOp(ast.Not(), top_condition) + self.statements.append(ast.If(negation, body, [])) + if assert_.msg: + assertmsg = self.helper('format_assertmsg', assert_.msg) + explanation = "\n>assert " + explanation + else: + assertmsg = ast.Str("") + explanation = "assert " + explanation + template = ast.BinOp(assertmsg, ast.Add(), ast.Str(explanation)) + msg = self.pop_format_context(template) + fmt = self.helper("format_explanation", msg) + err_name = ast.Name("AssertionError", ast.Load()) + exc = ast_Call(err_name, [fmt], []) + if sys.version_info[0] >= 3: + raise_ = ast.Raise(exc, None) + else: + raise_ = ast.Raise(exc, None, None) + body.append(raise_) + # Clear temporary variables by setting them to None. + if self.variables: + variables = [ast.Name(name, ast.Store()) + for name in self.variables] + clear = ast.Assign(variables, _NameConstant(None)) + self.statements.append(clear) + # Fix line numbers. + for stmt in self.statements: + set_location(stmt, assert_.lineno, assert_.col_offset) + return self.statements + + def visit_Name(self, name): + # Display the repr of the name if it's a local variable or + # _should_repr_global_name() thinks it's acceptable. + locs = ast_Call(self.builtin("locals"), [], []) + inlocs = ast.Compare(ast.Str(name.id), [ast.In()], [locs]) + dorepr = self.helper("should_repr_global_name", name) + test = ast.BoolOp(ast.Or(), [inlocs, dorepr]) + expr = ast.IfExp(test, self.display(name), ast.Str(name.id)) + return name, self.explanation_param(expr) + + def visit_BoolOp(self, boolop): + res_var = self.variable() + expl_list = self.assign(ast.List([], ast.Load())) + app = ast.Attribute(expl_list, "append", ast.Load()) + is_or = int(isinstance(boolop.op, ast.Or)) + body = save = self.statements + fail_save = self.on_failure + levels = len(boolop.values) - 1 + self.push_format_context() + # Process each operand, short-circuting if needed. + for i, v in enumerate(boolop.values): + if i: + fail_inner = [] + # cond is set in a prior loop iteration below + self.on_failure.append(ast.If(cond, fail_inner, [])) # noqa + self.on_failure = fail_inner + self.push_format_context() + res, expl = self.visit(v) + body.append(ast.Assign([ast.Name(res_var, ast.Store())], res)) + expl_format = self.pop_format_context(ast.Str(expl)) + call = ast_Call(app, [expl_format], []) + self.on_failure.append(ast.Expr(call)) + if i < levels: + cond = res + if is_or: + cond = ast.UnaryOp(ast.Not(), cond) + inner = [] + self.statements.append(ast.If(cond, inner, [])) + self.statements = body = inner + self.statements = save + self.on_failure = fail_save + expl_template = self.helper("format_boolop", expl_list, ast.Num(is_or)) + expl = self.pop_format_context(expl_template) + return ast.Name(res_var, ast.Load()), self.explanation_param(expl) + + def visit_UnaryOp(self, unary): + pattern = unary_map[unary.op.__class__] + operand_res, operand_expl = self.visit(unary.operand) + res = self.assign(ast.UnaryOp(unary.op, operand_res)) + return res, pattern % (operand_expl,) + + def visit_BinOp(self, binop): + symbol = binop_map[binop.op.__class__] + left_expr, left_expl = self.visit(binop.left) + right_expr, right_expl = self.visit(binop.right) + explanation = "(%s %s %s)" % (left_expl, symbol, right_expl) + res = self.assign(ast.BinOp(left_expr, binop.op, right_expr)) + return res, explanation + + def visit_Call_35(self, call): + """ + visit `ast.Call` nodes on Python3.5 and after + """ + new_func, func_expl = self.visit(call.func) + arg_expls = [] + new_args = [] + new_kwargs = [] + for arg in call.args: + res, expl = self.visit(arg) + arg_expls.append(expl) + new_args.append(res) + for keyword in call.keywords: + res, expl = self.visit(keyword.value) + new_kwargs.append(ast.keyword(keyword.arg, res)) + if keyword.arg: + arg_expls.append(keyword.arg + "=" + expl) + else: ## **args have `arg` keywords with an .arg of None + arg_expls.append("**" + expl) + + expl = "%s(%s)" % (func_expl, ', '.join(arg_expls)) + new_call = ast.Call(new_func, new_args, new_kwargs) + res = self.assign(new_call) + res_expl = self.explanation_param(self.display(res)) + outer_expl = "%s\n{%s = %s\n}" % (res_expl, res_expl, expl) + return res, outer_expl + + def visit_Starred(self, starred): + # From Python 3.5, a Starred node can appear in a function call + res, expl = self.visit(starred.value) + return starred, '*' + expl + + def visit_Call_legacy(self, call): + """ + visit `ast.Call nodes on 3.4 and below` + """ + new_func, func_expl = self.visit(call.func) + arg_expls = [] + new_args = [] + new_kwargs = [] + new_star = new_kwarg = None + for arg in call.args: + res, expl = self.visit(arg) + new_args.append(res) + arg_expls.append(expl) + for keyword in call.keywords: + res, expl = self.visit(keyword.value) + new_kwargs.append(ast.keyword(keyword.arg, res)) + arg_expls.append(keyword.arg + "=" + expl) + if call.starargs: + new_star, expl = self.visit(call.starargs) + arg_expls.append("*" + expl) + if call.kwargs: + new_kwarg, expl = self.visit(call.kwargs) + arg_expls.append("**" + expl) + expl = "%s(%s)" % (func_expl, ', '.join(arg_expls)) + new_call = ast.Call(new_func, new_args, new_kwargs, + new_star, new_kwarg) + res = self.assign(new_call) + res_expl = self.explanation_param(self.display(res)) + outer_expl = "%s\n{%s = %s\n}" % (res_expl, res_expl, expl) + return res, outer_expl + + # ast.Call signature changed on 3.5, + # conditionally change which methods is named + # visit_Call depending on Python version + if sys.version_info >= (3, 5): + visit_Call = visit_Call_35 + else: + visit_Call = visit_Call_legacy + + + def visit_Attribute(self, attr): + if not isinstance(attr.ctx, ast.Load): + return self.generic_visit(attr) + value, value_expl = self.visit(attr.value) + res = self.assign(ast.Attribute(value, attr.attr, ast.Load())) + res_expl = self.explanation_param(self.display(res)) + pat = "%s\n{%s = %s.%s\n}" + expl = pat % (res_expl, res_expl, value_expl, attr.attr) + return res, expl + + def visit_Compare(self, comp): + self.push_format_context() + left_res, left_expl = self.visit(comp.left) + res_variables = [self.variable() for i in range(len(comp.ops))] + load_names = [ast.Name(v, ast.Load()) for v in res_variables] + store_names = [ast.Name(v, ast.Store()) for v in res_variables] + it = zip(range(len(comp.ops)), comp.ops, comp.comparators) + expls = [] + syms = [] + results = [left_res] + for i, op, next_operand in it: + next_res, next_expl = self.visit(next_operand) + results.append(next_res) + sym = binop_map[op.__class__] + syms.append(ast.Str(sym)) + expl = "%s %s %s" % (left_expl, sym, next_expl) + expls.append(ast.Str(expl)) + res_expr = ast.Compare(left_res, [op], [next_res]) + self.statements.append(ast.Assign([store_names[i]], res_expr)) + left_res, left_expl = next_res, next_expl + # Use pytest.assertion.util._reprcompare if that's available. + expl_call = self.helper("call_reprcompare", + ast.Tuple(syms, ast.Load()), + ast.Tuple(load_names, ast.Load()), + ast.Tuple(expls, ast.Load()), + ast.Tuple(results, ast.Load())) + if len(comp.ops) > 1: + res = ast.BoolOp(ast.And(), load_names) + else: + res = load_names[0] + return res, self.explanation_param(self.pop_format_context(expl_call)) diff --git a/python/pytest/_pytest/assertion/util.py b/python/pytest/_pytest/assertion/util.py new file mode 100644 index 000000000..f2f23efea --- /dev/null +++ b/python/pytest/_pytest/assertion/util.py @@ -0,0 +1,332 @@ +"""Utilities for assertion debugging""" +import pprint + +import _pytest._code +import py +try: + from collections import Sequence +except ImportError: + Sequence = list + +BuiltinAssertionError = py.builtin.builtins.AssertionError +u = py.builtin._totext + +# The _reprcompare attribute on the util module is used by the new assertion +# interpretation code and assertion rewriter to detect this plugin was +# loaded and in turn call the hooks defined here as part of the +# DebugInterpreter. +_reprcompare = None + + +# the re-encoding is needed for python2 repr +# with non-ascii characters (see issue 877 and 1379) +def ecu(s): + try: + return u(s, 'utf-8', 'replace') + except TypeError: + return s + + +def format_explanation(explanation): + """This formats an explanation + + Normally all embedded newlines are escaped, however there are + three exceptions: \n{, \n} and \n~. The first two are intended + cover nested explanations, see function and attribute explanations + for examples (.visit_Call(), visit_Attribute()). The last one is + for when one explanation needs to span multiple lines, e.g. when + displaying diffs. + """ + explanation = ecu(explanation) + explanation = _collapse_false(explanation) + lines = _split_explanation(explanation) + result = _format_lines(lines) + return u('\n').join(result) + + +def _collapse_false(explanation): + """Collapse expansions of False + + So this strips out any "assert False\n{where False = ...\n}" + blocks. + """ + where = 0 + while True: + start = where = explanation.find("False\n{False = ", where) + if where == -1: + break + level = 0 + prev_c = explanation[start] + for i, c in enumerate(explanation[start:]): + if prev_c + c == "\n{": + level += 1 + elif prev_c + c == "\n}": + level -= 1 + if not level: + break + prev_c = c + else: + raise AssertionError("unbalanced braces: %r" % (explanation,)) + end = start + i + where = end + if explanation[end - 1] == '\n': + explanation = (explanation[:start] + explanation[start+15:end-1] + + explanation[end+1:]) + where -= 17 + return explanation + + +def _split_explanation(explanation): + """Return a list of individual lines in the explanation + + This will return a list of lines split on '\n{', '\n}' and '\n~'. + Any other newlines will be escaped and appear in the line as the + literal '\n' characters. + """ + raw_lines = (explanation or u('')).split('\n') + lines = [raw_lines[0]] + for l in raw_lines[1:]: + if l and l[0] in ['{', '}', '~', '>']: + lines.append(l) + else: + lines[-1] += '\\n' + l + return lines + + +def _format_lines(lines): + """Format the individual lines + + This will replace the '{', '}' and '~' characters of our mini + formatting language with the proper 'where ...', 'and ...' and ' + + ...' text, taking care of indentation along the way. + + Return a list of formatted lines. + """ + result = lines[:1] + stack = [0] + stackcnt = [0] + for line in lines[1:]: + if line.startswith('{'): + if stackcnt[-1]: + s = u('and ') + else: + s = u('where ') + stack.append(len(result)) + stackcnt[-1] += 1 + stackcnt.append(0) + result.append(u(' +') + u(' ')*(len(stack)-1) + s + line[1:]) + elif line.startswith('}'): + stack.pop() + stackcnt.pop() + result[stack[-1]] += line[1:] + else: + assert line[0] in ['~', '>'] + stack[-1] += 1 + indent = len(stack) if line.startswith('~') else len(stack) - 1 + result.append(u(' ')*indent + line[1:]) + assert len(stack) == 1 + return result + + +# Provide basestring in python3 +try: + basestring = basestring +except NameError: + basestring = str + + +def assertrepr_compare(config, op, left, right): + """Return specialised explanations for some operators/operands""" + width = 80 - 15 - len(op) - 2 # 15 chars indentation, 1 space around op + left_repr = py.io.saferepr(left, maxsize=int(width/2)) + right_repr = py.io.saferepr(right, maxsize=width-len(left_repr)) + + summary = u('%s %s %s') % (ecu(left_repr), op, ecu(right_repr)) + + issequence = lambda x: (isinstance(x, (list, tuple, Sequence)) and + not isinstance(x, basestring)) + istext = lambda x: isinstance(x, basestring) + isdict = lambda x: isinstance(x, dict) + isset = lambda x: isinstance(x, (set, frozenset)) + + def isiterable(obj): + try: + iter(obj) + return not istext(obj) + except TypeError: + return False + + verbose = config.getoption('verbose') + explanation = None + try: + if op == '==': + if istext(left) and istext(right): + explanation = _diff_text(left, right, verbose) + else: + if issequence(left) and issequence(right): + explanation = _compare_eq_sequence(left, right, verbose) + elif isset(left) and isset(right): + explanation = _compare_eq_set(left, right, verbose) + elif isdict(left) and isdict(right): + explanation = _compare_eq_dict(left, right, verbose) + if isiterable(left) and isiterable(right): + expl = _compare_eq_iterable(left, right, verbose) + if explanation is not None: + explanation.extend(expl) + else: + explanation = expl + elif op == 'not in': + if istext(left) and istext(right): + explanation = _notin_text(left, right, verbose) + except Exception: + explanation = [ + u('(pytest_assertion plugin: representation of details failed. ' + 'Probably an object has a faulty __repr__.)'), + u(_pytest._code.ExceptionInfo())] + + if not explanation: + return None + + return [summary] + explanation + + +def _diff_text(left, right, verbose=False): + """Return the explanation for the diff between text or bytes + + Unless --verbose is used this will skip leading and trailing + characters which are identical to keep the diff minimal. + + If the input are bytes they will be safely converted to text. + """ + from difflib import ndiff + explanation = [] + if isinstance(left, py.builtin.bytes): + left = u(repr(left)[1:-1]).replace(r'\n', '\n') + if isinstance(right, py.builtin.bytes): + right = u(repr(right)[1:-1]).replace(r'\n', '\n') + if not verbose: + i = 0 # just in case left or right has zero length + for i in range(min(len(left), len(right))): + if left[i] != right[i]: + break + if i > 42: + i -= 10 # Provide some context + explanation = [u('Skipping %s identical leading ' + 'characters in diff, use -v to show') % i] + left = left[i:] + right = right[i:] + if len(left) == len(right): + for i in range(len(left)): + if left[-i] != right[-i]: + break + if i > 42: + i -= 10 # Provide some context + explanation += [u('Skipping %s identical trailing ' + 'characters in diff, use -v to show') % i] + left = left[:-i] + right = right[:-i] + explanation += [line.strip('\n') + for line in ndiff(left.splitlines(), + right.splitlines())] + return explanation + + +def _compare_eq_iterable(left, right, verbose=False): + if not verbose: + return [u('Use -v to get the full diff')] + # dynamic import to speedup pytest + import difflib + + try: + left_formatting = pprint.pformat(left).splitlines() + right_formatting = pprint.pformat(right).splitlines() + explanation = [u('Full diff:')] + except Exception: + # hack: PrettyPrinter.pformat() in python 2 fails when formatting items that can't be sorted(), ie, calling + # sorted() on a list would raise. See issue #718. + # As a workaround, the full diff is generated by using the repr() string of each item of each container. + left_formatting = sorted(repr(x) for x in left) + right_formatting = sorted(repr(x) for x in right) + explanation = [u('Full diff (fallback to calling repr on each item):')] + explanation.extend(line.strip() for line in difflib.ndiff(left_formatting, right_formatting)) + return explanation + + +def _compare_eq_sequence(left, right, verbose=False): + explanation = [] + for i in range(min(len(left), len(right))): + if left[i] != right[i]: + explanation += [u('At index %s diff: %r != %r') + % (i, left[i], right[i])] + break + if len(left) > len(right): + explanation += [u('Left contains more items, first extra item: %s') + % py.io.saferepr(left[len(right)],)] + elif len(left) < len(right): + explanation += [ + u('Right contains more items, first extra item: %s') % + py.io.saferepr(right[len(left)],)] + return explanation + + +def _compare_eq_set(left, right, verbose=False): + explanation = [] + diff_left = left - right + diff_right = right - left + if diff_left: + explanation.append(u('Extra items in the left set:')) + for item in diff_left: + explanation.append(py.io.saferepr(item)) + if diff_right: + explanation.append(u('Extra items in the right set:')) + for item in diff_right: + explanation.append(py.io.saferepr(item)) + return explanation + + +def _compare_eq_dict(left, right, verbose=False): + explanation = [] + common = set(left).intersection(set(right)) + same = dict((k, left[k]) for k in common if left[k] == right[k]) + if same and not verbose: + explanation += [u('Omitting %s identical items, use -v to show') % + len(same)] + elif same: + explanation += [u('Common items:')] + explanation += pprint.pformat(same).splitlines() + diff = set(k for k in common if left[k] != right[k]) + if diff: + explanation += [u('Differing items:')] + for k in diff: + explanation += [py.io.saferepr({k: left[k]}) + ' != ' + + py.io.saferepr({k: right[k]})] + extra_left = set(left) - set(right) + if extra_left: + explanation.append(u('Left contains more items:')) + explanation.extend(pprint.pformat( + dict((k, left[k]) for k in extra_left)).splitlines()) + extra_right = set(right) - set(left) + if extra_right: + explanation.append(u('Right contains more items:')) + explanation.extend(pprint.pformat( + dict((k, right[k]) for k in extra_right)).splitlines()) + return explanation + + +def _notin_text(term, text, verbose=False): + index = text.find(term) + head = text[:index] + tail = text[index+len(term):] + correct_text = head + tail + diff = _diff_text(correct_text, text, verbose) + newdiff = [u('%s is contained here:') % py.io.saferepr(term, maxsize=42)] + for line in diff: + if line.startswith(u('Skipping')): + continue + if line.startswith(u('- ')): + continue + if line.startswith(u('+ ')): + newdiff.append(u(' ') + line[2:]) + else: + newdiff.append(line) + return newdiff |