summaryrefslogtreecommitdiffstats
path: root/python/pytest/_pytest/vendored_packages/pluggy.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pytest/_pytest/vendored_packages/pluggy.py')
-rw-r--r--python/pytest/_pytest/vendored_packages/pluggy.py777
1 files changed, 777 insertions, 0 deletions
diff --git a/python/pytest/_pytest/vendored_packages/pluggy.py b/python/pytest/_pytest/vendored_packages/pluggy.py
new file mode 100644
index 000000000..2f848b23d
--- /dev/null
+++ b/python/pytest/_pytest/vendored_packages/pluggy.py
@@ -0,0 +1,777 @@
+"""
+PluginManager, basic initialization and tracing.
+
+pluggy is the cristallized core of plugin management as used
+by some 150 plugins for pytest.
+
+Pluggy uses semantic versioning. Breaking changes are only foreseen for
+Major releases (incremented X in "X.Y.Z"). If you want to use pluggy in
+your project you should thus use a dependency restriction like
+"pluggy>=0.1.0,<1.0" to avoid surprises.
+
+pluggy is concerned with hook specification, hook implementations and hook
+calling. For any given hook specification a hook call invokes up to N implementations.
+A hook implementation can influence its position and type of execution:
+if attributed "tryfirst" or "trylast" it will be tried to execute
+first or last. However, if attributed "hookwrapper" an implementation
+can wrap all calls to non-hookwrapper implementations. A hookwrapper
+can thus execute some code ahead and after the execution of other hooks.
+
+Hook specification is done by way of a regular python function where
+both the function name and the names of all its arguments are significant.
+Each hook implementation function is verified against the original specification
+function, including the names of all its arguments. To allow for hook specifications
+to evolve over the livetime of a project, hook implementations can
+accept less arguments. One can thus add new arguments and semantics to
+a hook specification by adding another argument typically without breaking
+existing hook implementations.
+
+The chosen approach is meant to let a hook designer think carefuly about
+which objects are needed by an extension writer. By contrast, subclass-based
+extension mechanisms often expose a lot more state and behaviour than needed,
+thus restricting future developments.
+
+Pluggy currently consists of functionality for:
+
+- a way to register new hook specifications. Without a hook
+ specification no hook calling can be performed.
+
+- a registry of plugins which contain hook implementation functions. It
+ is possible to register plugins for which a hook specification is not yet
+ known and validate all hooks when the system is in a more referentially
+ consistent state. Setting an "optionalhook" attribution to a hook
+ implementation will avoid PluginValidationError's if a specification
+ is missing. This allows to have optional integration between plugins.
+
+- a "hook" relay object from which you can launch 1:N calls to
+ registered hook implementation functions
+
+- a mechanism for ordering hook implementation functions
+
+- mechanisms for two different type of 1:N calls: "firstresult" for when
+ the call should stop when the first implementation returns a non-None result.
+ And the other (default) way of guaranteeing that all hook implementations
+ will be called and their non-None result collected.
+
+- mechanisms for "historic" extension points such that all newly
+ registered functions will receive all hook calls that happened
+ before their registration.
+
+- a mechanism for discovering plugin objects which are based on
+ setuptools based entry points.
+
+- a simple tracing mechanism, including tracing of plugin calls and
+ their arguments.
+
+"""
+import sys
+import inspect
+
+__version__ = '0.3.1'
+__all__ = ["PluginManager", "PluginValidationError",
+ "HookspecMarker", "HookimplMarker"]
+
+_py3 = sys.version_info > (3, 0)
+
+
+class HookspecMarker:
+ """ Decorator helper class for marking functions as hook specifications.
+
+ You can instantiate it with a project_name to get a decorator.
+ Calling PluginManager.add_hookspecs later will discover all marked functions
+ if the PluginManager uses the same project_name.
+ """
+
+ def __init__(self, project_name):
+ self.project_name = project_name
+
+ def __call__(self, function=None, firstresult=False, historic=False):
+ """ if passed a function, directly sets attributes on the function
+ which will make it discoverable to add_hookspecs(). If passed no
+ function, returns a decorator which can be applied to a function
+ later using the attributes supplied.
+
+ If firstresult is True the 1:N hook call (N being the number of registered
+ hook implementation functions) will stop at I<=N when the I'th function
+ returns a non-None result.
+
+ If historic is True calls to a hook will be memorized and replayed
+ on later registered plugins.
+
+ """
+ def setattr_hookspec_opts(func):
+ if historic and firstresult:
+ raise ValueError("cannot have a historic firstresult hook")
+ setattr(func, self.project_name + "_spec",
+ dict(firstresult=firstresult, historic=historic))
+ return func
+
+ if function is not None:
+ return setattr_hookspec_opts(function)
+ else:
+ return setattr_hookspec_opts
+
+
+class HookimplMarker:
+ """ Decorator helper class for marking functions as hook implementations.
+
+ You can instantiate with a project_name to get a decorator.
+ Calling PluginManager.register later will discover all marked functions
+ if the PluginManager uses the same project_name.
+ """
+ def __init__(self, project_name):
+ self.project_name = project_name
+
+ def __call__(self, function=None, hookwrapper=False, optionalhook=False,
+ tryfirst=False, trylast=False):
+
+ """ if passed a function, directly sets attributes on the function
+ which will make it discoverable to register(). If passed no function,
+ returns a decorator which can be applied to a function later using
+ the attributes supplied.
+
+ If optionalhook is True a missing matching hook specification will not result
+ in an error (by default it is an error if no matching spec is found).
+
+ If tryfirst is True this hook implementation will run as early as possible
+ in the chain of N hook implementations for a specfication.
+
+ If trylast is True this hook implementation will run as late as possible
+ in the chain of N hook implementations.
+
+ If hookwrapper is True the hook implementations needs to execute exactly
+ one "yield". The code before the yield is run early before any non-hookwrapper
+ function is run. The code after the yield is run after all non-hookwrapper
+ function have run. The yield receives an ``_CallOutcome`` object representing
+ the exception or result outcome of the inner calls (including other hookwrapper
+ calls).
+
+ """
+ def setattr_hookimpl_opts(func):
+ setattr(func, self.project_name + "_impl",
+ dict(hookwrapper=hookwrapper, optionalhook=optionalhook,
+ tryfirst=tryfirst, trylast=trylast))
+ return func
+
+ if function is None:
+ return setattr_hookimpl_opts
+ else:
+ return setattr_hookimpl_opts(function)
+
+
+def normalize_hookimpl_opts(opts):
+ opts.setdefault("tryfirst", False)
+ opts.setdefault("trylast", False)
+ opts.setdefault("hookwrapper", False)
+ opts.setdefault("optionalhook", False)
+
+
+class _TagTracer:
+ def __init__(self):
+ self._tag2proc = {}
+ self.writer = None
+ self.indent = 0
+
+ def get(self, name):
+ return _TagTracerSub(self, (name,))
+
+ def format_message(self, tags, args):
+ if isinstance(args[-1], dict):
+ extra = args[-1]
+ args = args[:-1]
+ else:
+ extra = {}
+
+ content = " ".join(map(str, args))
+ indent = " " * self.indent
+
+ lines = [
+ "%s%s [%s]\n" % (indent, content, ":".join(tags))
+ ]
+
+ for name, value in extra.items():
+ lines.append("%s %s: %s\n" % (indent, name, value))
+ return lines
+
+ def processmessage(self, tags, args):
+ if self.writer is not None and args:
+ lines = self.format_message(tags, args)
+ self.writer(''.join(lines))
+ try:
+ self._tag2proc[tags](tags, args)
+ except KeyError:
+ pass
+
+ def setwriter(self, writer):
+ self.writer = writer
+
+ def setprocessor(self, tags, processor):
+ if isinstance(tags, str):
+ tags = tuple(tags.split(":"))
+ else:
+ assert isinstance(tags, tuple)
+ self._tag2proc[tags] = processor
+
+
+class _TagTracerSub:
+ def __init__(self, root, tags):
+ self.root = root
+ self.tags = tags
+
+ def __call__(self, *args):
+ self.root.processmessage(self.tags, args)
+
+ def setmyprocessor(self, processor):
+ self.root.setprocessor(self.tags, processor)
+
+ def get(self, name):
+ return self.__class__(self.root, self.tags + (name,))
+
+
+def _raise_wrapfail(wrap_controller, msg):
+ co = wrap_controller.gi_code
+ raise RuntimeError("wrap_controller at %r %s:%d %s" %
+ (co.co_name, co.co_filename, co.co_firstlineno, msg))
+
+
+def _wrapped_call(wrap_controller, func):
+ """ Wrap calling to a function with a generator which needs to yield
+ exactly once. The yield point will trigger calling the wrapped function
+ and return its _CallOutcome to the yield point. The generator then needs
+ to finish (raise StopIteration) in order for the wrapped call to complete.
+ """
+ try:
+ next(wrap_controller) # first yield
+ except StopIteration:
+ _raise_wrapfail(wrap_controller, "did not yield")
+ call_outcome = _CallOutcome(func)
+ try:
+ wrap_controller.send(call_outcome)
+ _raise_wrapfail(wrap_controller, "has second yield")
+ except StopIteration:
+ pass
+ return call_outcome.get_result()
+
+
+class _CallOutcome:
+ """ Outcome of a function call, either an exception or a proper result.
+ Calling the ``get_result`` method will return the result or reraise
+ the exception raised when the function was called. """
+ excinfo = None
+
+ def __init__(self, func):
+ try:
+ self.result = func()
+ except BaseException:
+ self.excinfo = sys.exc_info()
+
+ def force_result(self, result):
+ self.result = result
+ self.excinfo = None
+
+ def get_result(self):
+ if self.excinfo is None:
+ return self.result
+ else:
+ ex = self.excinfo
+ if _py3:
+ raise ex[1].with_traceback(ex[2])
+ _reraise(*ex) # noqa
+
+if not _py3:
+ exec("""
+def _reraise(cls, val, tb):
+ raise cls, val, tb
+""")
+
+
+class _TracedHookExecution:
+ def __init__(self, pluginmanager, before, after):
+ self.pluginmanager = pluginmanager
+ self.before = before
+ self.after = after
+ self.oldcall = pluginmanager._inner_hookexec
+ assert not isinstance(self.oldcall, _TracedHookExecution)
+ self.pluginmanager._inner_hookexec = self
+
+ def __call__(self, hook, hook_impls, kwargs):
+ self.before(hook.name, hook_impls, kwargs)
+ outcome = _CallOutcome(lambda: self.oldcall(hook, hook_impls, kwargs))
+ self.after(outcome, hook.name, hook_impls, kwargs)
+ return outcome.get_result()
+
+ def undo(self):
+ self.pluginmanager._inner_hookexec = self.oldcall
+
+
+class PluginManager(object):
+ """ Core Pluginmanager class which manages registration
+ of plugin objects and 1:N hook calling.
+
+ You can register new hooks by calling ``addhooks(module_or_class)``.
+ You can register plugin objects (which contain hooks) by calling
+ ``register(plugin)``. The Pluginmanager is initialized with a
+ prefix that is searched for in the names of the dict of registered
+ plugin objects. An optional excludefunc allows to blacklist names which
+ are not considered as hooks despite a matching prefix.
+
+ For debugging purposes you can call ``enable_tracing()``
+ which will subsequently send debug information to the trace helper.
+ """
+
+ def __init__(self, project_name, implprefix=None):
+ """ if implprefix is given implementation functions
+ will be recognized if their name matches the implprefix. """
+ self.project_name = project_name
+ self._name2plugin = {}
+ self._plugin2hookcallers = {}
+ self._plugin_distinfo = []
+ self.trace = _TagTracer().get("pluginmanage")
+ self.hook = _HookRelay(self.trace.root.get("hook"))
+ self._implprefix = implprefix
+ self._inner_hookexec = lambda hook, methods, kwargs: \
+ _MultiCall(methods, kwargs, hook.spec_opts).execute()
+
+ def _hookexec(self, hook, methods, kwargs):
+ # called from all hookcaller instances.
+ # enable_tracing will set its own wrapping function at self._inner_hookexec
+ return self._inner_hookexec(hook, methods, kwargs)
+
+ def register(self, plugin, name=None):
+ """ Register a plugin and return its canonical name or None if the name
+ is blocked from registering. Raise a ValueError if the plugin is already
+ registered. """
+ plugin_name = name or self.get_canonical_name(plugin)
+
+ if plugin_name in self._name2plugin or plugin in self._plugin2hookcallers:
+ if self._name2plugin.get(plugin_name, -1) is None:
+ return # blocked plugin, return None to indicate no registration
+ raise ValueError("Plugin already registered: %s=%s\n%s" %
+ (plugin_name, plugin, self._name2plugin))
+
+ # XXX if an error happens we should make sure no state has been
+ # changed at point of return
+ self._name2plugin[plugin_name] = plugin
+
+ # register matching hook implementations of the plugin
+ self._plugin2hookcallers[plugin] = hookcallers = []
+ for name in dir(plugin):
+ hookimpl_opts = self.parse_hookimpl_opts(plugin, name)
+ if hookimpl_opts is not None:
+ normalize_hookimpl_opts(hookimpl_opts)
+ method = getattr(plugin, name)
+ hookimpl = HookImpl(plugin, plugin_name, method, hookimpl_opts)
+ hook = getattr(self.hook, name, None)
+ if hook is None:
+ hook = _HookCaller(name, self._hookexec)
+ setattr(self.hook, name, hook)
+ elif hook.has_spec():
+ self._verify_hook(hook, hookimpl)
+ hook._maybe_apply_history(hookimpl)
+ hook._add_hookimpl(hookimpl)
+ hookcallers.append(hook)
+ return plugin_name
+
+ def parse_hookimpl_opts(self, plugin, name):
+ method = getattr(plugin, name)
+ res = getattr(method, self.project_name + "_impl", None)
+ if res is not None and not isinstance(res, dict):
+ # false positive
+ res = None
+ elif res is None and self._implprefix and name.startswith(self._implprefix):
+ res = {}
+ return res
+
+ def unregister(self, plugin=None, name=None):
+ """ unregister a plugin object and all its contained hook implementations
+ from internal data structures. """
+ if name is None:
+ assert plugin is not None, "one of name or plugin needs to be specified"
+ name = self.get_name(plugin)
+
+ if plugin is None:
+ plugin = self.get_plugin(name)
+
+ # if self._name2plugin[name] == None registration was blocked: ignore
+ if self._name2plugin.get(name):
+ del self._name2plugin[name]
+
+ for hookcaller in self._plugin2hookcallers.pop(plugin, []):
+ hookcaller._remove_plugin(plugin)
+
+ return plugin
+
+ def set_blocked(self, name):
+ """ block registrations of the given name, unregister if already registered. """
+ self.unregister(name=name)
+ self._name2plugin[name] = None
+
+ def is_blocked(self, name):
+ """ return True if the name blogs registering plugins of that name. """
+ return name in self._name2plugin and self._name2plugin[name] is None
+
+ def add_hookspecs(self, module_or_class):
+ """ add new hook specifications defined in the given module_or_class.
+ Functions are recognized if they have been decorated accordingly. """
+ names = []
+ for name in dir(module_or_class):
+ spec_opts = self.parse_hookspec_opts(module_or_class, name)
+ if spec_opts is not None:
+ hc = getattr(self.hook, name, None)
+ if hc is None:
+ hc = _HookCaller(name, self._hookexec, module_or_class, spec_opts)
+ setattr(self.hook, name, hc)
+ else:
+ # plugins registered this hook without knowing the spec
+ hc.set_specification(module_or_class, spec_opts)
+ for hookfunction in (hc._wrappers + hc._nonwrappers):
+ self._verify_hook(hc, hookfunction)
+ names.append(name)
+
+ if not names:
+ raise ValueError("did not find any %r hooks in %r" %
+ (self.project_name, module_or_class))
+
+ def parse_hookspec_opts(self, module_or_class, name):
+ method = getattr(module_or_class, name)
+ return getattr(method, self.project_name + "_spec", None)
+
+ def get_plugins(self):
+ """ return the set of registered plugins. """
+ return set(self._plugin2hookcallers)
+
+ def is_registered(self, plugin):
+ """ Return True if the plugin is already registered. """
+ return plugin in self._plugin2hookcallers
+
+ def get_canonical_name(self, plugin):
+ """ Return canonical name for a plugin object. Note that a plugin
+ may be registered under a different name which was specified
+ by the caller of register(plugin, name). To obtain the name
+ of an registered plugin use ``get_name(plugin)`` instead."""
+ return getattr(plugin, "__name__", None) or str(id(plugin))
+
+ def get_plugin(self, name):
+ """ Return a plugin or None for the given name. """
+ return self._name2plugin.get(name)
+
+ def get_name(self, plugin):
+ """ Return name for registered plugin or None if not registered. """
+ for name, val in self._name2plugin.items():
+ if plugin == val:
+ return name
+
+ def _verify_hook(self, hook, hookimpl):
+ if hook.is_historic() and hookimpl.hookwrapper:
+ raise PluginValidationError(
+ "Plugin %r\nhook %r\nhistoric incompatible to hookwrapper" %
+ (hookimpl.plugin_name, hook.name))
+
+ for arg in hookimpl.argnames:
+ if arg not in hook.argnames:
+ raise PluginValidationError(
+ "Plugin %r\nhook %r\nargument %r not available\n"
+ "plugin definition: %s\n"
+ "available hookargs: %s" %
+ (hookimpl.plugin_name, hook.name, arg,
+ _formatdef(hookimpl.function), ", ".join(hook.argnames)))
+
+ def check_pending(self):
+ """ Verify that all hooks which have not been verified against
+ a hook specification are optional, otherwise raise PluginValidationError"""
+ for name in self.hook.__dict__:
+ if name[0] != "_":
+ hook = getattr(self.hook, name)
+ if not hook.has_spec():
+ for hookimpl in (hook._wrappers + hook._nonwrappers):
+ if not hookimpl.optionalhook:
+ raise PluginValidationError(
+ "unknown hook %r in plugin %r" %
+ (name, hookimpl.plugin))
+
+ def load_setuptools_entrypoints(self, entrypoint_name):
+ """ Load modules from querying the specified setuptools entrypoint name.
+ Return the number of loaded plugins. """
+ from pkg_resources import iter_entry_points, DistributionNotFound
+ for ep in iter_entry_points(entrypoint_name):
+ # is the plugin registered or blocked?
+ if self.get_plugin(ep.name) or self.is_blocked(ep.name):
+ continue
+ try:
+ plugin = ep.load()
+ except DistributionNotFound:
+ continue
+ self.register(plugin, name=ep.name)
+ self._plugin_distinfo.append((plugin, ep.dist))
+ return len(self._plugin_distinfo)
+
+ def list_plugin_distinfo(self):
+ """ return list of distinfo/plugin tuples for all setuptools registered
+ plugins. """
+ return list(self._plugin_distinfo)
+
+ def list_name_plugin(self):
+ """ return list of name/plugin pairs. """
+ return list(self._name2plugin.items())
+
+ def get_hookcallers(self, plugin):
+ """ get all hook callers for the specified plugin. """
+ return self._plugin2hookcallers.get(plugin)
+
+ def add_hookcall_monitoring(self, before, after):
+ """ add before/after tracing functions for all hooks
+ and return an undo function which, when called,
+ will remove the added tracers.
+
+ ``before(hook_name, hook_impls, kwargs)`` will be called ahead
+ of all hook calls and receive a hookcaller instance, a list
+ of HookImpl instances and the keyword arguments for the hook call.
+
+ ``after(outcome, hook_name, hook_impls, kwargs)`` receives the
+ same arguments as ``before`` but also a :py:class:`_CallOutcome`` object
+ which represents the result of the overall hook call.
+ """
+ return _TracedHookExecution(self, before, after).undo
+
+ def enable_tracing(self):
+ """ enable tracing of hook calls and return an undo function. """
+ hooktrace = self.hook._trace
+
+ def before(hook_name, methods, kwargs):
+ hooktrace.root.indent += 1
+ hooktrace(hook_name, kwargs)
+
+ def after(outcome, hook_name, methods, kwargs):
+ if outcome.excinfo is None:
+ hooktrace("finish", hook_name, "-->", outcome.result)
+ hooktrace.root.indent -= 1
+
+ return self.add_hookcall_monitoring(before, after)
+
+ def subset_hook_caller(self, name, remove_plugins):
+ """ Return a new _HookCaller instance for the named method
+ which manages calls to all registered plugins except the
+ ones from remove_plugins. """
+ orig = getattr(self.hook, name)
+ plugins_to_remove = [plug for plug in remove_plugins if hasattr(plug, name)]
+ if plugins_to_remove:
+ hc = _HookCaller(orig.name, orig._hookexec, orig._specmodule_or_class,
+ orig.spec_opts)
+ for hookimpl in (orig._wrappers + orig._nonwrappers):
+ plugin = hookimpl.plugin
+ if plugin not in plugins_to_remove:
+ hc._add_hookimpl(hookimpl)
+ # we also keep track of this hook caller so it
+ # gets properly removed on plugin unregistration
+ self._plugin2hookcallers.setdefault(plugin, []).append(hc)
+ return hc
+ return orig
+
+
+class _MultiCall:
+ """ execute a call into multiple python functions/methods. """
+
+ # XXX note that the __multicall__ argument is supported only
+ # for pytest compatibility reasons. It was never officially
+ # supported there and is explicitly deprecated since 2.8
+ # so we can remove it soon, allowing to avoid the below recursion
+ # in execute() and simplify/speed up the execute loop.
+
+ def __init__(self, hook_impls, kwargs, specopts={}):
+ self.hook_impls = hook_impls
+ self.kwargs = kwargs
+ self.kwargs["__multicall__"] = self
+ self.specopts = specopts
+
+ def execute(self):
+ all_kwargs = self.kwargs
+ self.results = results = []
+ firstresult = self.specopts.get("firstresult")
+
+ while self.hook_impls:
+ hook_impl = self.hook_impls.pop()
+ args = [all_kwargs[argname] for argname in hook_impl.argnames]
+ if hook_impl.hookwrapper:
+ return _wrapped_call(hook_impl.function(*args), self.execute)
+ res = hook_impl.function(*args)
+ if res is not None:
+ if firstresult:
+ return res
+ results.append(res)
+
+ if not firstresult:
+ return results
+
+ def __repr__(self):
+ status = "%d meths" % (len(self.hook_impls),)
+ if hasattr(self, "results"):
+ status = ("%d results, " % len(self.results)) + status
+ return "<_MultiCall %s, kwargs=%r>" % (status, self.kwargs)
+
+
+def varnames(func, startindex=None):
+ """ return argument name tuple for a function, method, class or callable.
+
+ In case of a class, its "__init__" method is considered.
+ For methods the "self" parameter is not included unless you are passing
+ an unbound method with Python3 (which has no supports for unbound methods)
+ """
+ cache = getattr(func, "__dict__", {})
+ try:
+ return cache["_varnames"]
+ except KeyError:
+ pass
+ if inspect.isclass(func):
+ try:
+ func = func.__init__
+ except AttributeError:
+ return ()
+ startindex = 1
+ else:
+ if not inspect.isfunction(func) and not inspect.ismethod(func):
+ func = getattr(func, '__call__', func)
+ if startindex is None:
+ startindex = int(inspect.ismethod(func))
+
+ try:
+ rawcode = func.__code__
+ except AttributeError:
+ return ()
+ try:
+ x = rawcode.co_varnames[startindex:rawcode.co_argcount]
+ except AttributeError:
+ x = ()
+ else:
+ defaults = func.__defaults__
+ if defaults:
+ x = x[:-len(defaults)]
+ try:
+ cache["_varnames"] = x
+ except TypeError:
+ pass
+ return x
+
+
+class _HookRelay:
+ """ hook holder object for performing 1:N hook calls where N is the number
+ of registered plugins.
+
+ """
+
+ def __init__(self, trace):
+ self._trace = trace
+
+
+class _HookCaller(object):
+ def __init__(self, name, hook_execute, specmodule_or_class=None, spec_opts=None):
+ self.name = name
+ self._wrappers = []
+ self._nonwrappers = []
+ self._hookexec = hook_execute
+ if specmodule_or_class is not None:
+ assert spec_opts is not None
+ self.set_specification(specmodule_or_class, spec_opts)
+
+ def has_spec(self):
+ return hasattr(self, "_specmodule_or_class")
+
+ def set_specification(self, specmodule_or_class, spec_opts):
+ assert not self.has_spec()
+ self._specmodule_or_class = specmodule_or_class
+ specfunc = getattr(specmodule_or_class, self.name)
+ argnames = varnames(specfunc, startindex=inspect.isclass(specmodule_or_class))
+ assert "self" not in argnames # sanity check
+ self.argnames = ["__multicall__"] + list(argnames)
+ self.spec_opts = spec_opts
+ if spec_opts.get("historic"):
+ self._call_history = []
+
+ def is_historic(self):
+ return hasattr(self, "_call_history")
+
+ def _remove_plugin(self, plugin):
+ def remove(wrappers):
+ for i, method in enumerate(wrappers):
+ if method.plugin == plugin:
+ del wrappers[i]
+ return True
+ if remove(self._wrappers) is None:
+ if remove(self._nonwrappers) is None:
+ raise ValueError("plugin %r not found" % (plugin,))
+
+ def _add_hookimpl(self, hookimpl):
+ if hookimpl.hookwrapper:
+ methods = self._wrappers
+ else:
+ methods = self._nonwrappers
+
+ if hookimpl.trylast:
+ methods.insert(0, hookimpl)
+ elif hookimpl.tryfirst:
+ methods.append(hookimpl)
+ else:
+ # find last non-tryfirst method
+ i = len(methods) - 1
+ while i >= 0 and methods[i].tryfirst:
+ i -= 1
+ methods.insert(i + 1, hookimpl)
+
+ def __repr__(self):
+ return "<_HookCaller %r>" % (self.name,)
+
+ def __call__(self, **kwargs):
+ assert not self.is_historic()
+ return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
+
+ def call_historic(self, proc=None, kwargs=None):
+ self._call_history.append((kwargs or {}, proc))
+ # historizing hooks don't return results
+ self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
+
+ def call_extra(self, methods, kwargs):
+ """ Call the hook with some additional temporarily participating
+ methods using the specified kwargs as call parameters. """
+ old = list(self._nonwrappers), list(self._wrappers)
+ for method in methods:
+ opts = dict(hookwrapper=False, trylast=False, tryfirst=False)
+ hookimpl = HookImpl(None, "<temp>", method, opts)
+ self._add_hookimpl(hookimpl)
+ try:
+ return self(**kwargs)
+ finally:
+ self._nonwrappers, self._wrappers = old
+
+ def _maybe_apply_history(self, method):
+ if self.is_historic():
+ for kwargs, proc in self._call_history:
+ res = self._hookexec(self, [method], kwargs)
+ if res and proc is not None:
+ proc(res[0])
+
+
+class HookImpl:
+ def __init__(self, plugin, plugin_name, function, hook_impl_opts):
+ self.function = function
+ self.argnames = varnames(self.function)
+ self.plugin = plugin
+ self.opts = hook_impl_opts
+ self.plugin_name = plugin_name
+ self.__dict__.update(hook_impl_opts)
+
+
+class PluginValidationError(Exception):
+ """ plugin failed validation. """
+
+
+if hasattr(inspect, 'signature'):
+ def _formatdef(func):
+ return "%s%s" % (
+ func.__name__,
+ str(inspect.signature(func))
+ )
+else:
+ def _formatdef(func):
+ return "%s%s" % (
+ func.__name__,
+ inspect.formatargspec(*inspect.getargspec(func))
+ )