diff options
author | Matt A. Tobin <mattatobin@localhost.localdomain> | 2018-02-02 04:16:08 -0500 |
---|---|---|
committer | Matt A. Tobin <mattatobin@localhost.localdomain> | 2018-02-02 04:16:08 -0500 |
commit | 5f8de423f190bbb79a62f804151bc24824fa32d8 (patch) | |
tree | 10027f336435511475e392454359edea8e25895d /python/py | |
parent | 49ee0794b5d912db1f95dce6eb52d781dc210db5 (diff) | |
download | UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar.gz UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar.lz UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.tar.xz UXP-5f8de423f190bbb79a62f804151bc24824fa32d8.zip |
Add m-esr52 at 52.6.0
Diffstat (limited to 'python/py')
40 files changed, 7851 insertions, 0 deletions
diff --git a/python/py/AUTHORS b/python/py/AUTHORS new file mode 100644 index 000000000..8c0cf9b71 --- /dev/null +++ b/python/py/AUTHORS @@ -0,0 +1,24 @@ +Holger Krekel, holger at merlinux eu +Benjamin Peterson, benjamin at python org +Ronny Pfannschmidt, Ronny.Pfannschmidt at gmx de +Guido Wesdorp, johnny at johnnydebris net +Samuele Pedroni, pedronis at openend se +Carl Friedrich Bolz, cfbolz at gmx de +Armin Rigo, arigo at tunes org +Maciek Fijalkowski, fijal at genesilico pl +Brian Dorsey, briandorsey at gmail com +Floris Bruynooghe, flub at devork be +merlinux GmbH, Germany, office at merlinux eu + +Contributors include:: + +Ross Lawley +Ralf Schmitt +Chris Lamb +Harald Armin Massa +Martijn Faassen +Ian Bicking +Jan Balster +Grig Gheorghiu +Bob Ippolito +Christian Tismer diff --git a/python/py/LICENSE b/python/py/LICENSE new file mode 100644 index 000000000..31ecdfb1d --- /dev/null +++ b/python/py/LICENSE @@ -0,0 +1,19 @@ + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. + diff --git a/python/py/MANIFEST.in b/python/py/MANIFEST.in new file mode 100644 index 000000000..31fb010b4 --- /dev/null +++ b/python/py/MANIFEST.in @@ -0,0 +1,9 @@ +include CHANGELOG +include AUTHORS +include README.txt +include setup.py +include LICENSE +include conftest.py +include tox.ini +graft doc +graft testing diff --git a/python/py/PKG-INFO b/python/py/PKG-INFO new file mode 100644 index 000000000..30b14ae88 --- /dev/null +++ b/python/py/PKG-INFO @@ -0,0 +1,46 @@ +Metadata-Version: 1.1 +Name: py +Version: 1.4.31 +Summary: library with cross-python path, ini-parsing, io, code, log facilities +Home-page: http://pylib.readthedocs.org/ +Author: holger krekel, Ronny Pfannschmidt, Benjamin Peterson and others +Author-email: pytest-dev@python.org +License: MIT license +Description: .. image:: https://drone.io/bitbucket.org/pytest-dev/py/status.png
+ :target: https://drone.io/bitbucket.org/pytest-dev/py/latest
+ .. image:: https://pypip.in/v/py/badge.png
+ :target: https://pypi.python.org/pypi/py
+
+ The py lib is a Python development support library featuring
+ the following tools and modules:
+
+ * py.path: uniform local and svn path objects
+ * py.apipkg: explicit API control and lazy-importing
+ * py.iniconfig: easy parsing of .ini files
+ * py.code: dynamic code generation and introspection
+
+ NOTE: prior to the 1.4 release this distribution used to
+ contain py.test which is now its own package, see http://pytest.org
+
+ For questions and more information please visit http://pylib.readthedocs.org
+
+ Bugs and issues: http://bitbucket.org/pytest-dev/py/issues/
+
+ Authors: Holger Krekel and others, 2004-2015
+ +Platform: unix +Platform: linux +Platform: osx +Platform: cygwin +Platform: win32 +Classifier: Development Status :: 6 - Mature +Classifier: Intended Audience :: Developers +Classifier: License :: OSI Approved :: MIT License +Classifier: Operating System :: POSIX +Classifier: Operating System :: Microsoft :: Windows +Classifier: Operating System :: MacOS :: MacOS X +Classifier: Topic :: Software Development :: Testing +Classifier: Topic :: Software Development :: Libraries +Classifier: Topic :: Utilities +Classifier: Programming Language :: Python +Classifier: Programming Language :: Python :: 3 diff --git a/python/py/README.txt b/python/py/README.txt new file mode 100644 index 000000000..e327e9373 --- /dev/null +++ b/python/py/README.txt @@ -0,0 +1,21 @@ +.. image:: https://drone.io/bitbucket.org/pytest-dev/py/status.png
+ :target: https://drone.io/bitbucket.org/pytest-dev/py/latest
+.. image:: https://pypip.in/v/py/badge.png
+ :target: https://pypi.python.org/pypi/py
+
+The py lib is a Python development support library featuring
+the following tools and modules:
+
+* py.path: uniform local and svn path objects
+* py.apipkg: explicit API control and lazy-importing
+* py.iniconfig: easy parsing of .ini files
+* py.code: dynamic code generation and introspection
+
+NOTE: prior to the 1.4 release this distribution used to
+contain py.test which is now its own package, see http://pytest.org
+
+For questions and more information please visit http://pylib.readthedocs.org
+
+Bugs and issues: http://bitbucket.org/pytest-dev/py/issues/
+
+Authors: Holger Krekel and others, 2004-2015
diff --git a/python/py/py/__init__.py b/python/py/py/__init__.py new file mode 100644 index 000000000..bdb9aa218 --- /dev/null +++ b/python/py/py/__init__.py @@ -0,0 +1,150 @@ +""" +py.test and pylib: rapid testing and development utils + +this module uses apipkg.py for lazy-loading sub modules +and classes. The initpkg-dictionary below specifies +name->value mappings where value can be another namespace +dictionary or an import path. + +(c) Holger Krekel and others, 2004-2014 +""" +__version__ = '1.4.31' + +from py import _apipkg + +# so that py.error.* instances are picklable +import sys +sys.modules['py.error'] = _apipkg.AliasModule("py.error", "py._error", 'error') + +_apipkg.initpkg(__name__, attr={'_apipkg': _apipkg}, exportdefs={ + # access to all standard lib modules + 'std': '._std:std', + # access to all posix errno's as classes + 'error': '._error:error', + + '_pydir' : '.__metainfo:pydir', + 'version': 'py:__version__', # backward compatibility + + # pytest-2.0 has a flat namespace, we use alias modules + # to keep old references compatible + 'test' : 'pytest', + 'test.collect' : 'pytest', + 'test.cmdline' : 'pytest', + + # hook into the top-level standard library + 'process' : { + '__doc__' : '._process:__doc__', + 'cmdexec' : '._process.cmdexec:cmdexec', + 'kill' : '._process.killproc:kill', + 'ForkedFunc' : '._process.forkedfunc:ForkedFunc', + }, + + 'apipkg' : { + 'initpkg' : '._apipkg:initpkg', + 'ApiModule' : '._apipkg:ApiModule', + }, + + 'iniconfig' : { + 'IniConfig' : '._iniconfig:IniConfig', + 'ParseError' : '._iniconfig:ParseError', + }, + + 'path' : { + '__doc__' : '._path:__doc__', + 'svnwc' : '._path.svnwc:SvnWCCommandPath', + 'svnurl' : '._path.svnurl:SvnCommandPath', + 'local' : '._path.local:LocalPath', + 'SvnAuth' : '._path.svnwc:SvnAuth', + }, + + # python inspection/code-generation API + 'code' : { + '__doc__' : '._code:__doc__', + 'compile' : '._code.source:compile_', + 'Source' : '._code.source:Source', + 'Code' : '._code.code:Code', + 'Frame' : '._code.code:Frame', + 'ExceptionInfo' : '._code.code:ExceptionInfo', + 'Traceback' : '._code.code:Traceback', + 'getfslineno' : '._code.source:getfslineno', + 'getrawcode' : '._code.code:getrawcode', + 'patch_builtins' : '._code.code:patch_builtins', + 'unpatch_builtins' : '._code.code:unpatch_builtins', + '_AssertionError' : '._code.assertion:AssertionError', + '_reinterpret_old' : '._code.assertion:reinterpret_old', + '_reinterpret' : '._code.assertion:reinterpret', + '_reprcompare' : '._code.assertion:_reprcompare', + '_format_explanation' : '._code.assertion:_format_explanation', + }, + + # backports and additions of builtins + 'builtin' : { + '__doc__' : '._builtin:__doc__', + 'enumerate' : '._builtin:enumerate', + 'reversed' : '._builtin:reversed', + 'sorted' : '._builtin:sorted', + 'any' : '._builtin:any', + 'all' : '._builtin:all', + 'set' : '._builtin:set', + 'frozenset' : '._builtin:frozenset', + 'BaseException' : '._builtin:BaseException', + 'GeneratorExit' : '._builtin:GeneratorExit', + '_sysex' : '._builtin:_sysex', + 'print_' : '._builtin:print_', + '_reraise' : '._builtin:_reraise', + '_tryimport' : '._builtin:_tryimport', + 'exec_' : '._builtin:exec_', + '_basestring' : '._builtin:_basestring', + '_totext' : '._builtin:_totext', + '_isbytes' : '._builtin:_isbytes', + '_istext' : '._builtin:_istext', + '_getimself' : '._builtin:_getimself', + '_getfuncdict' : '._builtin:_getfuncdict', + '_getcode' : '._builtin:_getcode', + 'builtins' : '._builtin:builtins', + 'execfile' : '._builtin:execfile', + 'callable' : '._builtin:callable', + 'bytes' : '._builtin:bytes', + 'text' : '._builtin:text', + }, + + # input-output helping + 'io' : { + '__doc__' : '._io:__doc__', + 'dupfile' : '._io.capture:dupfile', + 'TextIO' : '._io.capture:TextIO', + 'BytesIO' : '._io.capture:BytesIO', + 'FDCapture' : '._io.capture:FDCapture', + 'StdCapture' : '._io.capture:StdCapture', + 'StdCaptureFD' : '._io.capture:StdCaptureFD', + 'TerminalWriter' : '._io.terminalwriter:TerminalWriter', + 'ansi_print' : '._io.terminalwriter:ansi_print', + 'get_terminal_width' : '._io.terminalwriter:get_terminal_width', + 'saferepr' : '._io.saferepr:saferepr', + }, + + # small and mean xml/html generation + 'xml' : { + '__doc__' : '._xmlgen:__doc__', + 'html' : '._xmlgen:html', + 'Tag' : '._xmlgen:Tag', + 'raw' : '._xmlgen:raw', + 'Namespace' : '._xmlgen:Namespace', + 'escape' : '._xmlgen:escape', + }, + + 'log' : { + # logging API ('producers' and 'consumers' connected via keywords) + '__doc__' : '._log:__doc__', + '_apiwarn' : '._log.warning:_apiwarn', + 'Producer' : '._log.log:Producer', + 'setconsumer' : '._log.log:setconsumer', + '_setstate' : '._log.log:setstate', + '_getstate' : '._log.log:getstate', + 'Path' : '._log.log:Path', + 'STDOUT' : '._log.log:STDOUT', + 'STDERR' : '._log.log:STDERR', + 'Syslog' : '._log.log:Syslog', + }, + +}) diff --git a/python/py/py/__metainfo.py b/python/py/py/__metainfo.py new file mode 100644 index 000000000..12581eb7a --- /dev/null +++ b/python/py/py/__metainfo.py @@ -0,0 +1,2 @@ +import py +pydir = py.path.local(py.__file__).dirpath() diff --git a/python/py/py/_apipkg.py b/python/py/py/_apipkg.py new file mode 100644 index 000000000..a73b8f6d0 --- /dev/null +++ b/python/py/py/_apipkg.py @@ -0,0 +1,181 @@ +""" +apipkg: control the exported namespace of a python package. + +see http://pypi.python.org/pypi/apipkg + +(c) holger krekel, 2009 - MIT license +""" +import os +import sys +from types import ModuleType + +__version__ = '1.3.dev' + +def _py_abspath(path): + """ + special version of abspath + that will leave paths from jython jars alone + """ + if path.startswith('__pyclasspath__'): + + return path + else: + return os.path.abspath(path) + +def initpkg(pkgname, exportdefs, attr=dict()): + """ initialize given package from the export definitions. """ + oldmod = sys.modules.get(pkgname) + d = {} + f = getattr(oldmod, '__file__', None) + if f: + f = _py_abspath(f) + d['__file__'] = f + if hasattr(oldmod, '__version__'): + d['__version__'] = oldmod.__version__ + if hasattr(oldmod, '__loader__'): + d['__loader__'] = oldmod.__loader__ + if hasattr(oldmod, '__path__'): + d['__path__'] = [_py_abspath(p) for p in oldmod.__path__] + if '__doc__' not in exportdefs and getattr(oldmod, '__doc__', None): + d['__doc__'] = oldmod.__doc__ + d.update(attr) + if hasattr(oldmod, "__dict__"): + oldmod.__dict__.update(d) + mod = ApiModule(pkgname, exportdefs, implprefix=pkgname, attr=d) + sys.modules[pkgname] = mod + +def importobj(modpath, attrname): + module = __import__(modpath, None, None, ['__doc__']) + if not attrname: + return module + + retval = module + names = attrname.split(".") + for x in names: + retval = getattr(retval, x) + return retval + +class ApiModule(ModuleType): + def __docget(self): + try: + return self.__doc + except AttributeError: + if '__doc__' in self.__map__: + return self.__makeattr('__doc__') + def __docset(self, value): + self.__doc = value + __doc__ = property(__docget, __docset) + + def __init__(self, name, importspec, implprefix=None, attr=None): + self.__name__ = name + self.__all__ = [x for x in importspec if x != '__onfirstaccess__'] + self.__map__ = {} + self.__implprefix__ = implprefix or name + if attr: + for name, val in attr.items(): + # print "setting", self.__name__, name, val + setattr(self, name, val) + for name, importspec in importspec.items(): + if isinstance(importspec, dict): + subname = '%s.%s' % (self.__name__, name) + apimod = ApiModule(subname, importspec, implprefix) + sys.modules[subname] = apimod + setattr(self, name, apimod) + else: + parts = importspec.split(':') + modpath = parts.pop(0) + attrname = parts and parts[0] or "" + if modpath[0] == '.': + modpath = implprefix + modpath + + if not attrname: + subname = '%s.%s' % (self.__name__, name) + apimod = AliasModule(subname, modpath) + sys.modules[subname] = apimod + if '.' not in name: + setattr(self, name, apimod) + else: + self.__map__[name] = (modpath, attrname) + + def __repr__(self): + l = [] + if hasattr(self, '__version__'): + l.append("version=" + repr(self.__version__)) + if hasattr(self, '__file__'): + l.append('from ' + repr(self.__file__)) + if l: + return '<ApiModule %r %s>' % (self.__name__, " ".join(l)) + return '<ApiModule %r>' % (self.__name__,) + + def __makeattr(self, name): + """lazily compute value for name or raise AttributeError if unknown.""" + # print "makeattr", self.__name__, name + target = None + if '__onfirstaccess__' in self.__map__: + target = self.__map__.pop('__onfirstaccess__') + importobj(*target)() + try: + modpath, attrname = self.__map__[name] + except KeyError: + if target is not None and name != '__onfirstaccess__': + # retry, onfirstaccess might have set attrs + return getattr(self, name) + raise AttributeError(name) + else: + result = importobj(modpath, attrname) + setattr(self, name, result) + try: + del self.__map__[name] + except KeyError: + pass # in a recursive-import situation a double-del can happen + return result + + __getattr__ = __makeattr + + def __dict__(self): + # force all the content of the module to be loaded when __dict__ is read + dictdescr = ModuleType.__dict__['__dict__'] + dict = dictdescr.__get__(self) + if dict is not None: + hasattr(self, 'some') + for name in self.__all__: + try: + self.__makeattr(name) + except AttributeError: + pass + return dict + __dict__ = property(__dict__) + + +def AliasModule(modname, modpath, attrname=None): + mod = [] + + def getmod(): + if not mod: + x = importobj(modpath, None) + if attrname is not None: + x = getattr(x, attrname) + mod.append(x) + return mod[0] + + class AliasModule(ModuleType): + + def __repr__(self): + x = modpath + if attrname: + x += "." + attrname + return '<AliasModule %r for %r>' % (modname, x) + + def __getattribute__(self, name): + try: + return getattr(getmod(), name) + except ImportError: + return None + + def __setattr__(self, name, value): + setattr(getmod(), name, value) + + def __delattr__(self, name): + delattr(getmod(), name) + + return AliasModule(str(modname)) diff --git a/python/py/py/_builtin.py b/python/py/py/_builtin.py new file mode 100644 index 000000000..52ee9d79c --- /dev/null +++ b/python/py/py/_builtin.py @@ -0,0 +1,248 @@ +import sys + +try: + reversed = reversed +except NameError: + def reversed(sequence): + """reversed(sequence) -> reverse iterator over values of the sequence + + Return a reverse iterator + """ + if hasattr(sequence, '__reversed__'): + return sequence.__reversed__() + if not hasattr(sequence, '__getitem__'): + raise TypeError("argument to reversed() must be a sequence") + return reversed_iterator(sequence) + + class reversed_iterator(object): + + def __init__(self, seq): + self.seq = seq + self.remaining = len(seq) + + def __iter__(self): + return self + + def next(self): + i = self.remaining + if i > 0: + i -= 1 + item = self.seq[i] + self.remaining = i + return item + raise StopIteration + + def __length_hint__(self): + return self.remaining + +try: + any = any +except NameError: + def any(iterable): + for x in iterable: + if x: + return True + return False + +try: + all = all +except NameError: + def all(iterable): + for x in iterable: + if not x: + return False + return True + +try: + sorted = sorted +except NameError: + builtin_cmp = cmp # need to use cmp as keyword arg + + def sorted(iterable, cmp=None, key=None, reverse=0): + use_cmp = None + if key is not None: + if cmp is None: + def use_cmp(x, y): + return builtin_cmp(x[0], y[0]) + else: + def use_cmp(x, y): + return cmp(x[0], y[0]) + l = [(key(element), element) for element in iterable] + else: + if cmp is not None: + use_cmp = cmp + l = list(iterable) + if use_cmp is not None: + l.sort(use_cmp) + else: + l.sort() + if reverse: + l.reverse() + if key is not None: + return [element for (_, element) in l] + return l + +try: + set, frozenset = set, frozenset +except NameError: + from sets import set, frozenset + +# pass through +enumerate = enumerate + +try: + BaseException = BaseException +except NameError: + BaseException = Exception + +try: + GeneratorExit = GeneratorExit +except NameError: + class GeneratorExit(Exception): + """ This exception is never raised, it is there to make it possible to + write code compatible with CPython 2.5 even in lower CPython + versions.""" + pass + GeneratorExit.__module__ = 'exceptions' + +_sysex = (KeyboardInterrupt, SystemExit, MemoryError, GeneratorExit) + +try: + callable = callable +except NameError: + def callable(obj): + return hasattr(obj, "__call__") + +if sys.version_info >= (3, 0): + exec ("print_ = print ; exec_=exec") + import builtins + + # some backward compatibility helpers + _basestring = str + def _totext(obj, encoding=None, errors=None): + if isinstance(obj, bytes): + if errors is None: + obj = obj.decode(encoding) + else: + obj = obj.decode(encoding, errors) + elif not isinstance(obj, str): + obj = str(obj) + return obj + + def _isbytes(x): + return isinstance(x, bytes) + def _istext(x): + return isinstance(x, str) + + text = str + bytes = bytes + + + def _getimself(function): + return getattr(function, '__self__', None) + + def _getfuncdict(function): + return getattr(function, "__dict__", None) + + def _getcode(function): + return getattr(function, "__code__", None) + + def execfile(fn, globs=None, locs=None): + if globs is None: + back = sys._getframe(1) + globs = back.f_globals + locs = back.f_locals + del back + elif locs is None: + locs = globs + fp = open(fn, "r") + try: + source = fp.read() + finally: + fp.close() + co = compile(source, fn, "exec", dont_inherit=True) + exec_(co, globs, locs) + +else: + import __builtin__ as builtins + _totext = unicode + _basestring = basestring + text = unicode + bytes = str + execfile = execfile + callable = callable + def _isbytes(x): + return isinstance(x, str) + def _istext(x): + return isinstance(x, unicode) + + def _getimself(function): + return getattr(function, 'im_self', None) + + def _getfuncdict(function): + return getattr(function, "__dict__", None) + + def _getcode(function): + try: + return getattr(function, "__code__") + except AttributeError: + return getattr(function, "func_code", None) + + def print_(*args, **kwargs): + """ minimal backport of py3k print statement. """ + sep = ' ' + if 'sep' in kwargs: + sep = kwargs.pop('sep') + end = '\n' + if 'end' in kwargs: + end = kwargs.pop('end') + file = 'file' in kwargs and kwargs.pop('file') or sys.stdout + if kwargs: + args = ", ".join([str(x) for x in kwargs]) + raise TypeError("invalid keyword arguments: %s" % args) + at_start = True + for x in args: + if not at_start: + file.write(sep) + file.write(str(x)) + at_start = False + file.write(end) + + def exec_(obj, globals=None, locals=None): + """ minimal backport of py3k exec statement. """ + __tracebackhide__ = True + if globals is None: + frame = sys._getframe(1) + globals = frame.f_globals + if locals is None: + locals = frame.f_locals + elif locals is None: + locals = globals + exec2(obj, globals, locals) + +if sys.version_info >= (3, 0): + def _reraise(cls, val, tb): + __tracebackhide__ = True + assert hasattr(val, '__traceback__') + raise cls.with_traceback(val, tb) +else: + exec (""" +def _reraise(cls, val, tb): + __tracebackhide__ = True + raise cls, val, tb +def exec2(obj, globals, locals): + __tracebackhide__ = True + exec obj in globals, locals +""") + +def _tryimport(*names): + """ return the first successfully imported module. """ + assert names + for name in names: + try: + __import__(name) + except ImportError: + excinfo = sys.exc_info() + else: + return sys.modules[name] + _reraise(*excinfo) diff --git a/python/py/py/_code/__init__.py b/python/py/py/_code/__init__.py new file mode 100644 index 000000000..f15acf851 --- /dev/null +++ b/python/py/py/_code/__init__.py @@ -0,0 +1 @@ +""" python inspection/code generation API """ diff --git a/python/py/py/_code/_assertionnew.py b/python/py/py/_code/_assertionnew.py new file mode 100644 index 000000000..afb1b31ff --- /dev/null +++ b/python/py/py/_code/_assertionnew.py @@ -0,0 +1,339 @@ +""" +Find intermediate evalutation results in assert statements through builtin AST. +This should replace _assertionold.py eventually. +""" + +import sys +import ast + +import py +from py._code.assertion import _format_explanation, BuiltinAssertionError + + +if sys.platform.startswith("java") and sys.version_info < (2, 5, 2): + # See http://bugs.jython.org/issue1497 + _exprs = ("BoolOp", "BinOp", "UnaryOp", "Lambda", "IfExp", "Dict", + "ListComp", "GeneratorExp", "Yield", "Compare", "Call", + "Repr", "Num", "Str", "Attribute", "Subscript", "Name", + "List", "Tuple") + _stmts = ("FunctionDef", "ClassDef", "Return", "Delete", "Assign", + "AugAssign", "Print", "For", "While", "If", "With", "Raise", + "TryExcept", "TryFinally", "Assert", "Import", "ImportFrom", + "Exec", "Global", "Expr", "Pass", "Break", "Continue") + _expr_nodes = set(getattr(ast, name) for name in _exprs) + _stmt_nodes = set(getattr(ast, name) for name in _stmts) + def _is_ast_expr(node): + return node.__class__ in _expr_nodes + def _is_ast_stmt(node): + return node.__class__ in _stmt_nodes +else: + def _is_ast_expr(node): + return isinstance(node, ast.expr) + def _is_ast_stmt(node): + return isinstance(node, ast.stmt) + + +class Failure(Exception): + """Error found while interpreting AST.""" + + def __init__(self, explanation=""): + self.cause = sys.exc_info() + self.explanation = explanation + + +def interpret(source, frame, should_fail=False): + mod = ast.parse(source) + visitor = DebugInterpreter(frame) + try: + visitor.visit(mod) + except Failure: + failure = sys.exc_info()[1] + return getfailure(failure) + if should_fail: + return ("(assertion failed, but when it was re-run for " + "printing intermediate values, it did not fail. Suggestions: " + "compute assert expression before the assert or use --no-assert)") + +def run(offending_line, frame=None): + if frame is None: + frame = py.code.Frame(sys._getframe(1)) + return interpret(offending_line, frame) + +def getfailure(failure): + explanation = _format_explanation(failure.explanation) + value = failure.cause[1] + if str(value): + lines = explanation.splitlines() + if not lines: + lines.append("") + lines[0] += " << %s" % (value,) + explanation = "\n".join(lines) + text = "%s: %s" % (failure.cause[0].__name__, explanation) + if text.startswith("AssertionError: assert "): + text = text[16:] + return text + + +operator_map = { + ast.BitOr : "|", + ast.BitXor : "^", + ast.BitAnd : "&", + ast.LShift : "<<", + ast.RShift : ">>", + ast.Add : "+", + ast.Sub : "-", + ast.Mult : "*", + ast.Div : "/", + ast.FloorDiv : "//", + ast.Mod : "%", + ast.Eq : "==", + ast.NotEq : "!=", + ast.Lt : "<", + ast.LtE : "<=", + ast.Gt : ">", + ast.GtE : ">=", + ast.Pow : "**", + ast.Is : "is", + ast.IsNot : "is not", + ast.In : "in", + ast.NotIn : "not in" +} + +unary_map = { + ast.Not : "not %s", + ast.Invert : "~%s", + ast.USub : "-%s", + ast.UAdd : "+%s" +} + + +class DebugInterpreter(ast.NodeVisitor): + """Interpret AST nodes to gleam useful debugging information. """ + + def __init__(self, frame): + self.frame = frame + + def generic_visit(self, node): + # Fallback when we don't have a special implementation. + if _is_ast_expr(node): + mod = ast.Expression(node) + co = self._compile(mod) + try: + result = self.frame.eval(co) + except Exception: + raise Failure() + explanation = self.frame.repr(result) + return explanation, result + elif _is_ast_stmt(node): + mod = ast.Module([node]) + co = self._compile(mod, "exec") + try: + self.frame.exec_(co) + except Exception: + raise Failure() + return None, None + else: + raise AssertionError("can't handle %s" %(node,)) + + def _compile(self, source, mode="eval"): + return compile(source, "<assertion interpretation>", mode) + + def visit_Expr(self, expr): + return self.visit(expr.value) + + def visit_Module(self, mod): + for stmt in mod.body: + self.visit(stmt) + + def visit_Name(self, name): + explanation, result = self.generic_visit(name) + # See if the name is local. + source = "%r in locals() is not globals()" % (name.id,) + co = self._compile(source) + try: + local = self.frame.eval(co) + except Exception: + # have to assume it isn't + local = False + if not local: + return name.id, result + return explanation, result + + def visit_Compare(self, comp): + left = comp.left + left_explanation, left_result = self.visit(left) + for op, next_op in zip(comp.ops, comp.comparators): + next_explanation, next_result = self.visit(next_op) + op_symbol = operator_map[op.__class__] + explanation = "%s %s %s" % (left_explanation, op_symbol, + next_explanation) + source = "__exprinfo_left %s __exprinfo_right" % (op_symbol,) + co = self._compile(source) + try: + result = self.frame.eval(co, __exprinfo_left=left_result, + __exprinfo_right=next_result) + except Exception: + raise Failure(explanation) + try: + if not result: + break + except KeyboardInterrupt: + raise + except: + break + left_explanation, left_result = next_explanation, next_result + + rcomp = py.code._reprcompare + if rcomp: + res = rcomp(op_symbol, left_result, next_result) + if res: + explanation = res + return explanation, result + + def visit_BoolOp(self, boolop): + is_or = isinstance(boolop.op, ast.Or) + explanations = [] + for operand in boolop.values: + explanation, result = self.visit(operand) + explanations.append(explanation) + if result == is_or: + break + name = is_or and " or " or " and " + explanation = "(" + name.join(explanations) + ")" + return explanation, result + + def visit_UnaryOp(self, unary): + pattern = unary_map[unary.op.__class__] + operand_explanation, operand_result = self.visit(unary.operand) + explanation = pattern % (operand_explanation,) + co = self._compile(pattern % ("__exprinfo_expr",)) + try: + result = self.frame.eval(co, __exprinfo_expr=operand_result) + except Exception: + raise Failure(explanation) + return explanation, result + + def visit_BinOp(self, binop): + left_explanation, left_result = self.visit(binop.left) + right_explanation, right_result = self.visit(binop.right) + symbol = operator_map[binop.op.__class__] + explanation = "(%s %s %s)" % (left_explanation, symbol, + right_explanation) + source = "__exprinfo_left %s __exprinfo_right" % (symbol,) + co = self._compile(source) + try: + result = self.frame.eval(co, __exprinfo_left=left_result, + __exprinfo_right=right_result) + except Exception: + raise Failure(explanation) + return explanation, result + + def visit_Call(self, call): + func_explanation, func = self.visit(call.func) + arg_explanations = [] + ns = {"__exprinfo_func" : func} + arguments = [] + for arg in call.args: + arg_explanation, arg_result = self.visit(arg) + arg_name = "__exprinfo_%s" % (len(ns),) + ns[arg_name] = arg_result + arguments.append(arg_name) + arg_explanations.append(arg_explanation) + for keyword in call.keywords: + arg_explanation, arg_result = self.visit(keyword.value) + arg_name = "__exprinfo_%s" % (len(ns),) + ns[arg_name] = arg_result + keyword_source = "%s=%%s" % (keyword.arg) + arguments.append(keyword_source % (arg_name,)) + arg_explanations.append(keyword_source % (arg_explanation,)) + if call.starargs: + arg_explanation, arg_result = self.visit(call.starargs) + arg_name = "__exprinfo_star" + ns[arg_name] = arg_result + arguments.append("*%s" % (arg_name,)) + arg_explanations.append("*%s" % (arg_explanation,)) + if call.kwargs: + arg_explanation, arg_result = self.visit(call.kwargs) + arg_name = "__exprinfo_kwds" + ns[arg_name] = arg_result + arguments.append("**%s" % (arg_name,)) + arg_explanations.append("**%s" % (arg_explanation,)) + args_explained = ", ".join(arg_explanations) + explanation = "%s(%s)" % (func_explanation, args_explained) + args = ", ".join(arguments) + source = "__exprinfo_func(%s)" % (args,) + co = self._compile(source) + try: + result = self.frame.eval(co, **ns) + except Exception: + raise Failure(explanation) + pattern = "%s\n{%s = %s\n}" + rep = self.frame.repr(result) + explanation = pattern % (rep, rep, explanation) + return explanation, result + + def _is_builtin_name(self, name): + pattern = "%r not in globals() and %r not in locals()" + source = pattern % (name.id, name.id) + co = self._compile(source) + try: + return self.frame.eval(co) + except Exception: + return False + + def visit_Attribute(self, attr): + if not isinstance(attr.ctx, ast.Load): + return self.generic_visit(attr) + source_explanation, source_result = self.visit(attr.value) + explanation = "%s.%s" % (source_explanation, attr.attr) + source = "__exprinfo_expr.%s" % (attr.attr,) + co = self._compile(source) + try: + result = self.frame.eval(co, __exprinfo_expr=source_result) + except Exception: + raise Failure(explanation) + explanation = "%s\n{%s = %s.%s\n}" % (self.frame.repr(result), + self.frame.repr(result), + source_explanation, attr.attr) + # Check if the attr is from an instance. + source = "%r in getattr(__exprinfo_expr, '__dict__', {})" + source = source % (attr.attr,) + co = self._compile(source) + try: + from_instance = self.frame.eval(co, __exprinfo_expr=source_result) + except Exception: + from_instance = True + if from_instance: + rep = self.frame.repr(result) + pattern = "%s\n{%s = %s\n}" + explanation = pattern % (rep, rep, explanation) + return explanation, result + + def visit_Assert(self, assrt): + test_explanation, test_result = self.visit(assrt.test) + if test_explanation.startswith("False\n{False =") and \ + test_explanation.endswith("\n"): + test_explanation = test_explanation[15:-2] + explanation = "assert %s" % (test_explanation,) + if not test_result: + try: + raise BuiltinAssertionError + except Exception: + raise Failure(explanation) + return explanation, test_result + + def visit_Assign(self, assign): + value_explanation, value_result = self.visit(assign.value) + explanation = "... = %s" % (value_explanation,) + name = ast.Name("__exprinfo_expr", ast.Load(), + lineno=assign.value.lineno, + col_offset=assign.value.col_offset) + new_assign = ast.Assign(assign.targets, name, lineno=assign.lineno, + col_offset=assign.col_offset) + mod = ast.Module([new_assign]) + co = self._compile(mod, "exec") + try: + self.frame.exec_(co, __exprinfo_expr=value_result) + except Exception: + raise Failure(explanation) + return explanation, value_result diff --git a/python/py/py/_code/_assertionold.py b/python/py/py/_code/_assertionold.py new file mode 100644 index 000000000..4e81fb3ef --- /dev/null +++ b/python/py/py/_code/_assertionold.py @@ -0,0 +1,555 @@ +import py +import sys, inspect +from compiler import parse, ast, pycodegen +from py._code.assertion import BuiltinAssertionError, _format_explanation + +passthroughex = py.builtin._sysex + +class Failure: + def __init__(self, node): + self.exc, self.value, self.tb = sys.exc_info() + self.node = node + +class View(object): + """View base class. + + If C is a subclass of View, then C(x) creates a proxy object around + the object x. The actual class of the proxy is not C in general, + but a *subclass* of C determined by the rules below. To avoid confusion + we call view class the class of the proxy (a subclass of C, so of View) + and object class the class of x. + + Attributes and methods not found in the proxy are automatically read on x. + Other operations like setting attributes are performed on the proxy, as + determined by its view class. The object x is available from the proxy + as its __obj__ attribute. + + The view class selection is determined by the __view__ tuples and the + optional __viewkey__ method. By default, the selected view class is the + most specific subclass of C whose __view__ mentions the class of x. + If no such subclass is found, the search proceeds with the parent + object classes. For example, C(True) will first look for a subclass + of C with __view__ = (..., bool, ...) and only if it doesn't find any + look for one with __view__ = (..., int, ...), and then ..., object,... + If everything fails the class C itself is considered to be the default. + + Alternatively, the view class selection can be driven by another aspect + of the object x, instead of the class of x, by overriding __viewkey__. + See last example at the end of this module. + """ + + _viewcache = {} + __view__ = () + + def __new__(rootclass, obj, *args, **kwds): + self = object.__new__(rootclass) + self.__obj__ = obj + self.__rootclass__ = rootclass + key = self.__viewkey__() + try: + self.__class__ = self._viewcache[key] + except KeyError: + self.__class__ = self._selectsubclass(key) + return self + + def __getattr__(self, attr): + # attributes not found in the normal hierarchy rooted on View + # are looked up in the object's real class + return getattr(self.__obj__, attr) + + def __viewkey__(self): + return self.__obj__.__class__ + + def __matchkey__(self, key, subclasses): + if inspect.isclass(key): + keys = inspect.getmro(key) + else: + keys = [key] + for key in keys: + result = [C for C in subclasses if key in C.__view__] + if result: + return result + return [] + + def _selectsubclass(self, key): + subclasses = list(enumsubclasses(self.__rootclass__)) + for C in subclasses: + if not isinstance(C.__view__, tuple): + C.__view__ = (C.__view__,) + choices = self.__matchkey__(key, subclasses) + if not choices: + return self.__rootclass__ + elif len(choices) == 1: + return choices[0] + else: + # combine the multiple choices + return type('?', tuple(choices), {}) + + def __repr__(self): + return '%s(%r)' % (self.__rootclass__.__name__, self.__obj__) + + +def enumsubclasses(cls): + for subcls in cls.__subclasses__(): + for subsubclass in enumsubclasses(subcls): + yield subsubclass + yield cls + + +class Interpretable(View): + """A parse tree node with a few extra methods.""" + explanation = None + + def is_builtin(self, frame): + return False + + def eval(self, frame): + # fall-back for unknown expression nodes + try: + expr = ast.Expression(self.__obj__) + expr.filename = '<eval>' + self.__obj__.filename = '<eval>' + co = pycodegen.ExpressionCodeGenerator(expr).getCode() + result = frame.eval(co) + except passthroughex: + raise + except: + raise Failure(self) + self.result = result + self.explanation = self.explanation or frame.repr(self.result) + + def run(self, frame): + # fall-back for unknown statement nodes + try: + expr = ast.Module(None, ast.Stmt([self.__obj__])) + expr.filename = '<run>' + co = pycodegen.ModuleCodeGenerator(expr).getCode() + frame.exec_(co) + except passthroughex: + raise + except: + raise Failure(self) + + def nice_explanation(self): + return _format_explanation(self.explanation) + + +class Name(Interpretable): + __view__ = ast.Name + + def is_local(self, frame): + source = '%r in locals() is not globals()' % self.name + try: + return frame.is_true(frame.eval(source)) + except passthroughex: + raise + except: + return False + + def is_global(self, frame): + source = '%r in globals()' % self.name + try: + return frame.is_true(frame.eval(source)) + except passthroughex: + raise + except: + return False + + def is_builtin(self, frame): + source = '%r not in locals() and %r not in globals()' % ( + self.name, self.name) + try: + return frame.is_true(frame.eval(source)) + except passthroughex: + raise + except: + return False + + def eval(self, frame): + super(Name, self).eval(frame) + if not self.is_local(frame): + self.explanation = self.name + +class Compare(Interpretable): + __view__ = ast.Compare + + def eval(self, frame): + expr = Interpretable(self.expr) + expr.eval(frame) + for operation, expr2 in self.ops: + if hasattr(self, 'result'): + # shortcutting in chained expressions + if not frame.is_true(self.result): + break + expr2 = Interpretable(expr2) + expr2.eval(frame) + self.explanation = "%s %s %s" % ( + expr.explanation, operation, expr2.explanation) + source = "__exprinfo_left %s __exprinfo_right" % operation + try: + self.result = frame.eval(source, + __exprinfo_left=expr.result, + __exprinfo_right=expr2.result) + except passthroughex: + raise + except: + raise Failure(self) + expr = expr2 + +class And(Interpretable): + __view__ = ast.And + + def eval(self, frame): + explanations = [] + for expr in self.nodes: + expr = Interpretable(expr) + expr.eval(frame) + explanations.append(expr.explanation) + self.result = expr.result + if not frame.is_true(expr.result): + break + self.explanation = '(' + ' and '.join(explanations) + ')' + +class Or(Interpretable): + __view__ = ast.Or + + def eval(self, frame): + explanations = [] + for expr in self.nodes: + expr = Interpretable(expr) + expr.eval(frame) + explanations.append(expr.explanation) + self.result = expr.result + if frame.is_true(expr.result): + break + self.explanation = '(' + ' or '.join(explanations) + ')' + + +# == Unary operations == +keepalive = [] +for astclass, astpattern in { + ast.Not : 'not __exprinfo_expr', + ast.Invert : '(~__exprinfo_expr)', + }.items(): + + class UnaryArith(Interpretable): + __view__ = astclass + + def eval(self, frame, astpattern=astpattern): + expr = Interpretable(self.expr) + expr.eval(frame) + self.explanation = astpattern.replace('__exprinfo_expr', + expr.explanation) + try: + self.result = frame.eval(astpattern, + __exprinfo_expr=expr.result) + except passthroughex: + raise + except: + raise Failure(self) + + keepalive.append(UnaryArith) + +# == Binary operations == +for astclass, astpattern in { + ast.Add : '(__exprinfo_left + __exprinfo_right)', + ast.Sub : '(__exprinfo_left - __exprinfo_right)', + ast.Mul : '(__exprinfo_left * __exprinfo_right)', + ast.Div : '(__exprinfo_left / __exprinfo_right)', + ast.Mod : '(__exprinfo_left % __exprinfo_right)', + ast.Power : '(__exprinfo_left ** __exprinfo_right)', + }.items(): + + class BinaryArith(Interpretable): + __view__ = astclass + + def eval(self, frame, astpattern=astpattern): + left = Interpretable(self.left) + left.eval(frame) + right = Interpretable(self.right) + right.eval(frame) + self.explanation = (astpattern + .replace('__exprinfo_left', left .explanation) + .replace('__exprinfo_right', right.explanation)) + try: + self.result = frame.eval(astpattern, + __exprinfo_left=left.result, + __exprinfo_right=right.result) + except passthroughex: + raise + except: + raise Failure(self) + + keepalive.append(BinaryArith) + + +class CallFunc(Interpretable): + __view__ = ast.CallFunc + + def is_bool(self, frame): + source = 'isinstance(__exprinfo_value, bool)' + try: + return frame.is_true(frame.eval(source, + __exprinfo_value=self.result)) + except passthroughex: + raise + except: + return False + + def eval(self, frame): + node = Interpretable(self.node) + node.eval(frame) + explanations = [] + vars = {'__exprinfo_fn': node.result} + source = '__exprinfo_fn(' + for a in self.args: + if isinstance(a, ast.Keyword): + keyword = a.name + a = a.expr + else: + keyword = None + a = Interpretable(a) + a.eval(frame) + argname = '__exprinfo_%d' % len(vars) + vars[argname] = a.result + if keyword is None: + source += argname + ',' + explanations.append(a.explanation) + else: + source += '%s=%s,' % (keyword, argname) + explanations.append('%s=%s' % (keyword, a.explanation)) + if self.star_args: + star_args = Interpretable(self.star_args) + star_args.eval(frame) + argname = '__exprinfo_star' + vars[argname] = star_args.result + source += '*' + argname + ',' + explanations.append('*' + star_args.explanation) + if self.dstar_args: + dstar_args = Interpretable(self.dstar_args) + dstar_args.eval(frame) + argname = '__exprinfo_kwds' + vars[argname] = dstar_args.result + source += '**' + argname + ',' + explanations.append('**' + dstar_args.explanation) + self.explanation = "%s(%s)" % ( + node.explanation, ', '.join(explanations)) + if source.endswith(','): + source = source[:-1] + source += ')' + try: + self.result = frame.eval(source, **vars) + except passthroughex: + raise + except: + raise Failure(self) + if not node.is_builtin(frame) or not self.is_bool(frame): + r = frame.repr(self.result) + self.explanation = '%s\n{%s = %s\n}' % (r, r, self.explanation) + +class Getattr(Interpretable): + __view__ = ast.Getattr + + def eval(self, frame): + expr = Interpretable(self.expr) + expr.eval(frame) + source = '__exprinfo_expr.%s' % self.attrname + try: + self.result = frame.eval(source, __exprinfo_expr=expr.result) + except passthroughex: + raise + except: + raise Failure(self) + self.explanation = '%s.%s' % (expr.explanation, self.attrname) + # if the attribute comes from the instance, its value is interesting + source = ('hasattr(__exprinfo_expr, "__dict__") and ' + '%r in __exprinfo_expr.__dict__' % self.attrname) + try: + from_instance = frame.is_true( + frame.eval(source, __exprinfo_expr=expr.result)) + except passthroughex: + raise + except: + from_instance = True + if from_instance: + r = frame.repr(self.result) + self.explanation = '%s\n{%s = %s\n}' % (r, r, self.explanation) + +# == Re-interpretation of full statements == + +class Assert(Interpretable): + __view__ = ast.Assert + + def run(self, frame): + test = Interpretable(self.test) + test.eval(frame) + # simplify 'assert False where False = ...' + if (test.explanation.startswith('False\n{False = ') and + test.explanation.endswith('\n}')): + test.explanation = test.explanation[15:-2] + # print the result as 'assert <explanation>' + self.result = test.result + self.explanation = 'assert ' + test.explanation + if not frame.is_true(test.result): + try: + raise BuiltinAssertionError + except passthroughex: + raise + except: + raise Failure(self) + +class Assign(Interpretable): + __view__ = ast.Assign + + def run(self, frame): + expr = Interpretable(self.expr) + expr.eval(frame) + self.result = expr.result + self.explanation = '... = ' + expr.explanation + # fall-back-run the rest of the assignment + ass = ast.Assign(self.nodes, ast.Name('__exprinfo_expr')) + mod = ast.Module(None, ast.Stmt([ass])) + mod.filename = '<run>' + co = pycodegen.ModuleCodeGenerator(mod).getCode() + try: + frame.exec_(co, __exprinfo_expr=expr.result) + except passthroughex: + raise + except: + raise Failure(self) + +class Discard(Interpretable): + __view__ = ast.Discard + + def run(self, frame): + expr = Interpretable(self.expr) + expr.eval(frame) + self.result = expr.result + self.explanation = expr.explanation + +class Stmt(Interpretable): + __view__ = ast.Stmt + + def run(self, frame): + for stmt in self.nodes: + stmt = Interpretable(stmt) + stmt.run(frame) + + +def report_failure(e): + explanation = e.node.nice_explanation() + if explanation: + explanation = ", in: " + explanation + else: + explanation = "" + sys.stdout.write("%s: %s%s\n" % (e.exc.__name__, e.value, explanation)) + +def check(s, frame=None): + if frame is None: + frame = sys._getframe(1) + frame = py.code.Frame(frame) + expr = parse(s, 'eval') + assert isinstance(expr, ast.Expression) + node = Interpretable(expr.node) + try: + node.eval(frame) + except passthroughex: + raise + except Failure: + e = sys.exc_info()[1] + report_failure(e) + else: + if not frame.is_true(node.result): + sys.stderr.write("assertion failed: %s\n" % node.nice_explanation()) + + +########################################################### +# API / Entry points +# ######################################################### + +def interpret(source, frame, should_fail=False): + module = Interpretable(parse(source, 'exec').node) + #print "got module", module + if isinstance(frame, py.std.types.FrameType): + frame = py.code.Frame(frame) + try: + module.run(frame) + except Failure: + e = sys.exc_info()[1] + return getfailure(e) + except passthroughex: + raise + except: + import traceback + traceback.print_exc() + if should_fail: + return ("(assertion failed, but when it was re-run for " + "printing intermediate values, it did not fail. Suggestions: " + "compute assert expression before the assert or use --nomagic)") + else: + return None + +def getmsg(excinfo): + if isinstance(excinfo, tuple): + excinfo = py.code.ExceptionInfo(excinfo) + #frame, line = gettbline(tb) + #frame = py.code.Frame(frame) + #return interpret(line, frame) + + tb = excinfo.traceback[-1] + source = str(tb.statement).strip() + x = interpret(source, tb.frame, should_fail=True) + if not isinstance(x, str): + raise TypeError("interpret returned non-string %r" % (x,)) + return x + +def getfailure(e): + explanation = e.node.nice_explanation() + if str(e.value): + lines = explanation.split('\n') + lines[0] += " << %s" % (e.value,) + explanation = '\n'.join(lines) + text = "%s: %s" % (e.exc.__name__, explanation) + if text.startswith('AssertionError: assert '): + text = text[16:] + return text + +def run(s, frame=None): + if frame is None: + frame = sys._getframe(1) + frame = py.code.Frame(frame) + module = Interpretable(parse(s, 'exec').node) + try: + module.run(frame) + except Failure: + e = sys.exc_info()[1] + report_failure(e) + + +if __name__ == '__main__': + # example: + def f(): + return 5 + def g(): + return 3 + def h(x): + return 'never' + check("f() * g() == 5") + check("not f()") + check("not (f() and g() or 0)") + check("f() == g()") + i = 4 + check("i == f()") + check("len(f()) == 0") + check("isinstance(2+3+4, float)") + + run("x = i") + check("x == 5") + + run("assert not f(), 'oops'") + run("a, b, c = 1, 2") + run("a, b, c = f()") + + check("max([f(),g()]) == 4") + check("'hello'[g()] == 'h'") + run("'guk%d' % h(f())") diff --git a/python/py/py/_code/_py2traceback.py b/python/py/py/_code/_py2traceback.py new file mode 100644 index 000000000..d65e27cb7 --- /dev/null +++ b/python/py/py/_code/_py2traceback.py @@ -0,0 +1,79 @@ +# copied from python-2.7.3's traceback.py +# CHANGES: +# - some_str is replaced, trying to create unicode strings +# +import types + +def format_exception_only(etype, value): + """Format the exception part of a traceback. + + The arguments are the exception type and value such as given by + sys.last_type and sys.last_value. The return value is a list of + strings, each ending in a newline. + + Normally, the list contains a single string; however, for + SyntaxError exceptions, it contains several lines that (when + printed) display detailed information about where the syntax + error occurred. + + The message indicating which exception occurred is always the last + string in the list. + + """ + + # An instance should not have a meaningful value parameter, but + # sometimes does, particularly for string exceptions, such as + # >>> raise string1, string2 # deprecated + # + # Clear these out first because issubtype(string1, SyntaxError) + # would throw another exception and mask the original problem. + if (isinstance(etype, BaseException) or + isinstance(etype, types.InstanceType) or + etype is None or type(etype) is str): + return [_format_final_exc_line(etype, value)] + + stype = etype.__name__ + + if not issubclass(etype, SyntaxError): + return [_format_final_exc_line(stype, value)] + + # It was a syntax error; show exactly where the problem was found. + lines = [] + try: + msg, (filename, lineno, offset, badline) = value.args + except Exception: + pass + else: + filename = filename or "<string>" + lines.append(' File "%s", line %d\n' % (filename, lineno)) + if badline is not None: + lines.append(' %s\n' % badline.strip()) + if offset is not None: + caretspace = badline.rstrip('\n')[:offset].lstrip() + # non-space whitespace (likes tabs) must be kept for alignment + caretspace = ((c.isspace() and c or ' ') for c in caretspace) + # only three spaces to account for offset1 == pos 0 + lines.append(' %s^\n' % ''.join(caretspace)) + value = msg + + lines.append(_format_final_exc_line(stype, value)) + return lines + +def _format_final_exc_line(etype, value): + """Return a list of a single line -- normal case for format_exception_only""" + valuestr = _some_str(value) + if value is None or not valuestr: + line = "%s\n" % etype + else: + line = "%s: %s\n" % (etype, valuestr) + return line + +def _some_str(value): + try: + return unicode(value) + except Exception: + try: + return str(value) + except Exception: + pass + return '<unprintable %s object>' % type(value).__name__ diff --git a/python/py/py/_code/assertion.py b/python/py/py/_code/assertion.py new file mode 100644 index 000000000..4ce80c75b --- /dev/null +++ b/python/py/py/_code/assertion.py @@ -0,0 +1,94 @@ +import sys +import py + +BuiltinAssertionError = py.builtin.builtins.AssertionError + +_reprcompare = None # if set, will be called by assert reinterp for comparison ops + +def _format_explanation(explanation): + """This formats an explanation + + Normally all embedded newlines are escaped, however there are + three exceptions: \n{, \n} and \n~. The first two are intended + cover nested explanations, see function and attribute explanations + for examples (.visit_Call(), visit_Attribute()). The last one is + for when one explanation needs to span multiple lines, e.g. when + displaying diffs. + """ + raw_lines = (explanation or '').split('\n') + # escape newlines not followed by {, } and ~ + lines = [raw_lines[0]] + for l in raw_lines[1:]: + if l.startswith('{') or l.startswith('}') or l.startswith('~'): + lines.append(l) + else: + lines[-1] += '\\n' + l + + result = lines[:1] + stack = [0] + stackcnt = [0] + for line in lines[1:]: + if line.startswith('{'): + if stackcnt[-1]: + s = 'and ' + else: + s = 'where ' + stack.append(len(result)) + stackcnt[-1] += 1 + stackcnt.append(0) + result.append(' +' + ' '*(len(stack)-1) + s + line[1:]) + elif line.startswith('}'): + assert line.startswith('}') + stack.pop() + stackcnt.pop() + result[stack[-1]] += line[1:] + else: + assert line.startswith('~') + result.append(' '*len(stack) + line[1:]) + assert len(stack) == 1 + return '\n'.join(result) + + +class AssertionError(BuiltinAssertionError): + def __init__(self, *args): + BuiltinAssertionError.__init__(self, *args) + if args: + try: + self.msg = str(args[0]) + except py.builtin._sysex: + raise + except: + self.msg = "<[broken __repr__] %s at %0xd>" %( + args[0].__class__, id(args[0])) + else: + f = py.code.Frame(sys._getframe(1)) + try: + source = f.code.fullsource + if source is not None: + try: + source = source.getstatement(f.lineno, assertion=True) + except IndexError: + source = None + else: + source = str(source.deindent()).strip() + except py.error.ENOENT: + source = None + # this can also occur during reinterpretation, when the + # co_filename is set to "<run>". + if source: + self.msg = reinterpret(source, f, should_fail=True) + else: + self.msg = "<could not determine information>" + if not self.args: + self.args = (self.msg,) + +if sys.version_info > (3, 0): + AssertionError.__module__ = "builtins" + reinterpret_old = "old reinterpretation not available for py3" +else: + from py._code._assertionold import interpret as reinterpret_old +if sys.version_info >= (2, 6) or (sys.platform.startswith("java")): + from py._code._assertionnew import interpret as reinterpret +else: + reinterpret = reinterpret_old + diff --git a/python/py/py/_code/code.py b/python/py/py/_code/code.py new file mode 100644 index 000000000..f14c562a2 --- /dev/null +++ b/python/py/py/_code/code.py @@ -0,0 +1,787 @@ +import py +import sys +from inspect import CO_VARARGS, CO_VARKEYWORDS + +builtin_repr = repr + +reprlib = py.builtin._tryimport('repr', 'reprlib') + +if sys.version_info[0] >= 3: + from traceback import format_exception_only +else: + from py._code._py2traceback import format_exception_only + +class Code(object): + """ wrapper around Python code objects """ + def __init__(self, rawcode): + if not hasattr(rawcode, "co_filename"): + rawcode = py.code.getrawcode(rawcode) + try: + self.filename = rawcode.co_filename + self.firstlineno = rawcode.co_firstlineno - 1 + self.name = rawcode.co_name + except AttributeError: + raise TypeError("not a code object: %r" %(rawcode,)) + self.raw = rawcode + + def __eq__(self, other): + return self.raw == other.raw + + def __ne__(self, other): + return not self == other + + @property + def path(self): + """ return a path object pointing to source code (note that it + might not point to an actually existing file). """ + p = py.path.local(self.raw.co_filename) + # maybe don't try this checking + if not p.check(): + # XXX maybe try harder like the weird logic + # in the standard lib [linecache.updatecache] does? + p = self.raw.co_filename + return p + + @property + def fullsource(self): + """ return a py.code.Source object for the full source file of the code + """ + from py._code import source + full, _ = source.findsource(self.raw) + return full + + def source(self): + """ return a py.code.Source object for the code object's source only + """ + # return source only for that part of code + return py.code.Source(self.raw) + + def getargs(self, var=False): + """ return a tuple with the argument names for the code object + + if 'var' is set True also return the names of the variable and + keyword arguments when present + """ + # handfull shortcut for getting args + raw = self.raw + argcount = raw.co_argcount + if var: + argcount += raw.co_flags & CO_VARARGS + argcount += raw.co_flags & CO_VARKEYWORDS + return raw.co_varnames[:argcount] + +class Frame(object): + """Wrapper around a Python frame holding f_locals and f_globals + in which expressions can be evaluated.""" + + def __init__(self, frame): + self.lineno = frame.f_lineno - 1 + self.f_globals = frame.f_globals + self.f_locals = frame.f_locals + self.raw = frame + self.code = py.code.Code(frame.f_code) + + @property + def statement(self): + """ statement this frame is at """ + if self.code.fullsource is None: + return py.code.Source("") + return self.code.fullsource.getstatement(self.lineno) + + def eval(self, code, **vars): + """ evaluate 'code' in the frame + + 'vars' are optional additional local variables + + returns the result of the evaluation + """ + f_locals = self.f_locals.copy() + f_locals.update(vars) + return eval(code, self.f_globals, f_locals) + + def exec_(self, code, **vars): + """ exec 'code' in the frame + + 'vars' are optiona; additional local variables + """ + f_locals = self.f_locals.copy() + f_locals.update(vars) + py.builtin.exec_(code, self.f_globals, f_locals ) + + def repr(self, object): + """ return a 'safe' (non-recursive, one-line) string repr for 'object' + """ + return py.io.saferepr(object) + + def is_true(self, object): + return object + + def getargs(self, var=False): + """ return a list of tuples (name, value) for all arguments + + if 'var' is set True also include the variable and keyword + arguments when present + """ + retval = [] + for arg in self.code.getargs(var): + try: + retval.append((arg, self.f_locals[arg])) + except KeyError: + pass # this can occur when using Psyco + return retval + +class TracebackEntry(object): + """ a single entry in a traceback """ + + _repr_style = None + exprinfo = None + + def __init__(self, rawentry): + self._rawentry = rawentry + self.lineno = rawentry.tb_lineno - 1 + + def set_repr_style(self, mode): + assert mode in ("short", "long") + self._repr_style = mode + + @property + def frame(self): + return py.code.Frame(self._rawentry.tb_frame) + + @property + def relline(self): + return self.lineno - self.frame.code.firstlineno + + def __repr__(self): + return "<TracebackEntry %s:%d>" %(self.frame.code.path, self.lineno+1) + + @property + def statement(self): + """ py.code.Source object for the current statement """ + source = self.frame.code.fullsource + return source.getstatement(self.lineno) + + @property + def path(self): + """ path to the source code """ + return self.frame.code.path + + def getlocals(self): + return self.frame.f_locals + locals = property(getlocals, None, None, "locals of underlaying frame") + + def reinterpret(self): + """Reinterpret the failing statement and returns a detailed information + about what operations are performed.""" + if self.exprinfo is None: + source = str(self.statement).strip() + x = py.code._reinterpret(source, self.frame, should_fail=True) + if not isinstance(x, str): + raise TypeError("interpret returned non-string %r" % (x,)) + self.exprinfo = x + return self.exprinfo + + def getfirstlinesource(self): + # on Jython this firstlineno can be -1 apparently + return max(self.frame.code.firstlineno, 0) + + def getsource(self, astcache=None): + """ return failing source code. """ + # we use the passed in astcache to not reparse asttrees + # within exception info printing + from py._code.source import getstatementrange_ast + source = self.frame.code.fullsource + if source is None: + return None + key = astnode = None + if astcache is not None: + key = self.frame.code.path + if key is not None: + astnode = astcache.get(key, None) + start = self.getfirstlinesource() + try: + astnode, _, end = getstatementrange_ast(self.lineno, source, + astnode=astnode) + except SyntaxError: + end = self.lineno + 1 + else: + if key is not None: + astcache[key] = astnode + return source[start:end] + + source = property(getsource) + + def ishidden(self): + """ return True if the current frame has a var __tracebackhide__ + resolving to True + + mostly for internal use + """ + try: + return self.frame.f_locals['__tracebackhide__'] + except KeyError: + try: + return self.frame.f_globals['__tracebackhide__'] + except KeyError: + return False + + def __str__(self): + try: + fn = str(self.path) + except py.error.Error: + fn = '???' + name = self.frame.code.name + try: + line = str(self.statement).lstrip() + except KeyboardInterrupt: + raise + except: + line = "???" + return " File %r:%d in %s\n %s\n" %(fn, self.lineno+1, name, line) + + def name(self): + return self.frame.code.raw.co_name + name = property(name, None, None, "co_name of underlaying code") + +class Traceback(list): + """ Traceback objects encapsulate and offer higher level + access to Traceback entries. + """ + Entry = TracebackEntry + def __init__(self, tb): + """ initialize from given python traceback object. """ + if hasattr(tb, 'tb_next'): + def f(cur): + while cur is not None: + yield self.Entry(cur) + cur = cur.tb_next + list.__init__(self, f(tb)) + else: + list.__init__(self, tb) + + def cut(self, path=None, lineno=None, firstlineno=None, excludepath=None): + """ return a Traceback instance wrapping part of this Traceback + + by provding any combination of path, lineno and firstlineno, the + first frame to start the to-be-returned traceback is determined + + this allows cutting the first part of a Traceback instance e.g. + for formatting reasons (removing some uninteresting bits that deal + with handling of the exception/traceback) + """ + for x in self: + code = x.frame.code + codepath = code.path + if ((path is None or codepath == path) and + (excludepath is None or not hasattr(codepath, 'relto') or + not codepath.relto(excludepath)) and + (lineno is None or x.lineno == lineno) and + (firstlineno is None or x.frame.code.firstlineno == firstlineno)): + return Traceback(x._rawentry) + return self + + def __getitem__(self, key): + val = super(Traceback, self).__getitem__(key) + if isinstance(key, type(slice(0))): + val = self.__class__(val) + return val + + def filter(self, fn=lambda x: not x.ishidden()): + """ return a Traceback instance with certain items removed + + fn is a function that gets a single argument, a TracebackItem + instance, and should return True when the item should be added + to the Traceback, False when not + + by default this removes all the TracebackItems which are hidden + (see ishidden() above) + """ + return Traceback(filter(fn, self)) + + def getcrashentry(self): + """ return last non-hidden traceback entry that lead + to the exception of a traceback. + """ + for i in range(-1, -len(self)-1, -1): + entry = self[i] + if not entry.ishidden(): + return entry + return self[-1] + + def recursionindex(self): + """ return the index of the frame/TracebackItem where recursion + originates if appropriate, None if no recursion occurred + """ + cache = {} + for i, entry in enumerate(self): + # id for the code.raw is needed to work around + # the strange metaprogramming in the decorator lib from pypi + # which generates code objects that have hash/value equality + #XXX needs a test + key = entry.frame.code.path, id(entry.frame.code.raw), entry.lineno + #print "checking for recursion at", key + l = cache.setdefault(key, []) + if l: + f = entry.frame + loc = f.f_locals + for otherloc in l: + if f.is_true(f.eval(co_equal, + __recursioncache_locals_1=loc, + __recursioncache_locals_2=otherloc)): + return i + l.append(entry.frame.f_locals) + return None + +co_equal = compile('__recursioncache_locals_1 == __recursioncache_locals_2', + '?', 'eval') + +class ExceptionInfo(object): + """ wraps sys.exc_info() objects and offers + help for navigating the traceback. + """ + _striptext = '' + def __init__(self, tup=None, exprinfo=None): + if tup is None: + tup = sys.exc_info() + if exprinfo is None and isinstance(tup[1], AssertionError): + exprinfo = getattr(tup[1], 'msg', None) + if exprinfo is None: + exprinfo = str(tup[1]) + if exprinfo and exprinfo.startswith('assert '): + self._striptext = 'AssertionError: ' + self._excinfo = tup + #: the exception class + self.type = tup[0] + #: the exception instance + self.value = tup[1] + #: the exception raw traceback + self.tb = tup[2] + #: the exception type name + self.typename = self.type.__name__ + #: the exception traceback (py.code.Traceback instance) + self.traceback = py.code.Traceback(self.tb) + + def __repr__(self): + return "<ExceptionInfo %s tblen=%d>" % (self.typename, len(self.traceback)) + + def exconly(self, tryshort=False): + """ return the exception as a string + + when 'tryshort' resolves to True, and the exception is a + py.code._AssertionError, only the actual exception part of + the exception representation is returned (so 'AssertionError: ' is + removed from the beginning) + """ + lines = format_exception_only(self.type, self.value) + text = ''.join(lines) + text = text.rstrip() + if tryshort: + if text.startswith(self._striptext): + text = text[len(self._striptext):] + return text + + def errisinstance(self, exc): + """ return True if the exception is an instance of exc """ + return isinstance(self.value, exc) + + def _getreprcrash(self): + exconly = self.exconly(tryshort=True) + entry = self.traceback.getcrashentry() + path, lineno = entry.frame.code.raw.co_filename, entry.lineno + return ReprFileLocation(path, lineno+1, exconly) + + def getrepr(self, showlocals=False, style="long", + abspath=False, tbfilter=True, funcargs=False): + """ return str()able representation of this exception info. + showlocals: show locals per traceback entry + style: long|short|no|native traceback style + tbfilter: hide entries (where __tracebackhide__ is true) + + in case of style==native, tbfilter and showlocals is ignored. + """ + if style == 'native': + return ReprExceptionInfo(ReprTracebackNative( + py.std.traceback.format_exception( + self.type, + self.value, + self.traceback[0]._rawentry, + )), self._getreprcrash()) + + fmt = FormattedExcinfo(showlocals=showlocals, style=style, + abspath=abspath, tbfilter=tbfilter, funcargs=funcargs) + return fmt.repr_excinfo(self) + + def __str__(self): + entry = self.traceback[-1] + loc = ReprFileLocation(entry.path, entry.lineno + 1, self.exconly()) + return str(loc) + + def __unicode__(self): + entry = self.traceback[-1] + loc = ReprFileLocation(entry.path, entry.lineno + 1, self.exconly()) + return unicode(loc) + + +class FormattedExcinfo(object): + """ presenting information about failing Functions and Generators. """ + # for traceback entries + flow_marker = ">" + fail_marker = "E" + + def __init__(self, showlocals=False, style="long", abspath=True, tbfilter=True, funcargs=False): + self.showlocals = showlocals + self.style = style + self.tbfilter = tbfilter + self.funcargs = funcargs + self.abspath = abspath + self.astcache = {} + + def _getindent(self, source): + # figure out indent for given source + try: + s = str(source.getstatement(len(source)-1)) + except KeyboardInterrupt: + raise + except: + try: + s = str(source[-1]) + except KeyboardInterrupt: + raise + except: + return 0 + return 4 + (len(s) - len(s.lstrip())) + + def _getentrysource(self, entry): + source = entry.getsource(self.astcache) + if source is not None: + source = source.deindent() + return source + + def _saferepr(self, obj): + return py.io.saferepr(obj) + + def repr_args(self, entry): + if self.funcargs: + args = [] + for argname, argvalue in entry.frame.getargs(var=True): + args.append((argname, self._saferepr(argvalue))) + return ReprFuncArgs(args) + + def get_source(self, source, line_index=-1, excinfo=None, short=False): + """ return formatted and marked up source lines. """ + lines = [] + if source is None or line_index >= len(source.lines): + source = py.code.Source("???") + line_index = 0 + if line_index < 0: + line_index += len(source) + space_prefix = " " + if short: + lines.append(space_prefix + source.lines[line_index].strip()) + else: + for line in source.lines[:line_index]: + lines.append(space_prefix + line) + lines.append(self.flow_marker + " " + source.lines[line_index]) + for line in source.lines[line_index+1:]: + lines.append(space_prefix + line) + if excinfo is not None: + indent = 4 if short else self._getindent(source) + lines.extend(self.get_exconly(excinfo, indent=indent, markall=True)) + return lines + + def get_exconly(self, excinfo, indent=4, markall=False): + lines = [] + indent = " " * indent + # get the real exception information out + exlines = excinfo.exconly(tryshort=True).split('\n') + failindent = self.fail_marker + indent[1:] + for line in exlines: + lines.append(failindent + line) + if not markall: + failindent = indent + return lines + + def repr_locals(self, locals): + if self.showlocals: + lines = [] + keys = [loc for loc in locals if loc[0] != "@"] + keys.sort() + for name in keys: + value = locals[name] + if name == '__builtins__': + lines.append("__builtins__ = <builtins>") + else: + # This formatting could all be handled by the + # _repr() function, which is only reprlib.Repr in + # disguise, so is very configurable. + str_repr = self._saferepr(value) + #if len(str_repr) < 70 or not isinstance(value, + # (list, tuple, dict)): + lines.append("%-10s = %s" %(name, str_repr)) + #else: + # self._line("%-10s =\\" % (name,)) + # # XXX + # py.std.pprint.pprint(value, stream=self.excinfowriter) + return ReprLocals(lines) + + def repr_traceback_entry(self, entry, excinfo=None): + source = self._getentrysource(entry) + if source is None: + source = py.code.Source("???") + line_index = 0 + else: + # entry.getfirstlinesource() can be -1, should be 0 on jython + line_index = entry.lineno - max(entry.getfirstlinesource(), 0) + + lines = [] + style = entry._repr_style + if style is None: + style = self.style + if style in ("short", "long"): + short = style == "short" + reprargs = self.repr_args(entry) if not short else None + s = self.get_source(source, line_index, excinfo, short=short) + lines.extend(s) + if short: + message = "in %s" %(entry.name) + else: + message = excinfo and excinfo.typename or "" + path = self._makepath(entry.path) + filelocrepr = ReprFileLocation(path, entry.lineno+1, message) + localsrepr = None + if not short: + localsrepr = self.repr_locals(entry.locals) + return ReprEntry(lines, reprargs, localsrepr, filelocrepr, style) + if excinfo: + lines.extend(self.get_exconly(excinfo, indent=4)) + return ReprEntry(lines, None, None, None, style) + + def _makepath(self, path): + if not self.abspath: + try: + np = py.path.local().bestrelpath(path) + except OSError: + return path + if len(np) < len(str(path)): + path = np + return path + + def repr_traceback(self, excinfo): + traceback = excinfo.traceback + if self.tbfilter: + traceback = traceback.filter() + recursionindex = None + if excinfo.errisinstance(RuntimeError): + if "maximum recursion depth exceeded" in str(excinfo.value): + recursionindex = traceback.recursionindex() + last = traceback[-1] + entries = [] + extraline = None + for index, entry in enumerate(traceback): + einfo = (last == entry) and excinfo or None + reprentry = self.repr_traceback_entry(entry, einfo) + entries.append(reprentry) + if index == recursionindex: + extraline = "!!! Recursion detected (same locals & position)" + break + return ReprTraceback(entries, extraline, style=self.style) + + def repr_excinfo(self, excinfo): + reprtraceback = self.repr_traceback(excinfo) + reprcrash = excinfo._getreprcrash() + return ReprExceptionInfo(reprtraceback, reprcrash) + +class TerminalRepr: + def __str__(self): + s = self.__unicode__() + if sys.version_info[0] < 3: + s = s.encode('utf-8') + return s + + def __unicode__(self): + # FYI this is called from pytest-xdist's serialization of exception + # information. + io = py.io.TextIO() + tw = py.io.TerminalWriter(file=io) + self.toterminal(tw) + return io.getvalue().strip() + + def __repr__(self): + return "<%s instance at %0x>" %(self.__class__, id(self)) + + +class ReprExceptionInfo(TerminalRepr): + def __init__(self, reprtraceback, reprcrash): + self.reprtraceback = reprtraceback + self.reprcrash = reprcrash + self.sections = [] + + def addsection(self, name, content, sep="-"): + self.sections.append((name, content, sep)) + + def toterminal(self, tw): + self.reprtraceback.toterminal(tw) + for name, content, sep in self.sections: + tw.sep(sep, name) + tw.line(content) + +class ReprTraceback(TerminalRepr): + entrysep = "_ " + + def __init__(self, reprentries, extraline, style): + self.reprentries = reprentries + self.extraline = extraline + self.style = style + + def toterminal(self, tw): + # the entries might have different styles + last_style = None + for i, entry in enumerate(self.reprentries): + if entry.style == "long": + tw.line("") + entry.toterminal(tw) + if i < len(self.reprentries) - 1: + next_entry = self.reprentries[i+1] + if entry.style == "long" or \ + entry.style == "short" and next_entry.style == "long": + tw.sep(self.entrysep) + + if self.extraline: + tw.line(self.extraline) + +class ReprTracebackNative(ReprTraceback): + def __init__(self, tblines): + self.style = "native" + self.reprentries = [ReprEntryNative(tblines)] + self.extraline = None + +class ReprEntryNative(TerminalRepr): + style = "native" + + def __init__(self, tblines): + self.lines = tblines + + def toterminal(self, tw): + tw.write("".join(self.lines)) + +class ReprEntry(TerminalRepr): + localssep = "_ " + + def __init__(self, lines, reprfuncargs, reprlocals, filelocrepr, style): + self.lines = lines + self.reprfuncargs = reprfuncargs + self.reprlocals = reprlocals + self.reprfileloc = filelocrepr + self.style = style + + def toterminal(self, tw): + if self.style == "short": + self.reprfileloc.toterminal(tw) + for line in self.lines: + red = line.startswith("E ") + tw.line(line, bold=True, red=red) + #tw.line("") + return + if self.reprfuncargs: + self.reprfuncargs.toterminal(tw) + for line in self.lines: + red = line.startswith("E ") + tw.line(line, bold=True, red=red) + if self.reprlocals: + #tw.sep(self.localssep, "Locals") + tw.line("") + self.reprlocals.toterminal(tw) + if self.reprfileloc: + if self.lines: + tw.line("") + self.reprfileloc.toterminal(tw) + + def __str__(self): + return "%s\n%s\n%s" % ("\n".join(self.lines), + self.reprlocals, + self.reprfileloc) + +class ReprFileLocation(TerminalRepr): + def __init__(self, path, lineno, message): + self.path = str(path) + self.lineno = lineno + self.message = message + + def toterminal(self, tw): + # filename and lineno output for each entry, + # using an output format that most editors unterstand + msg = self.message + i = msg.find("\n") + if i != -1: + msg = msg[:i] + tw.line("%s:%s: %s" %(self.path, self.lineno, msg)) + +class ReprLocals(TerminalRepr): + def __init__(self, lines): + self.lines = lines + + def toterminal(self, tw): + for line in self.lines: + tw.line(line) + +class ReprFuncArgs(TerminalRepr): + def __init__(self, args): + self.args = args + + def toterminal(self, tw): + if self.args: + linesofar = "" + for name, value in self.args: + ns = "%s = %s" %(name, value) + if len(ns) + len(linesofar) + 2 > tw.fullwidth: + if linesofar: + tw.line(linesofar) + linesofar = ns + else: + if linesofar: + linesofar += ", " + ns + else: + linesofar = ns + if linesofar: + tw.line(linesofar) + tw.line("") + + + +oldbuiltins = {} + +def patch_builtins(assertion=True, compile=True): + """ put compile and AssertionError builtins to Python's builtins. """ + if assertion: + from py._code import assertion + l = oldbuiltins.setdefault('AssertionError', []) + l.append(py.builtin.builtins.AssertionError) + py.builtin.builtins.AssertionError = assertion.AssertionError + if compile: + l = oldbuiltins.setdefault('compile', []) + l.append(py.builtin.builtins.compile) + py.builtin.builtins.compile = py.code.compile + +def unpatch_builtins(assertion=True, compile=True): + """ remove compile and AssertionError builtins from Python builtins. """ + if assertion: + py.builtin.builtins.AssertionError = oldbuiltins['AssertionError'].pop() + if compile: + py.builtin.builtins.compile = oldbuiltins['compile'].pop() + +def getrawcode(obj, trycall=True): + """ return code object for given function. """ + try: + return obj.__code__ + except AttributeError: + obj = getattr(obj, 'im_func', obj) + obj = getattr(obj, 'func_code', obj) + obj = getattr(obj, 'f_code', obj) + obj = getattr(obj, '__code__', obj) + if trycall and not hasattr(obj, 'co_firstlineno'): + if hasattr(obj, '__call__') and not py.std.inspect.isclass(obj): + x = getrawcode(obj.__call__, trycall=False) + if hasattr(x, 'co_firstlineno'): + return x + return obj + diff --git a/python/py/py/_code/source.py b/python/py/py/_code/source.py new file mode 100644 index 000000000..3a648e635 --- /dev/null +++ b/python/py/py/_code/source.py @@ -0,0 +1,419 @@ +from __future__ import generators + +from bisect import bisect_right +import sys +import inspect, tokenize +import py +from types import ModuleType +cpy_compile = compile + +try: + import _ast + from _ast import PyCF_ONLY_AST as _AST_FLAG +except ImportError: + _AST_FLAG = 0 + _ast = None + + +class Source(object): + """ a immutable object holding a source code fragment, + possibly deindenting it. + """ + _compilecounter = 0 + def __init__(self, *parts, **kwargs): + self.lines = lines = [] + de = kwargs.get('deindent', True) + rstrip = kwargs.get('rstrip', True) + for part in parts: + if not part: + partlines = [] + if isinstance(part, Source): + partlines = part.lines + elif isinstance(part, (tuple, list)): + partlines = [x.rstrip("\n") for x in part] + elif isinstance(part, py.builtin._basestring): + partlines = part.split('\n') + if rstrip: + while partlines: + if partlines[-1].strip(): + break + partlines.pop() + else: + partlines = getsource(part, deindent=de).lines + if de: + partlines = deindent(partlines) + lines.extend(partlines) + + def __eq__(self, other): + try: + return self.lines == other.lines + except AttributeError: + if isinstance(other, str): + return str(self) == other + return False + + def __getitem__(self, key): + if isinstance(key, int): + return self.lines[key] + else: + if key.step not in (None, 1): + raise IndexError("cannot slice a Source with a step") + return self.__getslice__(key.start, key.stop) + + def __len__(self): + return len(self.lines) + + def __getslice__(self, start, end): + newsource = Source() + newsource.lines = self.lines[start:end] + return newsource + + def strip(self): + """ return new source object with trailing + and leading blank lines removed. + """ + start, end = 0, len(self) + while start < end and not self.lines[start].strip(): + start += 1 + while end > start and not self.lines[end-1].strip(): + end -= 1 + source = Source() + source.lines[:] = self.lines[start:end] + return source + + def putaround(self, before='', after='', indent=' ' * 4): + """ return a copy of the source object with + 'before' and 'after' wrapped around it. + """ + before = Source(before) + after = Source(after) + newsource = Source() + lines = [ (indent + line) for line in self.lines] + newsource.lines = before.lines + lines + after.lines + return newsource + + def indent(self, indent=' ' * 4): + """ return a copy of the source object with + all lines indented by the given indent-string. + """ + newsource = Source() + newsource.lines = [(indent+line) for line in self.lines] + return newsource + + def getstatement(self, lineno, assertion=False): + """ return Source statement which contains the + given linenumber (counted from 0). + """ + start, end = self.getstatementrange(lineno, assertion) + return self[start:end] + + def getstatementrange(self, lineno, assertion=False): + """ return (start, end) tuple which spans the minimal + statement region which containing the given lineno. + """ + if not (0 <= lineno < len(self)): + raise IndexError("lineno out of range") + ast, start, end = getstatementrange_ast(lineno, self) + return start, end + + def deindent(self, offset=None): + """ return a new source object deindented by offset. + If offset is None then guess an indentation offset from + the first non-blank line. Subsequent lines which have a + lower indentation offset will be copied verbatim as + they are assumed to be part of multilines. + """ + # XXX maybe use the tokenizer to properly handle multiline + # strings etc.pp? + newsource = Source() + newsource.lines[:] = deindent(self.lines, offset) + return newsource + + def isparseable(self, deindent=True): + """ return True if source is parseable, heuristically + deindenting it by default. + """ + try: + import parser + except ImportError: + syntax_checker = lambda x: compile(x, 'asd', 'exec') + else: + syntax_checker = parser.suite + + if deindent: + source = str(self.deindent()) + else: + source = str(self) + try: + #compile(source+'\n', "x", "exec") + syntax_checker(source+'\n') + except KeyboardInterrupt: + raise + except Exception: + return False + else: + return True + + def __str__(self): + return "\n".join(self.lines) + + def compile(self, filename=None, mode='exec', + flag=generators.compiler_flag, + dont_inherit=0, _genframe=None): + """ return compiled code object. if filename is None + invent an artificial filename which displays + the source/line position of the caller frame. + """ + if not filename or py.path.local(filename).check(file=0): + if _genframe is None: + _genframe = sys._getframe(1) # the caller + fn,lineno = _genframe.f_code.co_filename, _genframe.f_lineno + base = "<%d-codegen " % self._compilecounter + self.__class__._compilecounter += 1 + if not filename: + filename = base + '%s:%d>' % (fn, lineno) + else: + filename = base + '%r %s:%d>' % (filename, fn, lineno) + source = "\n".join(self.lines) + '\n' + try: + co = cpy_compile(source, filename, mode, flag) + except SyntaxError: + ex = sys.exc_info()[1] + # re-represent syntax errors from parsing python strings + msglines = self.lines[:ex.lineno] + if ex.offset: + msglines.append(" "*ex.offset + '^') + msglines.append("(code was compiled probably from here: %s)" % filename) + newex = SyntaxError('\n'.join(msglines)) + newex.offset = ex.offset + newex.lineno = ex.lineno + newex.text = ex.text + raise newex + else: + if flag & _AST_FLAG: + return co + lines = [(x + "\n") for x in self.lines] + if sys.version_info[0] >= 3: + # XXX py3's inspect.getsourcefile() checks for a module + # and a pep302 __loader__ ... we don't have a module + # at code compile-time so we need to fake it here + m = ModuleType("_pycodecompile_pseudo_module") + py.std.inspect.modulesbyfile[filename] = None + py.std.sys.modules[None] = m + m.__loader__ = 1 + py.std.linecache.cache[filename] = (1, None, lines, filename) + return co + +# +# public API shortcut functions +# + +def compile_(source, filename=None, mode='exec', flags= + generators.compiler_flag, dont_inherit=0): + """ compile the given source to a raw code object, + and maintain an internal cache which allows later + retrieval of the source code for the code object + and any recursively created code objects. + """ + if _ast is not None and isinstance(source, _ast.AST): + # XXX should Source support having AST? + return cpy_compile(source, filename, mode, flags, dont_inherit) + _genframe = sys._getframe(1) # the caller + s = Source(source) + co = s.compile(filename, mode, flags, _genframe=_genframe) + return co + + +def getfslineno(obj): + """ Return source location (path, lineno) for the given object. + If the source cannot be determined return ("", -1) + """ + try: + code = py.code.Code(obj) + except TypeError: + try: + fn = (py.std.inspect.getsourcefile(obj) or + py.std.inspect.getfile(obj)) + except TypeError: + return "", -1 + + fspath = fn and py.path.local(fn) or None + lineno = -1 + if fspath: + try: + _, lineno = findsource(obj) + except IOError: + pass + else: + fspath = code.path + lineno = code.firstlineno + assert isinstance(lineno, int) + return fspath, lineno + +# +# helper functions +# + +def findsource(obj): + try: + sourcelines, lineno = py.std.inspect.findsource(obj) + except py.builtin._sysex: + raise + except: + return None, -1 + source = Source() + source.lines = [line.rstrip() for line in sourcelines] + return source, lineno + +def getsource(obj, **kwargs): + obj = py.code.getrawcode(obj) + try: + strsrc = inspect.getsource(obj) + except IndentationError: + strsrc = "\"Buggy python version consider upgrading, cannot get source\"" + assert isinstance(strsrc, str) + return Source(strsrc, **kwargs) + +def deindent(lines, offset=None): + if offset is None: + for line in lines: + line = line.expandtabs() + s = line.lstrip() + if s: + offset = len(line)-len(s) + break + else: + offset = 0 + if offset == 0: + return list(lines) + newlines = [] + def readline_generator(lines): + for line in lines: + yield line + '\n' + while True: + yield '' + + it = readline_generator(lines) + + try: + for _, _, (sline, _), (eline, _), _ in tokenize.generate_tokens(lambda: next(it)): + if sline > len(lines): + break # End of input reached + if sline > len(newlines): + line = lines[sline - 1].expandtabs() + if line.lstrip() and line[:offset].isspace(): + line = line[offset:] # Deindent + newlines.append(line) + + for i in range(sline, eline): + # Don't deindent continuing lines of + # multiline tokens (i.e. multiline strings) + newlines.append(lines[i]) + except (IndentationError, tokenize.TokenError): + pass + # Add any lines we didn't see. E.g. if an exception was raised. + newlines.extend(lines[len(newlines):]) + return newlines + + +def get_statement_startend2(lineno, node): + import ast + # flatten all statements and except handlers into one lineno-list + # AST's line numbers start indexing at 1 + l = [] + for x in ast.walk(node): + if isinstance(x, _ast.stmt) or isinstance(x, _ast.ExceptHandler): + l.append(x.lineno - 1) + for name in "finalbody", "orelse": + val = getattr(x, name, None) + if val: + # treat the finally/orelse part as its own statement + l.append(val[0].lineno - 1 - 1) + l.sort() + insert_index = bisect_right(l, lineno) + start = l[insert_index - 1] + if insert_index >= len(l): + end = None + else: + end = l[insert_index] + return start, end + + +def getstatementrange_ast(lineno, source, assertion=False, astnode=None): + if astnode is None: + content = str(source) + if sys.version_info < (2,7): + content += "\n" + try: + astnode = compile(content, "source", "exec", 1024) # 1024 for AST + except ValueError: + start, end = getstatementrange_old(lineno, source, assertion) + return None, start, end + start, end = get_statement_startend2(lineno, astnode) + # we need to correct the end: + # - ast-parsing strips comments + # - there might be empty lines + # - we might have lesser indented code blocks at the end + if end is None: + end = len(source.lines) + + if end > start + 1: + # make sure we don't span differently indented code blocks + # by using the BlockFinder helper used which inspect.getsource() uses itself + block_finder = inspect.BlockFinder() + # if we start with an indented line, put blockfinder to "started" mode + block_finder.started = source.lines[start][0].isspace() + it = ((x + "\n") for x in source.lines[start:end]) + try: + for tok in tokenize.generate_tokens(lambda: next(it)): + block_finder.tokeneater(*tok) + except (inspect.EndOfBlock, IndentationError): + end = block_finder.last + start + except Exception: + pass + + # the end might still point to a comment or empty line, correct it + while end: + line = source.lines[end - 1].lstrip() + if line.startswith("#") or not line: + end -= 1 + else: + break + return astnode, start, end + + +def getstatementrange_old(lineno, source, assertion=False): + """ return (start, end) tuple which spans the minimal + statement region which containing the given lineno. + raise an IndexError if no such statementrange can be found. + """ + # XXX this logic is only used on python2.4 and below + # 1. find the start of the statement + from codeop import compile_command + for start in range(lineno, -1, -1): + if assertion: + line = source.lines[start] + # the following lines are not fully tested, change with care + if 'super' in line and 'self' in line and '__init__' in line: + raise IndexError("likely a subclass") + if "assert" not in line and "raise" not in line: + continue + trylines = source.lines[start:lineno+1] + # quick hack to prepare parsing an indented line with + # compile_command() (which errors on "return" outside defs) + trylines.insert(0, 'def xxx():') + trysource = '\n '.join(trylines) + # ^ space here + try: + compile_command(trysource) + except (SyntaxError, OverflowError, ValueError): + continue + + # 2. find the end of the statement + for end in range(lineno+1, len(source)+1): + trysource = source[start:end] + if trysource.isparseable(): + return start, end + raise SyntaxError("no valid source range around line %d " % (lineno,)) + + diff --git a/python/py/py/_error.py b/python/py/py/_error.py new file mode 100644 index 000000000..550fb521a --- /dev/null +++ b/python/py/py/_error.py @@ -0,0 +1,88 @@ +""" +create errno-specific classes for IO or os calls. + +""" +import sys, os, errno + +class Error(EnvironmentError): + def __repr__(self): + return "%s.%s %r: %s " %(self.__class__.__module__, + self.__class__.__name__, + self.__class__.__doc__, + " ".join(map(str, self.args)), + #repr(self.args) + ) + + def __str__(self): + s = "[%s]: %s" %(self.__class__.__doc__, + " ".join(map(str, self.args)), + ) + return s + +_winerrnomap = { + 2: errno.ENOENT, + 3: errno.ENOENT, + 17: errno.EEXIST, + 13: errno.EBUSY, # empty cd drive, but ENOMEDIUM seems unavailiable + 22: errno.ENOTDIR, + 20: errno.ENOTDIR, + 267: errno.ENOTDIR, + 5: errno.EACCES, # anything better? +} + +class ErrorMaker(object): + """ lazily provides Exception classes for each possible POSIX errno + (as defined per the 'errno' module). All such instances + subclass EnvironmentError. + """ + Error = Error + _errno2class = {} + + def __getattr__(self, name): + if name[0] == "_": + raise AttributeError(name) + eno = getattr(errno, name) + cls = self._geterrnoclass(eno) + setattr(self, name, cls) + return cls + + def _geterrnoclass(self, eno): + try: + return self._errno2class[eno] + except KeyError: + clsname = errno.errorcode.get(eno, "UnknownErrno%d" %(eno,)) + errorcls = type(Error)(clsname, (Error,), + {'__module__':'py.error', + '__doc__': os.strerror(eno)}) + self._errno2class[eno] = errorcls + return errorcls + + def checked_call(self, func, *args, **kwargs): + """ call a function and raise an errno-exception if applicable. """ + __tracebackhide__ = True + try: + return func(*args, **kwargs) + except self.Error: + raise + except (OSError, EnvironmentError): + cls, value, tb = sys.exc_info() + if not hasattr(value, 'errno'): + raise + __tracebackhide__ = False + errno = value.errno + try: + if not isinstance(value, WindowsError): + raise NameError + except NameError: + # we are not on Windows, or we got a proper OSError + cls = self._geterrnoclass(errno) + else: + try: + cls = self._geterrnoclass(_winerrnomap[errno]) + except KeyError: + raise value + raise cls("%s%r" % (func.__name__, args)) + __tracebackhide__ = True + + +error = ErrorMaker() diff --git a/python/py/py/_iniconfig.py b/python/py/py/_iniconfig.py new file mode 100644 index 000000000..92b50bd85 --- /dev/null +++ b/python/py/py/_iniconfig.py @@ -0,0 +1,162 @@ +""" brain-dead simple parser for ini-style files. +(C) Ronny Pfannschmidt, Holger Krekel -- MIT licensed +""" +__version__ = "0.2.dev2" + +__all__ = ['IniConfig', 'ParseError'] + +COMMENTCHARS = "#;" + +class ParseError(Exception): + def __init__(self, path, lineno, msg): + Exception.__init__(self, path, lineno, msg) + self.path = path + self.lineno = lineno + self.msg = msg + + def __str__(self): + return "%s:%s: %s" %(self.path, self.lineno+1, self.msg) + +class SectionWrapper(object): + def __init__(self, config, name): + self.config = config + self.name = name + + def lineof(self, name): + return self.config.lineof(self.name, name) + + def get(self, key, default=None, convert=str): + return self.config.get(self.name, key, convert=convert, default=default) + + def __getitem__(self, key): + return self.config.sections[self.name][key] + + def __iter__(self): + section = self.config.sections.get(self.name, []) + def lineof(key): + return self.config.lineof(self.name, key) + for name in sorted(section, key=lineof): + yield name + + def items(self): + for name in self: + yield name, self[name] + + +class IniConfig(object): + def __init__(self, path, data=None): + self.path = str(path) # convenience + if data is None: + f = open(self.path) + try: + tokens = self._parse(iter(f)) + finally: + f.close() + else: + tokens = self._parse(data.splitlines(True)) + + self._sources = {} + self.sections = {} + + for lineno, section, name, value in tokens: + if section is None: + self._raise(lineno, 'no section header defined') + self._sources[section, name] = lineno + if name is None: + if section in self.sections: + self._raise(lineno, 'duplicate section %r'%(section, )) + self.sections[section] = {} + else: + if name in self.sections[section]: + self._raise(lineno, 'duplicate name %r'%(name, )) + self.sections[section][name] = value + + def _raise(self, lineno, msg): + raise ParseError(self.path, lineno, msg) + + def _parse(self, line_iter): + result = [] + section = None + for lineno, line in enumerate(line_iter): + name, data = self._parseline(line, lineno) + # new value + if name is not None and data is not None: + result.append((lineno, section, name, data)) + # new section + elif name is not None and data is None: + if not name: + self._raise(lineno, 'empty section name') + section = name + result.append((lineno, section, None, None)) + # continuation + elif name is None and data is not None: + if not result: + self._raise(lineno, 'unexpected value continuation') + last = result.pop() + last_name, last_data = last[-2:] + if last_name is None: + self._raise(lineno, 'unexpected value continuation') + + if last_data: + data = '%s\n%s' % (last_data, data) + result.append(last[:-1] + (data,)) + return result + + def _parseline(self, line, lineno): + # blank lines + if iscommentline(line): + line = "" + else: + line = line.rstrip() + if not line: + return None, None + # section + if line[0] == '[': + realline = line + for c in COMMENTCHARS: + line = line.split(c)[0].rstrip() + if line[-1] == "]": + return line[1:-1], None + return None, realline.strip() + # value + elif not line[0].isspace(): + try: + name, value = line.split('=', 1) + if ":" in name: + raise ValueError() + except ValueError: + try: + name, value = line.split(":", 1) + except ValueError: + self._raise(lineno, 'unexpected line: %r' % line) + return name.strip(), value.strip() + # continuation + else: + return None, line.strip() + + def lineof(self, section, name=None): + lineno = self._sources.get((section, name)) + if lineno is not None: + return lineno + 1 + + def get(self, section, name, default=None, convert=str): + try: + return convert(self.sections[section][name]) + except KeyError: + return default + + def __getitem__(self, name): + if name not in self.sections: + raise KeyError(name) + return SectionWrapper(self, name) + + def __iter__(self): + for name in sorted(self.sections, key=self.lineof): + yield SectionWrapper(self, name) + + def __contains__(self, arg): + return arg in self.sections + +def iscommentline(line): + c = line.lstrip()[:1] + return c in COMMENTCHARS diff --git a/python/py/py/_io/__init__.py b/python/py/py/_io/__init__.py new file mode 100644 index 000000000..835f01f3a --- /dev/null +++ b/python/py/py/_io/__init__.py @@ -0,0 +1 @@ +""" input/output helping """ diff --git a/python/py/py/_io/capture.py b/python/py/py/_io/capture.py new file mode 100644 index 000000000..bc157ed97 --- /dev/null +++ b/python/py/py/_io/capture.py @@ -0,0 +1,371 @@ +import os +import sys +import py +import tempfile + +try: + from io import StringIO +except ImportError: + from StringIO import StringIO + +if sys.version_info < (3,0): + class TextIO(StringIO): + def write(self, data): + if not isinstance(data, unicode): + data = unicode(data, getattr(self, '_encoding', 'UTF-8'), 'replace') + StringIO.write(self, data) +else: + TextIO = StringIO + +try: + from io import BytesIO +except ImportError: + class BytesIO(StringIO): + def write(self, data): + if isinstance(data, unicode): + raise TypeError("not a byte value: %r" %(data,)) + StringIO.write(self, data) + +patchsysdict = {0: 'stdin', 1: 'stdout', 2: 'stderr'} + +class FDCapture: + """ Capture IO to/from a given os-level filedescriptor. """ + + def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False): + """ save targetfd descriptor, and open a new + temporary file there. If no tmpfile is + specified a tempfile.Tempfile() will be opened + in text mode. + """ + self.targetfd = targetfd + if tmpfile is None and targetfd != 0: + f = tempfile.TemporaryFile('wb+') + tmpfile = dupfile(f, encoding="UTF-8") + f.close() + self.tmpfile = tmpfile + self._savefd = os.dup(self.targetfd) + if patchsys: + self._oldsys = getattr(sys, patchsysdict[targetfd]) + if now: + self.start() + + def start(self): + try: + os.fstat(self._savefd) + except OSError: + raise ValueError("saved filedescriptor not valid, " + "did you call start() twice?") + if self.targetfd == 0 and not self.tmpfile: + fd = os.open(devnullpath, os.O_RDONLY) + os.dup2(fd, 0) + os.close(fd) + if hasattr(self, '_oldsys'): + setattr(sys, patchsysdict[self.targetfd], DontReadFromInput()) + else: + os.dup2(self.tmpfile.fileno(), self.targetfd) + if hasattr(self, '_oldsys'): + setattr(sys, patchsysdict[self.targetfd], self.tmpfile) + + def done(self): + """ unpatch and clean up, returns the self.tmpfile (file object) + """ + os.dup2(self._savefd, self.targetfd) + os.close(self._savefd) + if self.targetfd != 0: + self.tmpfile.seek(0) + if hasattr(self, '_oldsys'): + setattr(sys, patchsysdict[self.targetfd], self._oldsys) + return self.tmpfile + + def writeorg(self, data): + """ write a string to the original file descriptor + """ + tempfp = tempfile.TemporaryFile() + try: + os.dup2(self._savefd, tempfp.fileno()) + tempfp.write(data) + finally: + tempfp.close() + + +def dupfile(f, mode=None, buffering=0, raising=False, encoding=None): + """ return a new open file object that's a duplicate of f + + mode is duplicated if not given, 'buffering' controls + buffer size (defaulting to no buffering) and 'raising' + defines whether an exception is raised when an incompatible + file object is passed in (if raising is False, the file + object itself will be returned) + """ + try: + fd = f.fileno() + mode = mode or f.mode + except AttributeError: + if raising: + raise + return f + newfd = os.dup(fd) + if sys.version_info >= (3,0): + if encoding is not None: + mode = mode.replace("b", "") + buffering = True + return os.fdopen(newfd, mode, buffering, encoding, closefd=True) + else: + f = os.fdopen(newfd, mode, buffering) + if encoding is not None: + return EncodedFile(f, encoding) + return f + +class EncodedFile(object): + def __init__(self, _stream, encoding): + self._stream = _stream + self.encoding = encoding + + def write(self, obj): + if isinstance(obj, unicode): + obj = obj.encode(self.encoding) + elif isinstance(obj, str): + pass + else: + obj = str(obj) + self._stream.write(obj) + + def writelines(self, linelist): + data = ''.join(linelist) + self.write(data) + + def __getattr__(self, name): + return getattr(self._stream, name) + +class Capture(object): + def call(cls, func, *args, **kwargs): + """ return a (res, out, err) tuple where + out and err represent the output/error output + during function execution. + call the given function with args/kwargs + and capture output/error during its execution. + """ + so = cls() + try: + res = func(*args, **kwargs) + finally: + out, err = so.reset() + return res, out, err + call = classmethod(call) + + def reset(self): + """ reset sys.stdout/stderr and return captured output as strings. """ + if hasattr(self, '_reset'): + raise ValueError("was already reset") + self._reset = True + outfile, errfile = self.done(save=False) + out, err = "", "" + if outfile and not outfile.closed: + out = outfile.read() + outfile.close() + if errfile and errfile != outfile and not errfile.closed: + err = errfile.read() + errfile.close() + return out, err + + def suspend(self): + """ return current snapshot captures, memorize tempfiles. """ + outerr = self.readouterr() + outfile, errfile = self.done() + return outerr + + +class StdCaptureFD(Capture): + """ This class allows to capture writes to FD1 and FD2 + and may connect a NULL file to FD0 (and prevent + reads from sys.stdin). If any of the 0,1,2 file descriptors + is invalid it will not be captured. + """ + def __init__(self, out=True, err=True, mixed=False, + in_=True, patchsys=True, now=True): + self._options = { + "out": out, + "err": err, + "mixed": mixed, + "in_": in_, + "patchsys": patchsys, + "now": now, + } + self._save() + if now: + self.startall() + + def _save(self): + in_ = self._options['in_'] + out = self._options['out'] + err = self._options['err'] + mixed = self._options['mixed'] + patchsys = self._options['patchsys'] + if in_: + try: + self.in_ = FDCapture(0, tmpfile=None, now=False, + patchsys=patchsys) + except OSError: + pass + if out: + tmpfile = None + if hasattr(out, 'write'): + tmpfile = out + try: + self.out = FDCapture(1, tmpfile=tmpfile, + now=False, patchsys=patchsys) + self._options['out'] = self.out.tmpfile + except OSError: + pass + if err: + if out and mixed: + tmpfile = self.out.tmpfile + elif hasattr(err, 'write'): + tmpfile = err + else: + tmpfile = None + try: + self.err = FDCapture(2, tmpfile=tmpfile, + now=False, patchsys=patchsys) + self._options['err'] = self.err.tmpfile + except OSError: + pass + + def startall(self): + if hasattr(self, 'in_'): + self.in_.start() + if hasattr(self, 'out'): + self.out.start() + if hasattr(self, 'err'): + self.err.start() + + def resume(self): + """ resume capturing with original temp files. """ + self.startall() + + def done(self, save=True): + """ return (outfile, errfile) and stop capturing. """ + outfile = errfile = None + if hasattr(self, 'out') and not self.out.tmpfile.closed: + outfile = self.out.done() + if hasattr(self, 'err') and not self.err.tmpfile.closed: + errfile = self.err.done() + if hasattr(self, 'in_'): + tmpfile = self.in_.done() + if save: + self._save() + return outfile, errfile + + def readouterr(self): + """ return snapshot value of stdout/stderr capturings. """ + if hasattr(self, "out"): + out = self._readsnapshot(self.out.tmpfile) + else: + out = "" + if hasattr(self, "err"): + err = self._readsnapshot(self.err.tmpfile) + else: + err = "" + return [out, err] + + def _readsnapshot(self, f): + f.seek(0) + res = f.read() + enc = getattr(f, "encoding", None) + if enc: + res = py.builtin._totext(res, enc, "replace") + f.truncate(0) + f.seek(0) + return res + + +class StdCapture(Capture): + """ This class allows to capture writes to sys.stdout|stderr "in-memory" + and will raise errors on tries to read from sys.stdin. It only + modifies sys.stdout|stderr|stdin attributes and does not + touch underlying File Descriptors (use StdCaptureFD for that). + """ + def __init__(self, out=True, err=True, in_=True, mixed=False, now=True): + self._oldout = sys.stdout + self._olderr = sys.stderr + self._oldin = sys.stdin + if out and not hasattr(out, 'file'): + out = TextIO() + self.out = out + if err: + if mixed: + err = out + elif not hasattr(err, 'write'): + err = TextIO() + self.err = err + self.in_ = in_ + if now: + self.startall() + + def startall(self): + if self.out: + sys.stdout = self.out + if self.err: + sys.stderr = self.err + if self.in_: + sys.stdin = self.in_ = DontReadFromInput() + + def done(self, save=True): + """ return (outfile, errfile) and stop capturing. """ + outfile = errfile = None + if self.out and not self.out.closed: + sys.stdout = self._oldout + outfile = self.out + outfile.seek(0) + if self.err and not self.err.closed: + sys.stderr = self._olderr + errfile = self.err + errfile.seek(0) + if self.in_: + sys.stdin = self._oldin + return outfile, errfile + + def resume(self): + """ resume capturing with original temp files. """ + self.startall() + + def readouterr(self): + """ return snapshot value of stdout/stderr capturings. """ + out = err = "" + if self.out: + out = self.out.getvalue() + self.out.truncate(0) + self.out.seek(0) + if self.err: + err = self.err.getvalue() + self.err.truncate(0) + self.err.seek(0) + return out, err + +class DontReadFromInput: + """Temporary stub class. Ideally when stdin is accessed, the + capturing should be turned off, with possibly all data captured + so far sent to the screen. This should be configurable, though, + because in automated test runs it is better to crash than + hang indefinitely. + """ + def read(self, *args): + raise IOError("reading from stdin while output is captured") + readline = read + readlines = read + __iter__ = read + + def fileno(self): + raise ValueError("redirected Stdin is pseudofile, has no fileno()") + def isatty(self): + return False + def close(self): + pass + +try: + devnullpath = os.devnull +except AttributeError: + if os.name == 'nt': + devnullpath = 'NUL' + else: + devnullpath = '/dev/null' diff --git a/python/py/py/_io/saferepr.py b/python/py/py/_io/saferepr.py new file mode 100644 index 000000000..8518290ef --- /dev/null +++ b/python/py/py/_io/saferepr.py @@ -0,0 +1,71 @@ +import py +import sys + +builtin_repr = repr + +reprlib = py.builtin._tryimport('repr', 'reprlib') + +class SafeRepr(reprlib.Repr): + """ subclass of repr.Repr that limits the resulting size of repr() + and includes information on exceptions raised during the call. + """ + def repr(self, x): + return self._callhelper(reprlib.Repr.repr, self, x) + + def repr_unicode(self, x, level): + # Strictly speaking wrong on narrow builds + def repr(u): + if "'" not in u: + return py.builtin._totext("'%s'") % u + elif '"' not in u: + return py.builtin._totext('"%s"') % u + else: + return py.builtin._totext("'%s'") % u.replace("'", r"\'") + s = repr(x[:self.maxstring]) + if len(s) > self.maxstring: + i = max(0, (self.maxstring-3)//2) + j = max(0, self.maxstring-3-i) + s = repr(x[:i] + x[len(x)-j:]) + s = s[:i] + '...' + s[len(s)-j:] + return s + + def repr_instance(self, x, level): + return self._callhelper(builtin_repr, x) + + def _callhelper(self, call, x, *args): + try: + # Try the vanilla repr and make sure that the result is a string + s = call(x, *args) + except py.builtin._sysex: + raise + except: + cls, e, tb = sys.exc_info() + exc_name = getattr(cls, '__name__', 'unknown') + try: + exc_info = str(e) + except py.builtin._sysex: + raise + except: + exc_info = 'unknown' + return '<[%s("%s") raised in repr()] %s object at 0x%x>' % ( + exc_name, exc_info, x.__class__.__name__, id(x)) + else: + if len(s) > self.maxsize: + i = max(0, (self.maxsize-3)//2) + j = max(0, self.maxsize-3-i) + s = s[:i] + '...' + s[len(s)-j:] + return s + +def saferepr(obj, maxsize=240): + """ return a size-limited safe repr-string for the given object. + Failing __repr__ functions of user instances will be represented + with a short exception info and 'saferepr' generally takes + care to never raise exceptions itself. This function is a wrapper + around the Repr/reprlib functionality of the standard 2.6 lib. + """ + # review exception handling + srepr = SafeRepr() + srepr.maxstring = maxsize + srepr.maxsize = maxsize + srepr.maxother = 160 + return srepr.repr(obj) diff --git a/python/py/py/_io/terminalwriter.py b/python/py/py/_io/terminalwriter.py new file mode 100644 index 000000000..cef1ff580 --- /dev/null +++ b/python/py/py/_io/terminalwriter.py @@ -0,0 +1,348 @@ +""" + +Helper functions for writing to terminals and files. + +""" + + +import sys, os +import py +py3k = sys.version_info[0] >= 3 +from py.builtin import text, bytes + +win32_and_ctypes = False +colorama = None +if sys.platform == "win32": + try: + import colorama + except ImportError: + try: + import ctypes + win32_and_ctypes = True + except ImportError: + pass + + +def _getdimensions(): + import termios,fcntl,struct + call = fcntl.ioctl(1,termios.TIOCGWINSZ,"\000"*8) + height,width = struct.unpack( "hhhh", call ) [:2] + return height, width + + +def get_terminal_width(): + height = width = 0 + try: + height, width = _getdimensions() + except py.builtin._sysex: + raise + except: + # pass to fallback below + pass + + if width == 0: + # FALLBACK: + # * some exception happened + # * or this is emacs terminal which reports (0,0) + width = int(os.environ.get('COLUMNS', 80)) + + # XXX the windows getdimensions may be bogus, let's sanify a bit + if width < 40: + width = 80 + return width + +terminal_width = get_terminal_width() + +# XXX unify with _escaped func below +def ansi_print(text, esc, file=None, newline=True, flush=False): + if file is None: + file = sys.stderr + text = text.rstrip() + if esc and not isinstance(esc, tuple): + esc = (esc,) + if esc and sys.platform != "win32" and file.isatty(): + text = (''.join(['\x1b[%sm' % cod for cod in esc]) + + text + + '\x1b[0m') # ANSI color code "reset" + if newline: + text += '\n' + + if esc and win32_and_ctypes and file.isatty(): + if 1 in esc: + bold = True + esc = tuple([x for x in esc if x != 1]) + else: + bold = False + esctable = {() : FOREGROUND_WHITE, # normal + (31,): FOREGROUND_RED, # red + (32,): FOREGROUND_GREEN, # green + (33,): FOREGROUND_GREEN|FOREGROUND_RED, # yellow + (34,): FOREGROUND_BLUE, # blue + (35,): FOREGROUND_BLUE|FOREGROUND_RED, # purple + (36,): FOREGROUND_BLUE|FOREGROUND_GREEN, # cyan + (37,): FOREGROUND_WHITE, # white + (39,): FOREGROUND_WHITE, # reset + } + attr = esctable.get(esc, FOREGROUND_WHITE) + if bold: + attr |= FOREGROUND_INTENSITY + STD_OUTPUT_HANDLE = -11 + STD_ERROR_HANDLE = -12 + if file is sys.stderr: + handle = GetStdHandle(STD_ERROR_HANDLE) + else: + handle = GetStdHandle(STD_OUTPUT_HANDLE) + oldcolors = GetConsoleInfo(handle).wAttributes + attr |= (oldcolors & 0x0f0) + SetConsoleTextAttribute(handle, attr) + while len(text) > 32768: + file.write(text[:32768]) + text = text[32768:] + if text: + file.write(text) + SetConsoleTextAttribute(handle, oldcolors) + else: + file.write(text) + + if flush: + file.flush() + +def should_do_markup(file): + if os.environ.get('PY_COLORS') == '1': + return True + if os.environ.get('PY_COLORS') == '0': + return False + return hasattr(file, 'isatty') and file.isatty() \ + and os.environ.get('TERM') != 'dumb' \ + and not (sys.platform.startswith('java') and os._name == 'nt') + +class TerminalWriter(object): + _esctable = dict(black=30, red=31, green=32, yellow=33, + blue=34, purple=35, cyan=36, white=37, + Black=40, Red=41, Green=42, Yellow=43, + Blue=44, Purple=45, Cyan=46, White=47, + bold=1, light=2, blink=5, invert=7) + + # XXX deprecate stringio argument + def __init__(self, file=None, stringio=False, encoding=None): + if file is None: + if stringio: + self.stringio = file = py.io.TextIO() + else: + file = py.std.sys.stdout + elif py.builtin.callable(file) and not ( + hasattr(file, "write") and hasattr(file, "flush")): + file = WriteFile(file, encoding=encoding) + if hasattr(file, "isatty") and file.isatty() and colorama: + file = colorama.AnsiToWin32(file).stream + self.encoding = encoding or getattr(file, 'encoding', "utf-8") + self._file = file + self.fullwidth = get_terminal_width() + self.hasmarkup = should_do_markup(file) + self._lastlen = 0 + + def _escaped(self, text, esc): + if esc and self.hasmarkup: + text = (''.join(['\x1b[%sm' % cod for cod in esc]) + + text +'\x1b[0m') + return text + + def markup(self, text, **kw): + esc = [] + for name in kw: + if name not in self._esctable: + raise ValueError("unknown markup: %r" %(name,)) + if kw[name]: + esc.append(self._esctable[name]) + return self._escaped(text, tuple(esc)) + + def sep(self, sepchar, title=None, fullwidth=None, **kw): + if fullwidth is None: + fullwidth = self.fullwidth + # the goal is to have the line be as long as possible + # under the condition that len(line) <= fullwidth + if sys.platform == "win32": + # if we print in the last column on windows we are on a + # new line but there is no way to verify/neutralize this + # (we may not know the exact line width) + # so let's be defensive to avoid empty lines in the output + fullwidth -= 1 + if title is not None: + # we want 2 + 2*len(fill) + len(title) <= fullwidth + # i.e. 2 + 2*len(sepchar)*N + len(title) <= fullwidth + # 2*len(sepchar)*N <= fullwidth - len(title) - 2 + # N <= (fullwidth - len(title) - 2) // (2*len(sepchar)) + N = (fullwidth - len(title) - 2) // (2*len(sepchar)) + fill = sepchar * N + line = "%s %s %s" % (fill, title, fill) + else: + # we want len(sepchar)*N <= fullwidth + # i.e. N <= fullwidth // len(sepchar) + line = sepchar * (fullwidth // len(sepchar)) + # in some situations there is room for an extra sepchar at the right, + # in particular if we consider that with a sepchar like "_ " the + # trailing space is not important at the end of the line + if len(line) + len(sepchar.rstrip()) <= fullwidth: + line += sepchar.rstrip() + + self.line(line, **kw) + + def write(self, msg, **kw): + if msg: + if not isinstance(msg, (bytes, text)): + msg = text(msg) + if self.hasmarkup and kw: + markupmsg = self.markup(msg, **kw) + else: + markupmsg = msg + write_out(self._file, markupmsg) + + def line(self, s='', **kw): + self.write(s, **kw) + self._checkfill(s) + self.write('\n') + + def reline(self, line, **kw): + if not self.hasmarkup: + raise ValueError("cannot use rewrite-line without terminal") + self.write(line, **kw) + self._checkfill(line) + self.write('\r') + self._lastlen = len(line) + + def _checkfill(self, line): + diff2last = self._lastlen - len(line) + if diff2last > 0: + self.write(" " * diff2last) + +class Win32ConsoleWriter(TerminalWriter): + def write(self, msg, **kw): + if msg: + if not isinstance(msg, (bytes, text)): + msg = text(msg) + oldcolors = None + if self.hasmarkup and kw: + handle = GetStdHandle(STD_OUTPUT_HANDLE) + oldcolors = GetConsoleInfo(handle).wAttributes + default_bg = oldcolors & 0x00F0 + attr = default_bg + if kw.pop('bold', False): + attr |= FOREGROUND_INTENSITY + + if kw.pop('red', False): + attr |= FOREGROUND_RED + elif kw.pop('blue', False): + attr |= FOREGROUND_BLUE + elif kw.pop('green', False): + attr |= FOREGROUND_GREEN + elif kw.pop('yellow', False): + attr |= FOREGROUND_GREEN|FOREGROUND_RED + else: + attr |= oldcolors & 0x0007 + + SetConsoleTextAttribute(handle, attr) + write_out(self._file, msg) + if oldcolors: + SetConsoleTextAttribute(handle, oldcolors) + +class WriteFile(object): + def __init__(self, writemethod, encoding=None): + self.encoding = encoding + self._writemethod = writemethod + + def write(self, data): + if self.encoding: + data = data.encode(self.encoding, "replace") + self._writemethod(data) + + def flush(self): + return + + +if win32_and_ctypes: + TerminalWriter = Win32ConsoleWriter + import ctypes + from ctypes import wintypes + + # ctypes access to the Windows console + STD_OUTPUT_HANDLE = -11 + STD_ERROR_HANDLE = -12 + FOREGROUND_BLACK = 0x0000 # black text + FOREGROUND_BLUE = 0x0001 # text color contains blue. + FOREGROUND_GREEN = 0x0002 # text color contains green. + FOREGROUND_RED = 0x0004 # text color contains red. + FOREGROUND_WHITE = 0x0007 + FOREGROUND_INTENSITY = 0x0008 # text color is intensified. + BACKGROUND_BLACK = 0x0000 # background color black + BACKGROUND_BLUE = 0x0010 # background color contains blue. + BACKGROUND_GREEN = 0x0020 # background color contains green. + BACKGROUND_RED = 0x0040 # background color contains red. + BACKGROUND_WHITE = 0x0070 + BACKGROUND_INTENSITY = 0x0080 # background color is intensified. + + SHORT = ctypes.c_short + class COORD(ctypes.Structure): + _fields_ = [('X', SHORT), + ('Y', SHORT)] + class SMALL_RECT(ctypes.Structure): + _fields_ = [('Left', SHORT), + ('Top', SHORT), + ('Right', SHORT), + ('Bottom', SHORT)] + class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure): + _fields_ = [('dwSize', COORD), + ('dwCursorPosition', COORD), + ('wAttributes', wintypes.WORD), + ('srWindow', SMALL_RECT), + ('dwMaximumWindowSize', COORD)] + + _GetStdHandle = ctypes.windll.kernel32.GetStdHandle + _GetStdHandle.argtypes = [wintypes.DWORD] + _GetStdHandle.restype = wintypes.HANDLE + def GetStdHandle(kind): + return _GetStdHandle(kind) + + SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute + SetConsoleTextAttribute.argtypes = [wintypes.HANDLE, wintypes.WORD] + SetConsoleTextAttribute.restype = wintypes.BOOL + + _GetConsoleScreenBufferInfo = \ + ctypes.windll.kernel32.GetConsoleScreenBufferInfo + _GetConsoleScreenBufferInfo.argtypes = [wintypes.HANDLE, + ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO)] + _GetConsoleScreenBufferInfo.restype = wintypes.BOOL + def GetConsoleInfo(handle): + info = CONSOLE_SCREEN_BUFFER_INFO() + _GetConsoleScreenBufferInfo(handle, ctypes.byref(info)) + return info + + def _getdimensions(): + handle = GetStdHandle(STD_OUTPUT_HANDLE) + info = GetConsoleInfo(handle) + # Substract one from the width, otherwise the cursor wraps + # and the ending \n causes an empty line to display. + return info.dwSize.Y, info.dwSize.X - 1 + +def write_out(fil, msg): + # XXX sometimes "msg" is of type bytes, sometimes text which + # complicates the situation. Should we try to enforce unicode? + try: + # on py27 and above writing out to sys.stdout with an encoding + # should usually work for unicode messages (if the encoding is + # capable of it) + fil.write(msg) + except UnicodeEncodeError: + # on py26 it might not work because stdout expects bytes + if fil.encoding: + try: + fil.write(msg.encode(fil.encoding)) + except UnicodeEncodeError: + # it might still fail if the encoding is not capable + pass + else: + fil.flush() + return + # fallback: escape all unicode characters + msg = msg.encode("unicode-escape").decode("ascii") + fil.write(msg) + fil.flush() diff --git a/python/py/py/_log/__init__.py b/python/py/py/_log/__init__.py new file mode 100644 index 000000000..fad62e960 --- /dev/null +++ b/python/py/py/_log/__init__.py @@ -0,0 +1,2 @@ +""" logging API ('producers' and 'consumers' connected via keywords) """ + diff --git a/python/py/py/_log/log.py b/python/py/py/_log/log.py new file mode 100644 index 000000000..ce47e8c75 --- /dev/null +++ b/python/py/py/_log/log.py @@ -0,0 +1,186 @@ +""" +basic logging functionality based on a producer/consumer scheme. + +XXX implement this API: (maybe put it into slogger.py?) + + log = Logger( + info=py.log.STDOUT, + debug=py.log.STDOUT, + command=None) + log.info("hello", "world") + log.command("hello", "world") + + log = Logger(info=Logger(something=...), + debug=py.log.STDOUT, + command=None) +""" +import py, sys + +class Message(object): + def __init__(self, keywords, args): + self.keywords = keywords + self.args = args + + def content(self): + return " ".join(map(str, self.args)) + + def prefix(self): + return "[%s] " % (":".join(self.keywords)) + + def __str__(self): + return self.prefix() + self.content() + + +class Producer(object): + """ (deprecated) Log producer API which sends messages to be logged + to a 'consumer' object, which then prints them to stdout, + stderr, files, etc. Used extensively by PyPy-1.1. + """ + + Message = Message # to allow later customization + keywords2consumer = {} + + def __init__(self, keywords, keywordmapper=None, **kw): + if hasattr(keywords, 'split'): + keywords = tuple(keywords.split()) + self._keywords = keywords + if keywordmapper is None: + keywordmapper = default_keywordmapper + self._keywordmapper = keywordmapper + + def __repr__(self): + return "<py.log.Producer %s>" % ":".join(self._keywords) + + def __getattr__(self, name): + if '_' in name: + raise AttributeError(name) + producer = self.__class__(self._keywords + (name,)) + setattr(self, name, producer) + return producer + + def __call__(self, *args): + """ write a message to the appropriate consumer(s) """ + func = self._keywordmapper.getconsumer(self._keywords) + if func is not None: + func(self.Message(self._keywords, args)) + +class KeywordMapper: + def __init__(self): + self.keywords2consumer = {} + + def getstate(self): + return self.keywords2consumer.copy() + def setstate(self, state): + self.keywords2consumer.clear() + self.keywords2consumer.update(state) + + def getconsumer(self, keywords): + """ return a consumer matching the given keywords. + + tries to find the most suitable consumer by walking, starting from + the back, the list of keywords, the first consumer matching a + keyword is returned (falling back to py.log.default) + """ + for i in range(len(keywords), 0, -1): + try: + return self.keywords2consumer[keywords[:i]] + except KeyError: + continue + return self.keywords2consumer.get('default', default_consumer) + + def setconsumer(self, keywords, consumer): + """ set a consumer for a set of keywords. """ + # normalize to tuples + if isinstance(keywords, str): + keywords = tuple(filter(None, keywords.split())) + elif hasattr(keywords, '_keywords'): + keywords = keywords._keywords + elif not isinstance(keywords, tuple): + raise TypeError("key %r is not a string or tuple" % (keywords,)) + if consumer is not None and not py.builtin.callable(consumer): + if not hasattr(consumer, 'write'): + raise TypeError( + "%r should be None, callable or file-like" % (consumer,)) + consumer = File(consumer) + self.keywords2consumer[keywords] = consumer + +def default_consumer(msg): + """ the default consumer, prints the message to stdout (using 'print') """ + sys.stderr.write(str(msg)+"\n") + +default_keywordmapper = KeywordMapper() + +def setconsumer(keywords, consumer): + default_keywordmapper.setconsumer(keywords, consumer) + +def setstate(state): + default_keywordmapper.setstate(state) +def getstate(): + return default_keywordmapper.getstate() + +# +# Consumers +# + +class File(object): + """ log consumer wrapping a file(-like) object """ + def __init__(self, f): + assert hasattr(f, 'write') + #assert isinstance(f, file) or not hasattr(f, 'open') + self._file = f + + def __call__(self, msg): + """ write a message to the log """ + self._file.write(str(msg) + "\n") + if hasattr(self._file, 'flush'): + self._file.flush() + +class Path(object): + """ log consumer that opens and writes to a Path """ + def __init__(self, filename, append=False, + delayed_create=False, buffering=False): + self._append = append + self._filename = str(filename) + self._buffering = buffering + if not delayed_create: + self._openfile() + + def _openfile(self): + mode = self._append and 'a' or 'w' + f = open(self._filename, mode) + self._file = f + + def __call__(self, msg): + """ write a message to the log """ + if not hasattr(self, "_file"): + self._openfile() + self._file.write(str(msg) + "\n") + if not self._buffering: + self._file.flush() + +def STDOUT(msg): + """ consumer that writes to sys.stdout """ + sys.stdout.write(str(msg)+"\n") + +def STDERR(msg): + """ consumer that writes to sys.stderr """ + sys.stderr.write(str(msg)+"\n") + +class Syslog: + """ consumer that writes to the syslog daemon """ + + def __init__(self, priority = None): + if priority is None: + priority = self.LOG_INFO + self.priority = priority + + def __call__(self, msg): + """ write a message to the log """ + py.std.syslog.syslog(self.priority, str(msg)) + +for _prio in "EMERG ALERT CRIT ERR WARNING NOTICE INFO DEBUG".split(): + _prio = "LOG_" + _prio + try: + setattr(Syslog, _prio, getattr(py.std.syslog, _prio)) + except AttributeError: + pass diff --git a/python/py/py/_log/warning.py b/python/py/py/_log/warning.py new file mode 100644 index 000000000..722e31e91 --- /dev/null +++ b/python/py/py/_log/warning.py @@ -0,0 +1,76 @@ +import py, sys + +class DeprecationWarning(DeprecationWarning): + def __init__(self, msg, path, lineno): + self.msg = msg + self.path = path + self.lineno = lineno + def __repr__(self): + return "%s:%d: %s" %(self.path, self.lineno+1, self.msg) + def __str__(self): + return self.msg + +def _apiwarn(startversion, msg, stacklevel=2, function=None): + # below is mostly COPIED from python2.4/warnings.py's def warn() + # Get context information + if isinstance(stacklevel, str): + frame = sys._getframe(1) + level = 1 + found = frame.f_code.co_filename.find(stacklevel) != -1 + while frame: + co = frame.f_code + if co.co_filename.find(stacklevel) == -1: + if found: + stacklevel = level + break + else: + found = True + level += 1 + frame = frame.f_back + else: + stacklevel = 1 + msg = "%s (since version %s)" %(msg, startversion) + warn(msg, stacklevel=stacklevel+1, function=function) + +def warn(msg, stacklevel=1, function=None): + if function is not None: + filename = py.std.inspect.getfile(function) + lineno = py.code.getrawcode(function).co_firstlineno + else: + try: + caller = sys._getframe(stacklevel) + except ValueError: + globals = sys.__dict__ + lineno = 1 + else: + globals = caller.f_globals + lineno = caller.f_lineno + if '__name__' in globals: + module = globals['__name__'] + else: + module = "<string>" + filename = globals.get('__file__') + if filename: + fnl = filename.lower() + if fnl.endswith(".pyc") or fnl.endswith(".pyo"): + filename = filename[:-1] + elif fnl.endswith("$py.class"): + filename = filename.replace('$py.class', '.py') + else: + if module == "__main__": + try: + filename = sys.argv[0] + except AttributeError: + # embedded interpreters don't have sys.argv, see bug #839151 + filename = '__main__' + if not filename: + filename = module + path = py.path.local(filename) + warning = DeprecationWarning(msg, path, lineno) + py.std.warnings.warn_explicit(warning, category=Warning, + filename=str(warning.path), + lineno=warning.lineno, + registry=py.std.warnings.__dict__.setdefault( + "__warningsregistry__", {}) + ) + diff --git a/python/py/py/_path/__init__.py b/python/py/py/_path/__init__.py new file mode 100644 index 000000000..51f3246f8 --- /dev/null +++ b/python/py/py/_path/__init__.py @@ -0,0 +1 @@ +""" unified file system api """ diff --git a/python/py/py/_path/cacheutil.py b/python/py/py/_path/cacheutil.py new file mode 100644 index 000000000..992250475 --- /dev/null +++ b/python/py/py/_path/cacheutil.py @@ -0,0 +1,114 @@ +""" +This module contains multithread-safe cache implementations. + +All Caches have + + getorbuild(key, builder) + delentry(key) + +methods and allow configuration when instantiating the cache class. +""" +from time import time as gettime + +class BasicCache(object): + def __init__(self, maxentries=128): + self.maxentries = maxentries + self.prunenum = int(maxentries - maxentries/8) + self._dict = {} + + def clear(self): + self._dict.clear() + + def _getentry(self, key): + return self._dict[key] + + def _putentry(self, key, entry): + self._prunelowestweight() + self._dict[key] = entry + + def delentry(self, key, raising=False): + try: + del self._dict[key] + except KeyError: + if raising: + raise + + def getorbuild(self, key, builder): + try: + entry = self._getentry(key) + except KeyError: + entry = self._build(key, builder) + self._putentry(key, entry) + return entry.value + + def _prunelowestweight(self): + """ prune out entries with lowest weight. """ + numentries = len(self._dict) + if numentries >= self.maxentries: + # evict according to entry's weight + items = [(entry.weight, key) + for key, entry in self._dict.items()] + items.sort() + index = numentries - self.prunenum + if index > 0: + for weight, key in items[:index]: + # in MT situations the element might be gone + self.delentry(key, raising=False) + +class BuildcostAccessCache(BasicCache): + """ A BuildTime/Access-counting cache implementation. + the weight of a value is computed as the product of + + num-accesses-of-a-value * time-to-build-the-value + + The values with the least such weights are evicted + if the cache maxentries threshold is superceded. + For implementation flexibility more than one object + might be evicted at a time. + """ + # time function to use for measuring build-times + + def _build(self, key, builder): + start = gettime() + val = builder() + end = gettime() + return WeightedCountingEntry(val, end-start) + + +class WeightedCountingEntry(object): + def __init__(self, value, oneweight): + self._value = value + self.weight = self._oneweight = oneweight + + def value(self): + self.weight += self._oneweight + return self._value + value = property(value) + +class AgingCache(BasicCache): + """ This cache prunes out cache entries that are too old. + """ + def __init__(self, maxentries=128, maxseconds=10.0): + super(AgingCache, self).__init__(maxentries) + self.maxseconds = maxseconds + + def _getentry(self, key): + entry = self._dict[key] + if entry.isexpired(): + self.delentry(key) + raise KeyError(key) + return entry + + def _build(self, key, builder): + val = builder() + entry = AgingEntry(val, gettime() + self.maxseconds) + return entry + +class AgingEntry(object): + def __init__(self, value, expirationtime): + self.value = value + self.weight = expirationtime + + def isexpired(self): + t = gettime() + return t >= self.weight diff --git a/python/py/py/_path/common.py b/python/py/py/_path/common.py new file mode 100644 index 000000000..d407434cb --- /dev/null +++ b/python/py/py/_path/common.py @@ -0,0 +1,403 @@ +""" +""" +import os, sys, posixpath +import py + +# Moved from local.py. +iswin32 = sys.platform == "win32" or (getattr(os, '_name', False) == 'nt') + +class Checkers: + _depend_on_existence = 'exists', 'link', 'dir', 'file' + + def __init__(self, path): + self.path = path + + def dir(self): + raise NotImplementedError + + def file(self): + raise NotImplementedError + + def dotfile(self): + return self.path.basename.startswith('.') + + def ext(self, arg): + if not arg.startswith('.'): + arg = '.' + arg + return self.path.ext == arg + + def exists(self): + raise NotImplementedError + + def basename(self, arg): + return self.path.basename == arg + + def basestarts(self, arg): + return self.path.basename.startswith(arg) + + def relto(self, arg): + return self.path.relto(arg) + + def fnmatch(self, arg): + return self.path.fnmatch(arg) + + def endswith(self, arg): + return str(self.path).endswith(arg) + + def _evaluate(self, kw): + for name, value in kw.items(): + invert = False + meth = None + try: + meth = getattr(self, name) + except AttributeError: + if name[:3] == 'not': + invert = True + try: + meth = getattr(self, name[3:]) + except AttributeError: + pass + if meth is None: + raise TypeError( + "no %r checker available for %r" % (name, self.path)) + try: + if py.code.getrawcode(meth).co_argcount > 1: + if (not meth(value)) ^ invert: + return False + else: + if bool(value) ^ bool(meth()) ^ invert: + return False + except (py.error.ENOENT, py.error.ENOTDIR, py.error.EBUSY): + # EBUSY feels not entirely correct, + # but its kind of necessary since ENOMEDIUM + # is not accessible in python + for name in self._depend_on_existence: + if name in kw: + if kw.get(name): + return False + name = 'not' + name + if name in kw: + if not kw.get(name): + return False + return True + +class NeverRaised(Exception): + pass + +class PathBase(object): + """ shared implementation for filesystem path objects.""" + Checkers = Checkers + + def __div__(self, other): + return self.join(str(other)) + __truediv__ = __div__ # py3k + + def basename(self): + """ basename part of path. """ + return self._getbyspec('basename')[0] + basename = property(basename, None, None, basename.__doc__) + + def dirname(self): + """ dirname part of path. """ + return self._getbyspec('dirname')[0] + dirname = property(dirname, None, None, dirname.__doc__) + + def purebasename(self): + """ pure base name of the path.""" + return self._getbyspec('purebasename')[0] + purebasename = property(purebasename, None, None, purebasename.__doc__) + + def ext(self): + """ extension of the path (including the '.').""" + return self._getbyspec('ext')[0] + ext = property(ext, None, None, ext.__doc__) + + def dirpath(self, *args, **kwargs): + """ return the directory path joined with any given path arguments. """ + return self.new(basename='').join(*args, **kwargs) + + def read_binary(self): + """ read and return a bytestring from reading the path. """ + with self.open('rb') as f: + return f.read() + + def read_text(self, encoding): + """ read and return a Unicode string from reading the path. """ + with self.open("r", encoding=encoding) as f: + return f.read() + + + def read(self, mode='r'): + """ read and return a bytestring from reading the path. """ + with self.open(mode) as f: + return f.read() + + def readlines(self, cr=1): + """ read and return a list of lines from the path. if cr is False, the +newline will be removed from the end of each line. """ + if not cr: + content = self.read('rU') + return content.split('\n') + else: + f = self.open('rU') + try: + return f.readlines() + finally: + f.close() + + def load(self): + """ (deprecated) return object unpickled from self.read() """ + f = self.open('rb') + try: + return py.error.checked_call(py.std.pickle.load, f) + finally: + f.close() + + def move(self, target): + """ move this path to target. """ + if target.relto(self): + raise py.error.EINVAL(target, + "cannot move path into a subdirectory of itself") + try: + self.rename(target) + except py.error.EXDEV: # invalid cross-device link + self.copy(target) + self.remove() + + def __repr__(self): + """ return a string representation of this path. """ + return repr(str(self)) + + def check(self, **kw): + """ check a path for existence and properties. + + Without arguments, return True if the path exists, otherwise False. + + valid checkers:: + + file=1 # is a file + file=0 # is not a file (may not even exist) + dir=1 # is a dir + link=1 # is a link + exists=1 # exists + + You can specify multiple checker definitions, for example:: + + path.check(file=1, link=1) # a link pointing to a file + """ + if not kw: + kw = {'exists' : 1} + return self.Checkers(self)._evaluate(kw) + + def fnmatch(self, pattern): + """return true if the basename/fullname matches the glob-'pattern'. + + valid pattern characters:: + + * matches everything + ? matches any single character + [seq] matches any character in seq + [!seq] matches any char not in seq + + If the pattern contains a path-separator then the full path + is used for pattern matching and a '*' is prepended to the + pattern. + + if the pattern doesn't contain a path-separator the pattern + is only matched against the basename. + """ + return FNMatcher(pattern)(self) + + def relto(self, relpath): + """ return a string which is the relative part of the path + to the given 'relpath'. + """ + if not isinstance(relpath, (str, PathBase)): + raise TypeError("%r: not a string or path object" %(relpath,)) + strrelpath = str(relpath) + if strrelpath and strrelpath[-1] != self.sep: + strrelpath += self.sep + #assert strrelpath[-1] == self.sep + #assert strrelpath[-2] != self.sep + strself = self.strpath + if sys.platform == "win32" or getattr(os, '_name', None) == 'nt': + if os.path.normcase(strself).startswith( + os.path.normcase(strrelpath)): + return strself[len(strrelpath):] + elif strself.startswith(strrelpath): + return strself[len(strrelpath):] + return "" + + def ensure_dir(self, *args): + """ ensure the path joined with args is a directory. """ + return self.ensure(*args, **{"dir": True}) + + def bestrelpath(self, dest): + """ return a string which is a relative path from self + (assumed to be a directory) to dest such that + self.join(bestrelpath) == dest and if not such + path can be determined return dest. + """ + try: + if self == dest: + return os.curdir + base = self.common(dest) + if not base: # can be the case on windows + return str(dest) + self2base = self.relto(base) + reldest = dest.relto(base) + if self2base: + n = self2base.count(self.sep) + 1 + else: + n = 0 + l = [os.pardir] * n + if reldest: + l.append(reldest) + target = dest.sep.join(l) + return target + except AttributeError: + return str(dest) + + def exists(self): + return self.check() + + def isdir(self): + return self.check(dir=1) + + def isfile(self): + return self.check(file=1) + + def parts(self, reverse=False): + """ return a root-first list of all ancestor directories + plus the path itself. + """ + current = self + l = [self] + while 1: + last = current + current = current.dirpath() + if last == current: + break + l.append(current) + if not reverse: + l.reverse() + return l + + def common(self, other): + """ return the common part shared with the other path + or None if there is no common part. + """ + last = None + for x, y in zip(self.parts(), other.parts()): + if x != y: + return last + last = x + return last + + def __add__(self, other): + """ return new path object with 'other' added to the basename""" + return self.new(basename=self.basename+str(other)) + + def __cmp__(self, other): + """ return sort value (-1, 0, +1). """ + try: + return cmp(self.strpath, other.strpath) + except AttributeError: + return cmp(str(self), str(other)) # self.path, other.path) + + def __lt__(self, other): + try: + return self.strpath < other.strpath + except AttributeError: + return str(self) < str(other) + + def visit(self, fil=None, rec=None, ignore=NeverRaised, bf=False, sort=False): + """ yields all paths below the current one + + fil is a filter (glob pattern or callable), if not matching the + path will not be yielded, defaulting to None (everything is + returned) + + rec is a filter (glob pattern or callable) that controls whether + a node is descended, defaulting to None + + ignore is an Exception class that is ignoredwhen calling dirlist() + on any of the paths (by default, all exceptions are reported) + + bf if True will cause a breadthfirst search instead of the + default depthfirst. Default: False + + sort if True will sort entries within each directory level. + """ + for x in Visitor(fil, rec, ignore, bf, sort).gen(self): + yield x + + def _sortlist(self, res, sort): + if sort: + if hasattr(sort, '__call__'): + res.sort(sort) + else: + res.sort() + + def samefile(self, other): + """ return True if other refers to the same stat object as self. """ + return self.strpath == str(other) + +class Visitor: + def __init__(self, fil, rec, ignore, bf, sort): + if isinstance(fil, str): + fil = FNMatcher(fil) + if isinstance(rec, str): + self.rec = FNMatcher(rec) + elif not hasattr(rec, '__call__') and rec: + self.rec = lambda path: True + else: + self.rec = rec + self.fil = fil + self.ignore = ignore + self.breadthfirst = bf + self.optsort = sort and sorted or (lambda x: x) + + def gen(self, path): + try: + entries = path.listdir() + except self.ignore: + return + rec = self.rec + dirs = self.optsort([p for p in entries + if p.check(dir=1) and (rec is None or rec(p))]) + if not self.breadthfirst: + for subdir in dirs: + for p in self.gen(subdir): + yield p + for p in self.optsort(entries): + if self.fil is None or self.fil(p): + yield p + if self.breadthfirst: + for subdir in dirs: + for p in self.gen(subdir): + yield p + +class FNMatcher: + def __init__(self, pattern): + self.pattern = pattern + + def __call__(self, path): + pattern = self.pattern + + if (pattern.find(path.sep) == -1 and + iswin32 and + pattern.find(posixpath.sep) != -1): + # Running on Windows, the pattern has no Windows path separators, + # and the pattern has one or more Posix path separators. Replace + # the Posix path separators with the Windows path separator. + pattern = pattern.replace(posixpath.sep, path.sep) + + if pattern.find(path.sep) == -1: + name = path.basename + else: + name = str(path) # path.strpath # XXX svn? + if not os.path.isabs(pattern): + pattern = '*' + path.sep + pattern + return py.std.fnmatch.fnmatch(name, pattern) + diff --git a/python/py/py/_path/local.py b/python/py/py/_path/local.py new file mode 100644 index 000000000..d569404ec --- /dev/null +++ b/python/py/py/_path/local.py @@ -0,0 +1,911 @@ +""" +local path implementation. +""" +from __future__ import with_statement + +from contextlib import contextmanager +import sys, os, re, atexit, io +import py +from py._path import common +from py._path.common import iswin32 +from stat import S_ISLNK, S_ISDIR, S_ISREG + +from os.path import abspath, normpath, isabs, exists, isdir, isfile, islink, dirname + +if sys.version_info > (3,0): + def map_as_list(func, iter): + return list(map(func, iter)) +else: + map_as_list = map + +class Stat(object): + def __getattr__(self, name): + return getattr(self._osstatresult, "st_" + name) + + def __init__(self, path, osstatresult): + self.path = path + self._osstatresult = osstatresult + + @property + def owner(self): + if iswin32: + raise NotImplementedError("XXX win32") + import pwd + entry = py.error.checked_call(pwd.getpwuid, self.uid) + return entry[0] + + @property + def group(self): + """ return group name of file. """ + if iswin32: + raise NotImplementedError("XXX win32") + import grp + entry = py.error.checked_call(grp.getgrgid, self.gid) + return entry[0] + + def isdir(self): + return S_ISDIR(self._osstatresult.st_mode) + + def isfile(self): + return S_ISREG(self._osstatresult.st_mode) + + def islink(self): + st = self.path.lstat() + return S_ISLNK(self._osstatresult.st_mode) + +class PosixPath(common.PathBase): + def chown(self, user, group, rec=0): + """ change ownership to the given user and group. + user and group may be specified by a number or + by a name. if rec is True change ownership + recursively. + """ + uid = getuserid(user) + gid = getgroupid(group) + if rec: + for x in self.visit(rec=lambda x: x.check(link=0)): + if x.check(link=0): + py.error.checked_call(os.chown, str(x), uid, gid) + py.error.checked_call(os.chown, str(self), uid, gid) + + def readlink(self): + """ return value of a symbolic link. """ + return py.error.checked_call(os.readlink, self.strpath) + + def mklinkto(self, oldname): + """ posix style hard link to another name. """ + py.error.checked_call(os.link, str(oldname), str(self)) + + def mksymlinkto(self, value, absolute=1): + """ create a symbolic link with the given value (pointing to another name). """ + if absolute: + py.error.checked_call(os.symlink, str(value), self.strpath) + else: + base = self.common(value) + # with posix local paths '/' is always a common base + relsource = self.__class__(value).relto(base) + reldest = self.relto(base) + n = reldest.count(self.sep) + target = self.sep.join(('..', )*n + (relsource, )) + py.error.checked_call(os.symlink, target, self.strpath) + +def getuserid(user): + import pwd + if not isinstance(user, int): + user = pwd.getpwnam(user)[2] + return user + +def getgroupid(group): + import grp + if not isinstance(group, int): + group = grp.getgrnam(group)[2] + return group + +FSBase = not iswin32 and PosixPath or common.PathBase + +class LocalPath(FSBase): + """ object oriented interface to os.path and other local filesystem + related information. + """ + class ImportMismatchError(ImportError): + """ raised on pyimport() if there is a mismatch of __file__'s""" + + sep = os.sep + class Checkers(common.Checkers): + def _stat(self): + try: + return self._statcache + except AttributeError: + try: + self._statcache = self.path.stat() + except py.error.ELOOP: + self._statcache = self.path.lstat() + return self._statcache + + def dir(self): + return S_ISDIR(self._stat().mode) + + def file(self): + return S_ISREG(self._stat().mode) + + def exists(self): + return self._stat() + + def link(self): + st = self.path.lstat() + return S_ISLNK(st.mode) + + def __init__(self, path=None, expanduser=False): + """ Initialize and return a local Path instance. + + Path can be relative to the current directory. + If path is None it defaults to the current working directory. + If expanduser is True, tilde-expansion is performed. + Note that Path instances always carry an absolute path. + Note also that passing in a local path object will simply return + the exact same path object. Use new() to get a new copy. + """ + if path is None: + self.strpath = py.error.checked_call(os.getcwd) + elif isinstance(path, common.PathBase): + self.strpath = path.strpath + elif isinstance(path, py.builtin._basestring): + if expanduser: + path = os.path.expanduser(path) + self.strpath = abspath(path) + else: + raise ValueError("can only pass None, Path instances " + "or non-empty strings to LocalPath") + + def __hash__(self): + return hash(self.strpath) + + def __eq__(self, other): + s1 = self.strpath + s2 = getattr(other, "strpath", other) + if iswin32: + s1 = s1.lower() + try: + s2 = s2.lower() + except AttributeError: + return False + return s1 == s2 + + def __ne__(self, other): + return not (self == other) + + def __lt__(self, other): + return self.strpath < getattr(other, "strpath", other) + + def __gt__(self, other): + return self.strpath > getattr(other, "strpath", other) + + def samefile(self, other): + """ return True if 'other' references the same file as 'self'. + """ + other = getattr(other, "strpath", other) + if not isabs(other): + other = abspath(other) + if self == other: + return True + if iswin32: + return False # there is no samefile + return py.error.checked_call( + os.path.samefile, self.strpath, other) + + def remove(self, rec=1, ignore_errors=False): + """ remove a file or directory (or a directory tree if rec=1). + if ignore_errors is True, errors while removing directories will + be ignored. + """ + if self.check(dir=1, link=0): + if rec: + # force remove of readonly files on windows + if iswin32: + self.chmod(448, rec=1) # octcal 0700 + py.error.checked_call(py.std.shutil.rmtree, self.strpath, + ignore_errors=ignore_errors) + else: + py.error.checked_call(os.rmdir, self.strpath) + else: + if iswin32: + self.chmod(448) # octcal 0700 + py.error.checked_call(os.remove, self.strpath) + + def computehash(self, hashtype="md5", chunksize=524288): + """ return hexdigest of hashvalue for this file. """ + try: + try: + import hashlib as mod + except ImportError: + if hashtype == "sha1": + hashtype = "sha" + mod = __import__(hashtype) + hash = getattr(mod, hashtype)() + except (AttributeError, ImportError): + raise ValueError("Don't know how to compute %r hash" %(hashtype,)) + f = self.open('rb') + try: + while 1: + buf = f.read(chunksize) + if not buf: + return hash.hexdigest() + hash.update(buf) + finally: + f.close() + + def new(self, **kw): + """ create a modified version of this path. + the following keyword arguments modify various path parts:: + + a:/some/path/to/a/file.ext + xx drive + xxxxxxxxxxxxxxxxx dirname + xxxxxxxx basename + xxxx purebasename + xxx ext + """ + obj = object.__new__(self.__class__) + if not kw: + obj.strpath = self.strpath + return obj + drive, dirname, basename, purebasename,ext = self._getbyspec( + "drive,dirname,basename,purebasename,ext") + if 'basename' in kw: + if 'purebasename' in kw or 'ext' in kw: + raise ValueError("invalid specification %r" % kw) + else: + pb = kw.setdefault('purebasename', purebasename) + try: + ext = kw['ext'] + except KeyError: + pass + else: + if ext and not ext.startswith('.'): + ext = '.' + ext + kw['basename'] = pb + ext + + if ('dirname' in kw and not kw['dirname']): + kw['dirname'] = drive + else: + kw.setdefault('dirname', dirname) + kw.setdefault('sep', self.sep) + obj.strpath = normpath( + "%(dirname)s%(sep)s%(basename)s" % kw) + return obj + + def _getbyspec(self, spec): + """ see new for what 'spec' can be. """ + res = [] + parts = self.strpath.split(self.sep) + + args = filter(None, spec.split(',') ) + append = res.append + for name in args: + if name == 'drive': + append(parts[0]) + elif name == 'dirname': + append(self.sep.join(parts[:-1])) + else: + basename = parts[-1] + if name == 'basename': + append(basename) + else: + i = basename.rfind('.') + if i == -1: + purebasename, ext = basename, '' + else: + purebasename, ext = basename[:i], basename[i:] + if name == 'purebasename': + append(purebasename) + elif name == 'ext': + append(ext) + else: + raise ValueError("invalid part specification %r" % name) + return res + + def dirpath(self, *args, **kwargs): + """ return the directory path joined with any given path arguments. """ + if not kwargs: + path = object.__new__(self.__class__) + path.strpath = dirname(self.strpath) + if args: + path = path.join(*args) + return path + return super(LocalPath, self).dirpath(*args, **kwargs) + + def join(self, *args, **kwargs): + """ return a new path by appending all 'args' as path + components. if abs=1 is used restart from root if any + of the args is an absolute path. + """ + sep = self.sep + strargs = [getattr(arg, "strpath", arg) for arg in args] + strpath = self.strpath + if kwargs.get('abs'): + newargs = [] + for arg in reversed(strargs): + if isabs(arg): + strpath = arg + strargs = newargs + break + newargs.insert(0, arg) + for arg in strargs: + arg = arg.strip(sep) + if iswin32: + # allow unix style paths even on windows. + arg = arg.strip('/') + arg = arg.replace('/', sep) + strpath = strpath + sep + arg + obj = object.__new__(self.__class__) + obj.strpath = normpath(strpath) + return obj + + def open(self, mode='r', ensure=False, encoding=None): + """ return an opened file with the given mode. + + If ensure is True, create parent directories if needed. + """ + if ensure: + self.dirpath().ensure(dir=1) + if encoding: + return py.error.checked_call(io.open, self.strpath, mode, encoding=encoding) + return py.error.checked_call(open, self.strpath, mode) + + def _fastjoin(self, name): + child = object.__new__(self.__class__) + child.strpath = self.strpath + self.sep + name + return child + + def islink(self): + return islink(self.strpath) + + def check(self, **kw): + if not kw: + return exists(self.strpath) + if len(kw) == 1: + if "dir" in kw: + return not kw["dir"] ^ isdir(self.strpath) + if "file" in kw: + return not kw["file"] ^ isfile(self.strpath) + return super(LocalPath, self).check(**kw) + + _patternchars = set("*?[" + os.path.sep) + def listdir(self, fil=None, sort=None): + """ list directory contents, possibly filter by the given fil func + and possibly sorted. + """ + if fil is None and sort is None: + names = py.error.checked_call(os.listdir, self.strpath) + return map_as_list(self._fastjoin, names) + if isinstance(fil, py.builtin._basestring): + if not self._patternchars.intersection(fil): + child = self._fastjoin(fil) + if exists(child.strpath): + return [child] + return [] + fil = common.FNMatcher(fil) + names = py.error.checked_call(os.listdir, self.strpath) + res = [] + for name in names: + child = self._fastjoin(name) + if fil is None or fil(child): + res.append(child) + self._sortlist(res, sort) + return res + + def size(self): + """ return size of the underlying file object """ + return self.stat().size + + def mtime(self): + """ return last modification time of the path. """ + return self.stat().mtime + + def copy(self, target, mode=False): + """ copy path to target.""" + if self.check(file=1): + if target.check(dir=1): + target = target.join(self.basename) + assert self!=target + copychunked(self, target) + if mode: + copymode(self.strpath, target.strpath) + else: + def rec(p): + return p.check(link=0) + for x in self.visit(rec=rec): + relpath = x.relto(self) + newx = target.join(relpath) + newx.dirpath().ensure(dir=1) + if x.check(link=1): + newx.mksymlinkto(x.readlink()) + continue + elif x.check(file=1): + copychunked(x, newx) + elif x.check(dir=1): + newx.ensure(dir=1) + if mode: + copymode(x.strpath, newx.strpath) + + def rename(self, target): + """ rename this path to target. """ + target = getattr(target, "strpath", target) + return py.error.checked_call(os.rename, self.strpath, target) + + def dump(self, obj, bin=1): + """ pickle object into path location""" + f = self.open('wb') + try: + py.error.checked_call(py.std.pickle.dump, obj, f, bin) + finally: + f.close() + + def mkdir(self, *args): + """ create & return the directory joined with args. """ + p = self.join(*args) + py.error.checked_call(os.mkdir, getattr(p, "strpath", p)) + return p + + def write_binary(self, data, ensure=False): + """ write binary data into path. If ensure is True create + missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + with self.open('wb') as f: + f.write(data) + + def write_text(self, data, encoding, ensure=False): + """ write text data into path using the specified encoding. + If ensure is True create missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + with self.open('w', encoding=encoding) as f: + f.write(data) + + def write(self, data, mode='w', ensure=False): + """ write data into path. If ensure is True create + missing parent directories. + """ + if ensure: + self.dirpath().ensure(dir=1) + if 'b' in mode: + if not py.builtin._isbytes(data): + raise ValueError("can only process bytes") + else: + if not py.builtin._istext(data): + if not py.builtin._isbytes(data): + data = str(data) + else: + data = py.builtin._totext(data, sys.getdefaultencoding()) + f = self.open(mode) + try: + f.write(data) + finally: + f.close() + + def _ensuredirs(self): + parent = self.dirpath() + if parent == self: + return self + if parent.check(dir=0): + parent._ensuredirs() + if self.check(dir=0): + try: + self.mkdir() + except py.error.EEXIST: + # race condition: file/dir created by another thread/process. + # complain if it is not a dir + if self.check(dir=0): + raise + return self + + def ensure(self, *args, **kwargs): + """ ensure that an args-joined path exists (by default as + a file). if you specify a keyword argument 'dir=True' + then the path is forced to be a directory path. + """ + p = self.join(*args) + if kwargs.get('dir', 0): + return p._ensuredirs() + else: + p.dirpath()._ensuredirs() + if not p.check(file=1): + p.open('w').close() + return p + + def stat(self, raising=True): + """ Return an os.stat() tuple. """ + if raising == True: + return Stat(self, py.error.checked_call(os.stat, self.strpath)) + try: + return Stat(self, os.stat(self.strpath)) + except KeyboardInterrupt: + raise + except Exception: + return None + + def lstat(self): + """ Return an os.lstat() tuple. """ + return Stat(self, py.error.checked_call(os.lstat, self.strpath)) + + def setmtime(self, mtime=None): + """ set modification time for the given path. if 'mtime' is None + (the default) then the file's mtime is set to current time. + + Note that the resolution for 'mtime' is platform dependent. + """ + if mtime is None: + return py.error.checked_call(os.utime, self.strpath, mtime) + try: + return py.error.checked_call(os.utime, self.strpath, (-1, mtime)) + except py.error.EINVAL: + return py.error.checked_call(os.utime, self.strpath, (self.atime(), mtime)) + + def chdir(self): + """ change directory to self and return old current directory """ + try: + old = self.__class__() + except py.error.ENOENT: + old = None + py.error.checked_call(os.chdir, self.strpath) + return old + + + @contextmanager + def as_cwd(self): + """ return context manager which changes to current dir during the + managed "with" context. On __enter__ it returns the old dir. + """ + old = self.chdir() + try: + yield old + finally: + old.chdir() + + def realpath(self): + """ return a new path which contains no symbolic links.""" + return self.__class__(os.path.realpath(self.strpath)) + + def atime(self): + """ return last access time of the path. """ + return self.stat().atime + + def __repr__(self): + return 'local(%r)' % self.strpath + + def __str__(self): + """ return string representation of the Path. """ + return self.strpath + + def chmod(self, mode, rec=0): + """ change permissions to the given mode. If mode is an + integer it directly encodes the os-specific modes. + if rec is True perform recursively. + """ + if not isinstance(mode, int): + raise TypeError("mode %r must be an integer" % (mode,)) + if rec: + for x in self.visit(rec=rec): + py.error.checked_call(os.chmod, str(x), mode) + py.error.checked_call(os.chmod, self.strpath, mode) + + def pypkgpath(self): + """ return the Python package path by looking for the last + directory upwards which still contains an __init__.py. + Return None if a pkgpath can not be determined. + """ + pkgpath = None + for parent in self.parts(reverse=True): + if parent.isdir(): + if not parent.join('__init__.py').exists(): + break + if not isimportable(parent.basename): + break + pkgpath = parent + return pkgpath + + def _ensuresyspath(self, ensuremode, path): + if ensuremode: + s = str(path) + if ensuremode == "append": + if s not in sys.path: + sys.path.append(s) + else: + if s != sys.path[0]: + sys.path.insert(0, s) + + def pyimport(self, modname=None, ensuresyspath=True): + """ return path as an imported python module. + + If modname is None, look for the containing package + and construct an according module name. + The module will be put/looked up in sys.modules. + if ensuresyspath is True then the root dir for importing + the file (taking __init__.py files into account) will + be prepended to sys.path if it isn't there already. + If ensuresyspath=="append" the root dir will be appended + if it isn't already contained in sys.path. + if ensuresyspath is False no modification of syspath happens. + """ + if not self.check(): + raise py.error.ENOENT(self) + + pkgpath = None + if modname is None: + pkgpath = self.pypkgpath() + if pkgpath is not None: + pkgroot = pkgpath.dirpath() + names = self.new(ext="").relto(pkgroot).split(self.sep) + if names[-1] == "__init__": + names.pop() + modname = ".".join(names) + else: + pkgroot = self.dirpath() + modname = self.purebasename + + self._ensuresyspath(ensuresyspath, pkgroot) + __import__(modname) + mod = sys.modules[modname] + if self.basename == "__init__.py": + return mod # we don't check anything as we might + # we in a namespace package ... too icky to check + modfile = mod.__file__ + if modfile[-4:] in ('.pyc', '.pyo'): + modfile = modfile[:-1] + elif modfile.endswith('$py.class'): + modfile = modfile[:-9] + '.py' + if modfile.endswith(os.path.sep + "__init__.py"): + if self.basename != "__init__.py": + modfile = modfile[:-12] + try: + issame = self.samefile(modfile) + except py.error.ENOENT: + issame = False + if not issame: + raise self.ImportMismatchError(modname, modfile, self) + return mod + else: + try: + return sys.modules[modname] + except KeyError: + # we have a custom modname, do a pseudo-import + mod = py.std.types.ModuleType(modname) + mod.__file__ = str(self) + sys.modules[modname] = mod + try: + py.builtin.execfile(str(self), mod.__dict__) + except: + del sys.modules[modname] + raise + return mod + + def sysexec(self, *argv, **popen_opts): + """ return stdout text from executing a system child process, + where the 'self' path points to executable. + The process is directly invoked and not through a system shell. + """ + from subprocess import Popen, PIPE + argv = map_as_list(str, argv) + popen_opts['stdout'] = popen_opts['stderr'] = PIPE + proc = Popen([str(self)] + argv, **popen_opts) + stdout, stderr = proc.communicate() + ret = proc.wait() + if py.builtin._isbytes(stdout): + stdout = py.builtin._totext(stdout, sys.getdefaultencoding()) + if ret != 0: + if py.builtin._isbytes(stderr): + stderr = py.builtin._totext(stderr, sys.getdefaultencoding()) + raise py.process.cmdexec.Error(ret, ret, str(self), + stdout, stderr,) + return stdout + + def sysfind(cls, name, checker=None, paths=None): + """ return a path object found by looking at the systems + underlying PATH specification. If the checker is not None + it will be invoked to filter matching paths. If a binary + cannot be found, None is returned + Note: This is probably not working on plain win32 systems + but may work on cygwin. + """ + if isabs(name): + p = py.path.local(name) + if p.check(file=1): + return p + else: + if paths is None: + if iswin32: + paths = py.std.os.environ['Path'].split(';') + if '' not in paths and '.' not in paths: + paths.append('.') + try: + systemroot = os.environ['SYSTEMROOT'] + except KeyError: + pass + else: + paths = [re.sub('%SystemRoot%', systemroot, path) + for path in paths] + else: + paths = py.std.os.environ['PATH'].split(':') + tryadd = [] + if iswin32: + tryadd += os.environ['PATHEXT'].split(os.pathsep) + tryadd.append("") + + for x in paths: + for addext in tryadd: + p = py.path.local(x).join(name, abs=True) + addext + try: + if p.check(file=1): + if checker: + if not checker(p): + continue + return p + except py.error.EACCES: + pass + return None + sysfind = classmethod(sysfind) + + def _gethomedir(cls): + try: + x = os.environ['HOME'] + except KeyError: + try: + x = os.environ["HOMEDRIVE"] + os.environ['HOMEPATH'] + except KeyError: + return None + return cls(x) + _gethomedir = classmethod(_gethomedir) + + #""" + #special class constructors for local filesystem paths + #""" + def get_temproot(cls): + """ return the system's temporary directory + (where tempfiles are usually created in) + """ + return py.path.local(py.std.tempfile.gettempdir()) + get_temproot = classmethod(get_temproot) + + def mkdtemp(cls, rootdir=None): + """ return a Path object pointing to a fresh new temporary directory + (which we created ourself). + """ + import tempfile + if rootdir is None: + rootdir = cls.get_temproot() + return cls(py.error.checked_call(tempfile.mkdtemp, dir=str(rootdir))) + mkdtemp = classmethod(mkdtemp) + + def make_numbered_dir(cls, prefix='session-', rootdir=None, keep=3, + lock_timeout = 172800): # two days + """ return unique directory with a number greater than the current + maximum one. The number is assumed to start directly after prefix. + if keep is true directories with a number less than (maxnum-keep) + will be removed. + """ + if rootdir is None: + rootdir = cls.get_temproot() + + def parse_num(path): + """ parse the number out of a path (if it matches the prefix) """ + bn = path.basename + if bn.startswith(prefix): + try: + return int(bn[len(prefix):]) + except ValueError: + pass + + # compute the maximum number currently in use with the + # prefix + lastmax = None + while True: + maxnum = -1 + for path in rootdir.listdir(): + num = parse_num(path) + if num is not None: + maxnum = max(maxnum, num) + + # make the new directory + try: + udir = rootdir.mkdir(prefix + str(maxnum+1)) + except py.error.EEXIST: + # race condition: another thread/process created the dir + # in the meantime. Try counting again + if lastmax == maxnum: + raise + lastmax = maxnum + continue + break + + # put a .lock file in the new directory that will be removed at + # process exit + if lock_timeout: + lockfile = udir.join('.lock') + mypid = os.getpid() + if hasattr(lockfile, 'mksymlinkto'): + lockfile.mksymlinkto(str(mypid)) + else: + lockfile.write(str(mypid)) + def try_remove_lockfile(): + # in a fork() situation, only the last process should + # remove the .lock, otherwise the other processes run the + # risk of seeing their temporary dir disappear. For now + # we remove the .lock in the parent only (i.e. we assume + # that the children finish before the parent). + if os.getpid() != mypid: + return + try: + lockfile.remove() + except py.error.Error: + pass + atexit.register(try_remove_lockfile) + + # prune old directories + if keep: + for path in rootdir.listdir(): + num = parse_num(path) + if num is not None and num <= (maxnum - keep): + lf = path.join('.lock') + try: + t1 = lf.lstat().mtime + t2 = lockfile.lstat().mtime + if not lock_timeout or abs(t2-t1) < lock_timeout: + continue # skip directories still locked + except py.error.Error: + pass # assume that it means that there is no 'lf' + try: + path.remove(rec=1) + except KeyboardInterrupt: + raise + except: # this might be py.error.Error, WindowsError ... + pass + + # make link... + try: + username = os.environ['USER'] #linux, et al + except KeyError: + try: + username = os.environ['USERNAME'] #windows + except KeyError: + username = 'current' + + src = str(udir) + dest = src[:src.rfind('-')] + '-' + username + try: + os.unlink(dest) + except OSError: + pass + try: + os.symlink(src, dest) + except (OSError, AttributeError, NotImplementedError): + pass + + return udir + make_numbered_dir = classmethod(make_numbered_dir) + +def copymode(src, dest): + py.std.shutil.copymode(src, dest) + +def copychunked(src, dest): + chunksize = 524288 # half a meg of bytes + fsrc = src.open('rb') + try: + fdest = dest.open('wb') + try: + while 1: + buf = fsrc.read(chunksize) + if not buf: + break + fdest.write(buf) + finally: + fdest.close() + finally: + fsrc.close() + +def isimportable(name): + if name and (name[0].isalpha() or name[0] == '_'): + name = name.replace("_", '') + return not name or name.isalnum() diff --git a/python/py/py/_path/svnurl.py b/python/py/py/_path/svnurl.py new file mode 100644 index 000000000..78d71317a --- /dev/null +++ b/python/py/py/_path/svnurl.py @@ -0,0 +1,380 @@ +""" +module defining a subversion path object based on the external +command 'svn'. This modules aims to work with svn 1.3 and higher +but might also interact well with earlier versions. +""" + +import os, sys, time, re +import py +from py import path, process +from py._path import common +from py._path import svnwc as svncommon +from py._path.cacheutil import BuildcostAccessCache, AgingCache + +DEBUG=False + +class SvnCommandPath(svncommon.SvnPathBase): + """ path implementation that offers access to (possibly remote) subversion + repositories. """ + + _lsrevcache = BuildcostAccessCache(maxentries=128) + _lsnorevcache = AgingCache(maxentries=1000, maxseconds=60.0) + + def __new__(cls, path, rev=None, auth=None): + self = object.__new__(cls) + if isinstance(path, cls): + rev = path.rev + auth = path.auth + path = path.strpath + svncommon.checkbadchars(path) + path = path.rstrip('/') + self.strpath = path + self.rev = rev + self.auth = auth + return self + + def __repr__(self): + if self.rev == -1: + return 'svnurl(%r)' % self.strpath + else: + return 'svnurl(%r, %r)' % (self.strpath, self.rev) + + def _svnwithrev(self, cmd, *args): + """ execute an svn command, append our own url and revision """ + if self.rev is None: + return self._svnwrite(cmd, *args) + else: + args = ['-r', self.rev] + list(args) + return self._svnwrite(cmd, *args) + + def _svnwrite(self, cmd, *args): + """ execute an svn command, append our own url """ + l = ['svn %s' % cmd] + args = ['"%s"' % self._escape(item) for item in args] + l.extend(args) + l.append('"%s"' % self._encodedurl()) + # fixing the locale because we can't otherwise parse + string = " ".join(l) + if DEBUG: + print("execing %s" % string) + out = self._svncmdexecauth(string) + return out + + def _svncmdexecauth(self, cmd): + """ execute an svn command 'as is' """ + cmd = svncommon.fixlocale() + cmd + if self.auth is not None: + cmd += ' ' + self.auth.makecmdoptions() + return self._cmdexec(cmd) + + def _cmdexec(self, cmd): + try: + out = process.cmdexec(cmd) + except py.process.cmdexec.Error: + e = sys.exc_info()[1] + if (e.err.find('File Exists') != -1 or + e.err.find('File already exists') != -1): + raise py.error.EEXIST(self) + raise + return out + + def _svnpopenauth(self, cmd): + """ execute an svn command, return a pipe for reading stdin """ + cmd = svncommon.fixlocale() + cmd + if self.auth is not None: + cmd += ' ' + self.auth.makecmdoptions() + return self._popen(cmd) + + def _popen(self, cmd): + return os.popen(cmd) + + def _encodedurl(self): + return self._escape(self.strpath) + + def _norev_delentry(self, path): + auth = self.auth and self.auth.makecmdoptions() or None + self._lsnorevcache.delentry((str(path), auth)) + + def open(self, mode='r'): + """ return an opened file with the given mode. """ + if mode not in ("r", "rU",): + raise ValueError("mode %r not supported" % (mode,)) + assert self.check(file=1) # svn cat returns an empty file otherwise + if self.rev is None: + return self._svnpopenauth('svn cat "%s"' % ( + self._escape(self.strpath), )) + else: + return self._svnpopenauth('svn cat -r %s "%s"' % ( + self.rev, self._escape(self.strpath))) + + def dirpath(self, *args, **kwargs): + """ return the directory path of the current path joined + with any given path arguments. + """ + l = self.strpath.split(self.sep) + if len(l) < 4: + raise py.error.EINVAL(self, "base is not valid") + elif len(l) == 4: + return self.join(*args, **kwargs) + else: + return self.new(basename='').join(*args, **kwargs) + + # modifying methods (cache must be invalidated) + def mkdir(self, *args, **kwargs): + """ create & return the directory joined with args. + pass a 'msg' keyword argument to set the commit message. + """ + commit_msg = kwargs.get('msg', "mkdir by py lib invocation") + createpath = self.join(*args) + createpath._svnwrite('mkdir', '-m', commit_msg) + self._norev_delentry(createpath.dirpath()) + return createpath + + def copy(self, target, msg='copied by py lib invocation'): + """ copy path to target with checkin message msg.""" + if getattr(target, 'rev', None) is not None: + raise py.error.EINVAL(target, "revisions are immutable") + self._svncmdexecauth('svn copy -m "%s" "%s" "%s"' %(msg, + self._escape(self), self._escape(target))) + self._norev_delentry(target.dirpath()) + + def rename(self, target, msg="renamed by py lib invocation"): + """ rename this path to target with checkin message msg. """ + if getattr(self, 'rev', None) is not None: + raise py.error.EINVAL(self, "revisions are immutable") + self._svncmdexecauth('svn move -m "%s" --force "%s" "%s"' %( + msg, self._escape(self), self._escape(target))) + self._norev_delentry(self.dirpath()) + self._norev_delentry(self) + + def remove(self, rec=1, msg='removed by py lib invocation'): + """ remove a file or directory (or a directory tree if rec=1) with +checkin message msg.""" + if self.rev is not None: + raise py.error.EINVAL(self, "revisions are immutable") + self._svncmdexecauth('svn rm -m "%s" "%s"' %(msg, self._escape(self))) + self._norev_delentry(self.dirpath()) + + def export(self, topath): + """ export to a local path + + topath should not exist prior to calling this, returns a + py.path.local instance + """ + topath = py.path.local(topath) + args = ['"%s"' % (self._escape(self),), + '"%s"' % (self._escape(topath),)] + if self.rev is not None: + args = ['-r', str(self.rev)] + args + self._svncmdexecauth('svn export %s' % (' '.join(args),)) + return topath + + def ensure(self, *args, **kwargs): + """ ensure that an args-joined path exists (by default as + a file). If you specify a keyword argument 'dir=True' + then the path is forced to be a directory path. + """ + if getattr(self, 'rev', None) is not None: + raise py.error.EINVAL(self, "revisions are immutable") + target = self.join(*args) + dir = kwargs.get('dir', 0) + for x in target.parts(reverse=True): + if x.check(): + break + else: + raise py.error.ENOENT(target, "has not any valid base!") + if x == target: + if not x.check(dir=dir): + raise dir and py.error.ENOTDIR(x) or py.error.EISDIR(x) + return x + tocreate = target.relto(x) + basename = tocreate.split(self.sep, 1)[0] + tempdir = py.path.local.mkdtemp() + try: + tempdir.ensure(tocreate, dir=dir) + cmd = 'svn import -m "%s" "%s" "%s"' % ( + "ensure %s" % self._escape(tocreate), + self._escape(tempdir.join(basename)), + x.join(basename)._encodedurl()) + self._svncmdexecauth(cmd) + self._norev_delentry(x) + finally: + tempdir.remove() + return target + + # end of modifying methods + def _propget(self, name): + res = self._svnwithrev('propget', name) + return res[:-1] # strip trailing newline + + def _proplist(self): + res = self._svnwithrev('proplist') + lines = res.split('\n') + lines = [x.strip() for x in lines[1:]] + return svncommon.PropListDict(self, lines) + + def info(self): + """ return an Info structure with svn-provided information. """ + parent = self.dirpath() + nameinfo_seq = parent._listdir_nameinfo() + bn = self.basename + for name, info in nameinfo_seq: + if name == bn: + return info + raise py.error.ENOENT(self) + + + def _listdir_nameinfo(self): + """ return sequence of name-info directory entries of self """ + def builder(): + try: + res = self._svnwithrev('ls', '-v') + except process.cmdexec.Error: + e = sys.exc_info()[1] + if e.err.find('non-existent in that revision') != -1: + raise py.error.ENOENT(self, e.err) + elif e.err.find("E200009:") != -1: + raise py.error.ENOENT(self, e.err) + elif e.err.find('File not found') != -1: + raise py.error.ENOENT(self, e.err) + elif e.err.find('not part of a repository')!=-1: + raise py.error.ENOENT(self, e.err) + elif e.err.find('Unable to open')!=-1: + raise py.error.ENOENT(self, e.err) + elif e.err.lower().find('method not allowed')!=-1: + raise py.error.EACCES(self, e.err) + raise py.error.Error(e.err) + lines = res.split('\n') + nameinfo_seq = [] + for lsline in lines: + if lsline: + info = InfoSvnCommand(lsline) + if info._name != '.': # svn 1.5 produces '.' dirs, + nameinfo_seq.append((info._name, info)) + nameinfo_seq.sort() + return nameinfo_seq + auth = self.auth and self.auth.makecmdoptions() or None + if self.rev is not None: + return self._lsrevcache.getorbuild((self.strpath, self.rev, auth), + builder) + else: + return self._lsnorevcache.getorbuild((self.strpath, auth), + builder) + + def listdir(self, fil=None, sort=None): + """ list directory contents, possibly filter by the given fil func + and possibly sorted. + """ + if isinstance(fil, str): + fil = common.FNMatcher(fil) + nameinfo_seq = self._listdir_nameinfo() + if len(nameinfo_seq) == 1: + name, info = nameinfo_seq[0] + if name == self.basename and info.kind == 'file': + #if not self.check(dir=1): + raise py.error.ENOTDIR(self) + paths = [self.join(name) for (name, info) in nameinfo_seq] + if fil: + paths = [x for x in paths if fil(x)] + self._sortlist(paths, sort) + return paths + + + def log(self, rev_start=None, rev_end=1, verbose=False): + """ return a list of LogEntry instances for this path. +rev_start is the starting revision (defaulting to the first one). +rev_end is the last revision (defaulting to HEAD). +if verbose is True, then the LogEntry instances also know which files changed. +""" + assert self.check() #make it simpler for the pipe + rev_start = rev_start is None and "HEAD" or rev_start + rev_end = rev_end is None and "HEAD" or rev_end + + if rev_start == "HEAD" and rev_end == 1: + rev_opt = "" + else: + rev_opt = "-r %s:%s" % (rev_start, rev_end) + verbose_opt = verbose and "-v" or "" + xmlpipe = self._svnpopenauth('svn log --xml %s %s "%s"' % + (rev_opt, verbose_opt, self.strpath)) + from xml.dom import minidom + tree = minidom.parse(xmlpipe) + result = [] + for logentry in filter(None, tree.firstChild.childNodes): + if logentry.nodeType == logentry.ELEMENT_NODE: + result.append(svncommon.LogEntry(logentry)) + return result + +#01234567890123456789012345678901234567890123467 +# 2256 hpk 165 Nov 24 17:55 __init__.py +# XXX spotted by Guido, SVN 1.3.0 has different aligning, breaks the code!!! +# 1312 johnny 1627 May 05 14:32 test_decorators.py +# +class InfoSvnCommand: + # the '0?' part in the middle is an indication of whether the resource is + # locked, see 'svn help ls' + lspattern = re.compile( + r'^ *(?P<rev>\d+) +(?P<author>.+?) +(0? *(?P<size>\d+))? ' + '*(?P<date>\w+ +\d{2} +[\d:]+) +(?P<file>.*)$') + def __init__(self, line): + # this is a typical line from 'svn ls http://...' + #_ 1127 jum 0 Jul 13 15:28 branch/ + match = self.lspattern.match(line) + data = match.groupdict() + self._name = data['file'] + if self._name[-1] == '/': + self._name = self._name[:-1] + self.kind = 'dir' + else: + self.kind = 'file' + #self.has_props = l.pop(0) == 'P' + self.created_rev = int(data['rev']) + self.last_author = data['author'] + self.size = data['size'] and int(data['size']) or 0 + self.mtime = parse_time_with_missing_year(data['date']) + self.time = self.mtime * 1000000 + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + +#____________________________________________________ +# +# helper functions +#____________________________________________________ +def parse_time_with_missing_year(timestr): + """ analyze the time part from a single line of "svn ls -v" + the svn output doesn't show the year makes the 'timestr' + ambigous. + """ + import calendar + t_now = time.gmtime() + + tparts = timestr.split() + month = time.strptime(tparts.pop(0), '%b')[1] + day = time.strptime(tparts.pop(0), '%d')[2] + last = tparts.pop(0) # year or hour:minute + try: + if ":" in last: + raise ValueError() + year = time.strptime(last, '%Y')[0] + hour = minute = 0 + except ValueError: + hour, minute = time.strptime(last, '%H:%M')[3:5] + year = t_now[0] + + t_result = (year, month, day, hour, minute, 0,0,0,0) + if t_result > t_now: + year -= 1 + t_result = (year, month, day, hour, minute, 0,0,0,0) + return calendar.timegm(t_result) + +class PathEntry: + def __init__(self, ppart): + self.strpath = ppart.firstChild.nodeValue.encode('UTF-8') + self.action = ppart.getAttribute('action').encode('UTF-8') + if self.action == 'A': + self.copyfrom_path = ppart.getAttribute('copyfrom-path').encode('UTF-8') + if self.copyfrom_path: + self.copyfrom_rev = int(ppart.getAttribute('copyfrom-rev')) + diff --git a/python/py/py/_path/svnwc.py b/python/py/py/_path/svnwc.py new file mode 100644 index 000000000..00d3b4bba --- /dev/null +++ b/python/py/py/_path/svnwc.py @@ -0,0 +1,1240 @@ +""" +svn-Command based Implementation of a Subversion WorkingCopy Path. + + SvnWCCommandPath is the main class. + +""" + +import os, sys, time, re, calendar +import py +import subprocess +from py._path import common + +#----------------------------------------------------------- +# Caching latest repository revision and repo-paths +# (getting them is slow with the current implementations) +# +# XXX make mt-safe +#----------------------------------------------------------- + +class cache: + proplist = {} + info = {} + entries = {} + prop = {} + +class RepoEntry: + def __init__(self, url, rev, timestamp): + self.url = url + self.rev = rev + self.timestamp = timestamp + + def __str__(self): + return "repo: %s;%s %s" %(self.url, self.rev, self.timestamp) + +class RepoCache: + """ The Repocache manages discovered repository paths + and their revisions. If inside a timeout the cache + will even return the revision of the root. + """ + timeout = 20 # seconds after which we forget that we know the last revision + + def __init__(self): + self.repos = [] + + def clear(self): + self.repos = [] + + def put(self, url, rev, timestamp=None): + if rev is None: + return + if timestamp is None: + timestamp = time.time() + + for entry in self.repos: + if url == entry.url: + entry.timestamp = timestamp + entry.rev = rev + #print "set repo", entry + break + else: + entry = RepoEntry(url, rev, timestamp) + self.repos.append(entry) + #print "appended repo", entry + + def get(self, url): + now = time.time() + for entry in self.repos: + if url.startswith(entry.url): + if now < entry.timestamp + self.timeout: + #print "returning immediate Etrny", entry + return entry.url, entry.rev + return entry.url, -1 + return url, -1 + +repositories = RepoCache() + + +# svn support code + +ALLOWED_CHARS = "_ -/\\=$.~+%" #add characters as necessary when tested +if sys.platform == "win32": + ALLOWED_CHARS += ":" +ALLOWED_CHARS_HOST = ALLOWED_CHARS + '@:' + +def _getsvnversion(ver=[]): + try: + return ver[0] + except IndexError: + v = py.process.cmdexec("svn -q --version") + v.strip() + v = '.'.join(v.split('.')[:2]) + ver.append(v) + return v + +def _escape_helper(text): + text = str(text) + if py.std.sys.platform != 'win32': + text = str(text).replace('$', '\\$') + return text + +def _check_for_bad_chars(text, allowed_chars=ALLOWED_CHARS): + for c in str(text): + if c.isalnum(): + continue + if c in allowed_chars: + continue + return True + return False + +def checkbadchars(url): + # (hpk) not quite sure about the exact purpose, guido w.? + proto, uri = url.split("://", 1) + if proto != "file": + host, uripath = uri.split('/', 1) + # only check for bad chars in the non-protocol parts + if (_check_for_bad_chars(host, ALLOWED_CHARS_HOST) \ + or _check_for_bad_chars(uripath, ALLOWED_CHARS)): + raise ValueError("bad char in %r" % (url, )) + + +#_______________________________________________________________ + +class SvnPathBase(common.PathBase): + """ Base implementation for SvnPath implementations. """ + sep = '/' + + def _geturl(self): + return self.strpath + url = property(_geturl, None, None, "url of this svn-path.") + + def __str__(self): + """ return a string representation (including rev-number) """ + return self.strpath + + def __hash__(self): + return hash(self.strpath) + + def new(self, **kw): + """ create a modified version of this path. A 'rev' argument + indicates a new revision. + the following keyword arguments modify various path parts:: + + http://host.com/repo/path/file.ext + |-----------------------| dirname + |------| basename + |--| purebasename + |--| ext + """ + obj = object.__new__(self.__class__) + obj.rev = kw.get('rev', self.rev) + obj.auth = kw.get('auth', self.auth) + dirname, basename, purebasename, ext = self._getbyspec( + "dirname,basename,purebasename,ext") + if 'basename' in kw: + if 'purebasename' in kw or 'ext' in kw: + raise ValueError("invalid specification %r" % kw) + else: + pb = kw.setdefault('purebasename', purebasename) + ext = kw.setdefault('ext', ext) + if ext and not ext.startswith('.'): + ext = '.' + ext + kw['basename'] = pb + ext + + kw.setdefault('dirname', dirname) + kw.setdefault('sep', self.sep) + if kw['basename']: + obj.strpath = "%(dirname)s%(sep)s%(basename)s" % kw + else: + obj.strpath = "%(dirname)s" % kw + return obj + + def _getbyspec(self, spec): + """ get specified parts of the path. 'arg' is a string + with comma separated path parts. The parts are returned + in exactly the order of the specification. + + you may specify the following parts: + + http://host.com/repo/path/file.ext + |-----------------------| dirname + |------| basename + |--| purebasename + |--| ext + """ + res = [] + parts = self.strpath.split(self.sep) + for name in spec.split(','): + name = name.strip() + if name == 'dirname': + res.append(self.sep.join(parts[:-1])) + elif name == 'basename': + res.append(parts[-1]) + else: + basename = parts[-1] + i = basename.rfind('.') + if i == -1: + purebasename, ext = basename, '' + else: + purebasename, ext = basename[:i], basename[i:] + if name == 'purebasename': + res.append(purebasename) + elif name == 'ext': + res.append(ext) + else: + raise NameError("Don't know part %r" % name) + return res + + def __eq__(self, other): + """ return true if path and rev attributes each match """ + return (str(self) == str(other) and + (self.rev == other.rev or self.rev == other.rev)) + + def __ne__(self, other): + return not self == other + + def join(self, *args): + """ return a new Path (with the same revision) which is composed + of the self Path followed by 'args' path components. + """ + if not args: + return self + + args = tuple([arg.strip(self.sep) for arg in args]) + parts = (self.strpath, ) + args + newpath = self.__class__(self.sep.join(parts), self.rev, self.auth) + return newpath + + def propget(self, name): + """ return the content of the given property. """ + value = self._propget(name) + return value + + def proplist(self): + """ list all property names. """ + content = self._proplist() + return content + + def size(self): + """ Return the size of the file content of the Path. """ + return self.info().size + + def mtime(self): + """ Return the last modification time of the file. """ + return self.info().mtime + + # shared help methods + + def _escape(self, cmd): + return _escape_helper(cmd) + + + #def _childmaxrev(self): + # """ return maximum revision number of childs (or self.rev if no childs) """ + # rev = self.rev + # for name, info in self._listdir_nameinfo(): + # rev = max(rev, info.created_rev) + # return rev + + #def _getlatestrevision(self): + # """ return latest repo-revision for this path. """ + # url = self.strpath + # path = self.__class__(url, None) + # + # # we need a long walk to find the root-repo and revision + # while 1: + # try: + # rev = max(rev, path._childmaxrev()) + # previous = path + # path = path.dirpath() + # except (IOError, process.cmdexec.Error): + # break + # if rev is None: + # raise IOError, "could not determine newest repo revision for %s" % self + # return rev + + class Checkers(common.Checkers): + def dir(self): + try: + return self.path.info().kind == 'dir' + except py.error.Error: + return self._listdirworks() + + def _listdirworks(self): + try: + self.path.listdir() + except py.error.ENOENT: + return False + else: + return True + + def file(self): + try: + return self.path.info().kind == 'file' + except py.error.ENOENT: + return False + + def exists(self): + try: + return self.path.info() + except py.error.ENOENT: + return self._listdirworks() + +def parse_apr_time(timestr): + i = timestr.rfind('.') + if i == -1: + raise ValueError("could not parse %s" % timestr) + timestr = timestr[:i] + parsedtime = time.strptime(timestr, "%Y-%m-%dT%H:%M:%S") + return time.mktime(parsedtime) + +class PropListDict(dict): + """ a Dictionary which fetches values (InfoSvnCommand instances) lazily""" + def __init__(self, path, keynames): + dict.__init__(self, [(x, None) for x in keynames]) + self.path = path + + def __getitem__(self, key): + value = dict.__getitem__(self, key) + if value is None: + value = self.path.propget(key) + dict.__setitem__(self, key, value) + return value + +def fixlocale(): + if sys.platform != 'win32': + return 'LC_ALL=C ' + return '' + +# some nasty chunk of code to solve path and url conversion and quoting issues +ILLEGAL_CHARS = '* | \ / : < > ? \t \n \x0b \x0c \r'.split(' ') +if os.sep in ILLEGAL_CHARS: + ILLEGAL_CHARS.remove(os.sep) +ISWINDOWS = sys.platform == 'win32' +_reg_allow_disk = re.compile(r'^([a-z]\:\\)?[^:]+$', re.I) +def _check_path(path): + illegal = ILLEGAL_CHARS[:] + sp = path.strpath + if ISWINDOWS: + illegal.remove(':') + if not _reg_allow_disk.match(sp): + raise ValueError('path may not contain a colon (:)') + for char in sp: + if char not in string.printable or char in illegal: + raise ValueError('illegal character %r in path' % (char,)) + +def path_to_fspath(path, addat=True): + _check_path(path) + sp = path.strpath + if addat and path.rev != -1: + sp = '%s@%s' % (sp, path.rev) + elif addat: + sp = '%s@HEAD' % (sp,) + return sp + +def url_from_path(path): + fspath = path_to_fspath(path, False) + quote = py.std.urllib.quote + if ISWINDOWS: + match = _reg_allow_disk.match(fspath) + fspath = fspath.replace('\\', '/') + if match.group(1): + fspath = '/%s%s' % (match.group(1).replace('\\', '/'), + quote(fspath[len(match.group(1)):])) + else: + fspath = quote(fspath) + else: + fspath = quote(fspath) + if path.rev != -1: + fspath = '%s@%s' % (fspath, path.rev) + else: + fspath = '%s@HEAD' % (fspath,) + return 'file://%s' % (fspath,) + +class SvnAuth(object): + """ container for auth information for Subversion """ + def __init__(self, username, password, cache_auth=True, interactive=True): + self.username = username + self.password = password + self.cache_auth = cache_auth + self.interactive = interactive + + def makecmdoptions(self): + uname = self.username.replace('"', '\\"') + passwd = self.password.replace('"', '\\"') + ret = [] + if uname: + ret.append('--username="%s"' % (uname,)) + if passwd: + ret.append('--password="%s"' % (passwd,)) + if not self.cache_auth: + ret.append('--no-auth-cache') + if not self.interactive: + ret.append('--non-interactive') + return ' '.join(ret) + + def __str__(self): + return "<SvnAuth username=%s ...>" %(self.username,) + +rex_blame = re.compile(r'\s*(\d+)\s*(\S+) (.*)') + +class SvnWCCommandPath(common.PathBase): + """ path implementation offering access/modification to svn working copies. + It has methods similar to the functions in os.path and similar to the + commands of the svn client. + """ + sep = os.sep + + def __new__(cls, wcpath=None, auth=None): + self = object.__new__(cls) + if isinstance(wcpath, cls): + if wcpath.__class__ == cls: + return wcpath + wcpath = wcpath.localpath + if _check_for_bad_chars(str(wcpath), + ALLOWED_CHARS): + raise ValueError("bad char in wcpath %s" % (wcpath, )) + self.localpath = py.path.local(wcpath) + self.auth = auth + return self + + strpath = property(lambda x: str(x.localpath), None, None, "string path") + rev = property(lambda x: x.info(usecache=0).rev, None, None, "revision") + + def __eq__(self, other): + return self.localpath == getattr(other, 'localpath', None) + + def _geturl(self): + if getattr(self, '_url', None) is None: + info = self.info() + self._url = info.url #SvnPath(info.url, info.rev) + assert isinstance(self._url, py.builtin._basestring) + return self._url + + url = property(_geturl, None, None, "url of this WC item") + + def _escape(self, cmd): + return _escape_helper(cmd) + + def dump(self, obj): + """ pickle object into path location""" + return self.localpath.dump(obj) + + def svnurl(self): + """ return current SvnPath for this WC-item. """ + info = self.info() + return py.path.svnurl(info.url) + + def __repr__(self): + return "svnwc(%r)" % (self.strpath) # , self._url) + + def __str__(self): + return str(self.localpath) + + def _makeauthoptions(self): + if self.auth is None: + return '' + return self.auth.makecmdoptions() + + def _authsvn(self, cmd, args=None): + args = args and list(args) or [] + args.append(self._makeauthoptions()) + return self._svn(cmd, *args) + + def _svn(self, cmd, *args): + l = ['svn %s' % cmd] + args = [self._escape(item) for item in args] + l.extend(args) + l.append('"%s"' % self._escape(self.strpath)) + # try fixing the locale because we can't otherwise parse + string = fixlocale() + " ".join(l) + try: + try: + key = 'LC_MESSAGES' + hold = os.environ.get(key) + os.environ[key] = 'C' + out = py.process.cmdexec(string) + finally: + if hold: + os.environ[key] = hold + else: + del os.environ[key] + except py.process.cmdexec.Error: + e = sys.exc_info()[1] + strerr = e.err.lower() + if strerr.find('not found') != -1: + raise py.error.ENOENT(self) + elif strerr.find("E200009:") != -1: + raise py.error.ENOENT(self) + if (strerr.find('file exists') != -1 or + strerr.find('file already exists') != -1 or + strerr.find('w150002:') != -1 or + strerr.find("can't create directory") != -1): + raise py.error.EEXIST(strerr) #self) + raise + return out + + def switch(self, url): + """ switch to given URL. """ + self._authsvn('switch', [url]) + + def checkout(self, url=None, rev=None): + """ checkout from url to local wcpath. """ + args = [] + if url is None: + url = self.url + if rev is None or rev == -1: + if (py.std.sys.platform != 'win32' and + _getsvnversion() == '1.3'): + url += "@HEAD" + else: + if _getsvnversion() == '1.3': + url += "@%d" % rev + else: + args.append('-r' + str(rev)) + args.append(url) + self._authsvn('co', args) + + def update(self, rev='HEAD', interactive=True): + """ update working copy item to given revision. (None -> HEAD). """ + opts = ['-r', rev] + if not interactive: + opts.append("--non-interactive") + self._authsvn('up', opts) + + def write(self, content, mode='w'): + """ write content into local filesystem wc. """ + self.localpath.write(content, mode) + + def dirpath(self, *args): + """ return the directory Path of the current Path. """ + return self.__class__(self.localpath.dirpath(*args), auth=self.auth) + + def _ensuredirs(self): + parent = self.dirpath() + if parent.check(dir=0): + parent._ensuredirs() + if self.check(dir=0): + self.mkdir() + return self + + def ensure(self, *args, **kwargs): + """ ensure that an args-joined path exists (by default as + a file). if you specify a keyword argument 'directory=True' + then the path is forced to be a directory path. + """ + p = self.join(*args) + if p.check(): + if p.check(versioned=False): + p.add() + return p + if kwargs.get('dir', 0): + return p._ensuredirs() + parent = p.dirpath() + parent._ensuredirs() + p.write("") + p.add() + return p + + def mkdir(self, *args): + """ create & return the directory joined with args. """ + if args: + return self.join(*args).mkdir() + else: + self._svn('mkdir') + return self + + def add(self): + """ add ourself to svn """ + self._svn('add') + + def remove(self, rec=1, force=1): + """ remove a file or a directory tree. 'rec'ursive is + ignored and considered always true (because of + underlying svn semantics. + """ + assert rec, "svn cannot remove non-recursively" + if not self.check(versioned=True): + # not added to svn (anymore?), just remove + py.path.local(self).remove() + return + flags = [] + if force: + flags.append('--force') + self._svn('remove', *flags) + + def copy(self, target): + """ copy path to target.""" + py.process.cmdexec("svn copy %s %s" %(str(self), str(target))) + + def rename(self, target): + """ rename this path to target. """ + py.process.cmdexec("svn move --force %s %s" %(str(self), str(target))) + + def lock(self): + """ set a lock (exclusive) on the resource """ + out = self._authsvn('lock').strip() + if not out: + # warning or error, raise exception + raise ValueError("unknown error in svn lock command") + + def unlock(self): + """ unset a previously set lock """ + out = self._authsvn('unlock').strip() + if out.startswith('svn:'): + # warning or error, raise exception + raise Exception(out[4:]) + + def cleanup(self): + """ remove any locks from the resource """ + # XXX should be fixed properly!!! + try: + self.unlock() + except: + pass + + def status(self, updates=0, rec=0, externals=0): + """ return (collective) Status object for this file. """ + # http://svnbook.red-bean.com/book.html#svn-ch-3-sect-4.3.1 + # 2201 2192 jum test + # XXX + if externals: + raise ValueError("XXX cannot perform status() " + "on external items yet") + else: + #1.2 supports: externals = '--ignore-externals' + externals = '' + if rec: + rec= '' + else: + rec = '--non-recursive' + + # XXX does not work on all subversion versions + #if not externals: + # externals = '--ignore-externals' + + if updates: + updates = '-u' + else: + updates = '' + + try: + cmd = 'status -v --xml --no-ignore %s %s %s' % ( + updates, rec, externals) + out = self._authsvn(cmd) + except py.process.cmdexec.Error: + cmd = 'status -v --no-ignore %s %s %s' % ( + updates, rec, externals) + out = self._authsvn(cmd) + rootstatus = WCStatus(self).fromstring(out, self) + else: + rootstatus = XMLWCStatus(self).fromstring(out, self) + return rootstatus + + def diff(self, rev=None): + """ return a diff of the current path against revision rev (defaulting + to the last one). + """ + args = [] + if rev is not None: + args.append("-r %d" % rev) + out = self._authsvn('diff', args) + return out + + def blame(self): + """ return a list of tuples of three elements: + (revision, commiter, line) + """ + out = self._svn('blame') + result = [] + blamelines = out.splitlines() + reallines = py.path.svnurl(self.url).readlines() + for i, (blameline, line) in enumerate( + zip(blamelines, reallines)): + m = rex_blame.match(blameline) + if not m: + raise ValueError("output line %r of svn blame does not match " + "expected format" % (line, )) + rev, name, _ = m.groups() + result.append((int(rev), name, line)) + return result + + _rex_commit = re.compile(r'.*Committed revision (\d+)\.$', re.DOTALL) + def commit(self, msg='', rec=1): + """ commit with support for non-recursive commits """ + # XXX i guess escaping should be done better here?!? + cmd = 'commit -m "%s" --force-log' % (msg.replace('"', '\\"'),) + if not rec: + cmd += ' -N' + out = self._authsvn(cmd) + try: + del cache.info[self] + except KeyError: + pass + if out: + m = self._rex_commit.match(out) + return int(m.group(1)) + + def propset(self, name, value, *args): + """ set property name to value on this path. """ + d = py.path.local.mkdtemp() + try: + p = d.join('value') + p.write(value) + self._svn('propset', name, '--file', str(p), *args) + finally: + d.remove() + + def propget(self, name): + """ get property name on this path. """ + res = self._svn('propget', name) + return res[:-1] # strip trailing newline + + def propdel(self, name): + """ delete property name on this path. """ + res = self._svn('propdel', name) + return res[:-1] # strip trailing newline + + def proplist(self, rec=0): + """ return a mapping of property names to property values. +If rec is True, then return a dictionary mapping sub-paths to such mappings. +""" + if rec: + res = self._svn('proplist -R') + return make_recursive_propdict(self, res) + else: + res = self._svn('proplist') + lines = res.split('\n') + lines = [x.strip() for x in lines[1:]] + return PropListDict(self, lines) + + def revert(self, rec=0): + """ revert the local changes of this path. if rec is True, do so +recursively. """ + if rec: + result = self._svn('revert -R') + else: + result = self._svn('revert') + return result + + def new(self, **kw): + """ create a modified version of this path. A 'rev' argument + indicates a new revision. + the following keyword arguments modify various path parts: + + http://host.com/repo/path/file.ext + |-----------------------| dirname + |------| basename + |--| purebasename + |--| ext + """ + if kw: + localpath = self.localpath.new(**kw) + else: + localpath = self.localpath + return self.__class__(localpath, auth=self.auth) + + def join(self, *args, **kwargs): + """ return a new Path (with the same revision) which is composed + of the self Path followed by 'args' path components. + """ + if not args: + return self + localpath = self.localpath.join(*args, **kwargs) + return self.__class__(localpath, auth=self.auth) + + def info(self, usecache=1): + """ return an Info structure with svn-provided information. """ + info = usecache and cache.info.get(self) + if not info: + try: + output = self._svn('info') + except py.process.cmdexec.Error: + e = sys.exc_info()[1] + if e.err.find('Path is not a working copy directory') != -1: + raise py.error.ENOENT(self, e.err) + elif e.err.find("is not under version control") != -1: + raise py.error.ENOENT(self, e.err) + raise + # XXX SVN 1.3 has output on stderr instead of stdout (while it does + # return 0!), so a bit nasty, but we assume no output is output + # to stderr... + if (output.strip() == '' or + output.lower().find('not a versioned resource') != -1): + raise py.error.ENOENT(self, output) + info = InfoSvnWCCommand(output) + + # Can't reliably compare on Windows without access to win32api + if py.std.sys.platform != 'win32': + if info.path != self.localpath: + raise py.error.ENOENT(self, "not a versioned resource:" + + " %s != %s" % (info.path, self.localpath)) + cache.info[self] = info + return info + + def listdir(self, fil=None, sort=None): + """ return a sequence of Paths. + + listdir will return either a tuple or a list of paths + depending on implementation choices. + """ + if isinstance(fil, str): + fil = common.FNMatcher(fil) + # XXX unify argument naming with LocalPath.listdir + def notsvn(path): + return path.basename != '.svn' + + paths = [] + for localpath in self.localpath.listdir(notsvn): + p = self.__class__(localpath, auth=self.auth) + if notsvn(p) and (not fil or fil(p)): + paths.append(p) + self._sortlist(paths, sort) + return paths + + def open(self, mode='r'): + """ return an opened file with the given mode. """ + return open(self.strpath, mode) + + def _getbyspec(self, spec): + return self.localpath._getbyspec(spec) + + class Checkers(py.path.local.Checkers): + def __init__(self, path): + self.svnwcpath = path + self.path = path.localpath + def versioned(self): + try: + s = self.svnwcpath.info() + except (py.error.ENOENT, py.error.EEXIST): + return False + except py.process.cmdexec.Error: + e = sys.exc_info()[1] + if e.err.find('is not a working copy')!=-1: + return False + if e.err.lower().find('not a versioned resource') != -1: + return False + raise + else: + return True + + def log(self, rev_start=None, rev_end=1, verbose=False): + """ return a list of LogEntry instances for this path. +rev_start is the starting revision (defaulting to the first one). +rev_end is the last revision (defaulting to HEAD). +if verbose is True, then the LogEntry instances also know which files changed. +""" + assert self.check() # make it simpler for the pipe + rev_start = rev_start is None and "HEAD" or rev_start + rev_end = rev_end is None and "HEAD" or rev_end + if rev_start == "HEAD" and rev_end == 1: + rev_opt = "" + else: + rev_opt = "-r %s:%s" % (rev_start, rev_end) + verbose_opt = verbose and "-v" or "" + locale_env = fixlocale() + # some blather on stderr + auth_opt = self._makeauthoptions() + #stdin, stdout, stderr = os.popen3(locale_env + + # 'svn log --xml %s %s %s "%s"' % ( + # rev_opt, verbose_opt, auth_opt, + # self.strpath)) + cmd = locale_env + 'svn log --xml %s %s %s "%s"' % ( + rev_opt, verbose_opt, auth_opt, self.strpath) + + popen = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + ) + stdout, stderr = popen.communicate() + stdout = py.builtin._totext(stdout, sys.getdefaultencoding()) + minidom,ExpatError = importxml() + try: + tree = minidom.parseString(stdout) + except ExpatError: + raise ValueError('no such revision') + result = [] + for logentry in filter(None, tree.firstChild.childNodes): + if logentry.nodeType == logentry.ELEMENT_NODE: + result.append(LogEntry(logentry)) + return result + + def size(self): + """ Return the size of the file content of the Path. """ + return self.info().size + + def mtime(self): + """ Return the last modification time of the file. """ + return self.info().mtime + + def __hash__(self): + return hash((self.strpath, self.__class__, self.auth)) + + +class WCStatus: + attrnames = ('modified','added', 'conflict', 'unchanged', 'external', + 'deleted', 'prop_modified', 'unknown', 'update_available', + 'incomplete', 'kindmismatch', 'ignored', 'locked', 'replaced' + ) + + def __init__(self, wcpath, rev=None, modrev=None, author=None): + self.wcpath = wcpath + self.rev = rev + self.modrev = modrev + self.author = author + + for name in self.attrnames: + setattr(self, name, []) + + def allpath(self, sort=True, **kw): + d = {} + for name in self.attrnames: + if name not in kw or kw[name]: + for path in getattr(self, name): + d[path] = 1 + l = d.keys() + if sort: + l.sort() + return l + + # XXX a bit scary to assume there's always 2 spaces between username and + # path, however with win32 allowing spaces in user names there doesn't + # seem to be a more solid approach :( + _rex_status = re.compile(r'\s+(\d+|-)\s+(\S+)\s+(.+?)\s{2,}(.*)') + + def fromstring(data, rootwcpath, rev=None, modrev=None, author=None): + """ return a new WCStatus object from data 's' + """ + rootstatus = WCStatus(rootwcpath, rev, modrev, author) + update_rev = None + for line in data.split('\n'): + if not line.strip(): + continue + #print "processing %r" % line + flags, rest = line[:8], line[8:] + # first column + c0,c1,c2,c3,c4,c5,x6,c7 = flags + #if '*' in line: + # print "flags", repr(flags), "rest", repr(rest) + + if c0 in '?XI': + fn = line.split(None, 1)[1] + if c0 == '?': + wcpath = rootwcpath.join(fn, abs=1) + rootstatus.unknown.append(wcpath) + elif c0 == 'X': + wcpath = rootwcpath.__class__( + rootwcpath.localpath.join(fn, abs=1), + auth=rootwcpath.auth) + rootstatus.external.append(wcpath) + elif c0 == 'I': + wcpath = rootwcpath.join(fn, abs=1) + rootstatus.ignored.append(wcpath) + + continue + + #elif c0 in '~!' or c4 == 'S': + # raise NotImplementedError("received flag %r" % c0) + + m = WCStatus._rex_status.match(rest) + if not m: + if c7 == '*': + fn = rest.strip() + wcpath = rootwcpath.join(fn, abs=1) + rootstatus.update_available.append(wcpath) + continue + if line.lower().find('against revision:')!=-1: + update_rev = int(rest.split(':')[1].strip()) + continue + if line.lower().find('status on external') > -1: + # XXX not sure what to do here... perhaps we want to + # store some state instead of just continuing, as right + # now it makes the top-level external get added twice + # (once as external, once as 'normal' unchanged item) + # because of the way SVN presents external items + continue + # keep trying + raise ValueError("could not parse line %r" % line) + else: + rev, modrev, author, fn = m.groups() + wcpath = rootwcpath.join(fn, abs=1) + #assert wcpath.check() + if c0 == 'M': + assert wcpath.check(file=1), "didn't expect a directory with changed content here" + rootstatus.modified.append(wcpath) + elif c0 == 'A' or c3 == '+' : + rootstatus.added.append(wcpath) + elif c0 == 'D': + rootstatus.deleted.append(wcpath) + elif c0 == 'C': + rootstatus.conflict.append(wcpath) + elif c0 == '~': + rootstatus.kindmismatch.append(wcpath) + elif c0 == '!': + rootstatus.incomplete.append(wcpath) + elif c0 == 'R': + rootstatus.replaced.append(wcpath) + elif not c0.strip(): + rootstatus.unchanged.append(wcpath) + else: + raise NotImplementedError("received flag %r" % c0) + + if c1 == 'M': + rootstatus.prop_modified.append(wcpath) + # XXX do we cover all client versions here? + if c2 == 'L' or c5 == 'K': + rootstatus.locked.append(wcpath) + if c7 == '*': + rootstatus.update_available.append(wcpath) + + if wcpath == rootwcpath: + rootstatus.rev = rev + rootstatus.modrev = modrev + rootstatus.author = author + if update_rev: + rootstatus.update_rev = update_rev + continue + return rootstatus + fromstring = staticmethod(fromstring) + +class XMLWCStatus(WCStatus): + def fromstring(data, rootwcpath, rev=None, modrev=None, author=None): + """ parse 'data' (XML string as outputted by svn st) into a status obj + """ + # XXX for externals, the path is shown twice: once + # with external information, and once with full info as if + # the item was a normal non-external... the current way of + # dealing with this issue is by ignoring it - this does make + # externals appear as external items as well as 'normal', + # unchanged ones in the status object so this is far from ideal + rootstatus = WCStatus(rootwcpath, rev, modrev, author) + update_rev = None + minidom, ExpatError = importxml() + try: + doc = minidom.parseString(data) + except ExpatError: + e = sys.exc_info()[1] + raise ValueError(str(e)) + urevels = doc.getElementsByTagName('against') + if urevels: + rootstatus.update_rev = urevels[-1].getAttribute('revision') + for entryel in doc.getElementsByTagName('entry'): + path = entryel.getAttribute('path') + statusel = entryel.getElementsByTagName('wc-status')[0] + itemstatus = statusel.getAttribute('item') + + if itemstatus == 'unversioned': + wcpath = rootwcpath.join(path, abs=1) + rootstatus.unknown.append(wcpath) + continue + elif itemstatus == 'external': + wcpath = rootwcpath.__class__( + rootwcpath.localpath.join(path, abs=1), + auth=rootwcpath.auth) + rootstatus.external.append(wcpath) + continue + elif itemstatus == 'ignored': + wcpath = rootwcpath.join(path, abs=1) + rootstatus.ignored.append(wcpath) + continue + elif itemstatus == 'incomplete': + wcpath = rootwcpath.join(path, abs=1) + rootstatus.incomplete.append(wcpath) + continue + + rev = statusel.getAttribute('revision') + if itemstatus == 'added' or itemstatus == 'none': + rev = '0' + modrev = '?' + author = '?' + date = '' + elif itemstatus == "replaced": + pass + else: + #print entryel.toxml() + commitel = entryel.getElementsByTagName('commit')[0] + if commitel: + modrev = commitel.getAttribute('revision') + author = '' + author_els = commitel.getElementsByTagName('author') + if author_els: + for c in author_els[0].childNodes: + author += c.nodeValue + date = '' + for c in commitel.getElementsByTagName('date')[0]\ + .childNodes: + date += c.nodeValue + + wcpath = rootwcpath.join(path, abs=1) + + assert itemstatus != 'modified' or wcpath.check(file=1), ( + 'did\'t expect a directory with changed content here') + + itemattrname = { + 'normal': 'unchanged', + 'unversioned': 'unknown', + 'conflicted': 'conflict', + 'none': 'added', + }.get(itemstatus, itemstatus) + + attr = getattr(rootstatus, itemattrname) + attr.append(wcpath) + + propsstatus = statusel.getAttribute('props') + if propsstatus not in ('none', 'normal'): + rootstatus.prop_modified.append(wcpath) + + if wcpath == rootwcpath: + rootstatus.rev = rev + rootstatus.modrev = modrev + rootstatus.author = author + rootstatus.date = date + + # handle repos-status element (remote info) + rstatusels = entryel.getElementsByTagName('repos-status') + if rstatusels: + rstatusel = rstatusels[0] + ritemstatus = rstatusel.getAttribute('item') + if ritemstatus in ('added', 'modified'): + rootstatus.update_available.append(wcpath) + + lockels = entryel.getElementsByTagName('lock') + if len(lockels): + rootstatus.locked.append(wcpath) + + return rootstatus + fromstring = staticmethod(fromstring) + +class InfoSvnWCCommand: + def __init__(self, output): + # Path: test + # URL: http://codespeak.net/svn/std.path/trunk/dist/std.path/test + # Repository UUID: fd0d7bf2-dfb6-0310-8d31-b7ecfe96aada + # Revision: 2151 + # Node Kind: directory + # Schedule: normal + # Last Changed Author: hpk + # Last Changed Rev: 2100 + # Last Changed Date: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003) + # Properties Last Updated: 2003-11-03 14:47:48 +0100 (Mon, 03 Nov 2003) + + d = {} + for line in output.split('\n'): + if not line.strip(): + continue + key, value = line.split(':', 1) + key = key.lower().replace(' ', '') + value = value.strip() + d[key] = value + try: + self.url = d['url'] + except KeyError: + raise ValueError("Not a versioned resource") + #raise ValueError, "Not a versioned resource %r" % path + self.kind = d['nodekind'] == 'directory' and 'dir' or d['nodekind'] + try: + self.rev = int(d['revision']) + except KeyError: + self.rev = None + + self.path = py.path.local(d['path']) + self.size = self.path.size() + if 'lastchangedrev' in d: + self.created_rev = int(d['lastchangedrev']) + if 'lastchangedauthor' in d: + self.last_author = d['lastchangedauthor'] + if 'lastchangeddate' in d: + self.mtime = parse_wcinfotime(d['lastchangeddate']) + self.time = self.mtime * 1000000 + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + +def parse_wcinfotime(timestr): + """ Returns seconds since epoch, UTC. """ + # example: 2003-10-27 20:43:14 +0100 (Mon, 27 Oct 2003) + m = re.match(r'(\d+-\d+-\d+ \d+:\d+:\d+) ([+-]\d+) .*', timestr) + if not m: + raise ValueError("timestring %r does not match" % timestr) + timestr, timezone = m.groups() + # do not handle timezone specially, return value should be UTC + parsedtime = time.strptime(timestr, "%Y-%m-%d %H:%M:%S") + return calendar.timegm(parsedtime) + +def make_recursive_propdict(wcroot, + output, + rex = re.compile("Properties on '(.*)':")): + """ Return a dictionary of path->PropListDict mappings. """ + lines = [x for x in output.split('\n') if x] + pdict = {} + while lines: + line = lines.pop(0) + m = rex.match(line) + if not m: + raise ValueError("could not parse propget-line: %r" % line) + path = m.groups()[0] + wcpath = wcroot.join(path, abs=1) + propnames = [] + while lines and lines[0].startswith(' '): + propname = lines.pop(0).strip() + propnames.append(propname) + assert propnames, "must have found properties!" + pdict[wcpath] = PropListDict(wcpath, propnames) + return pdict + + +def importxml(cache=[]): + if cache: + return cache + from xml.dom import minidom + from xml.parsers.expat import ExpatError + cache.extend([minidom, ExpatError]) + return cache + +class LogEntry: + def __init__(self, logentry): + self.rev = int(logentry.getAttribute('revision')) + for lpart in filter(None, logentry.childNodes): + if lpart.nodeType == lpart.ELEMENT_NODE: + if lpart.nodeName == 'author': + self.author = lpart.firstChild.nodeValue + elif lpart.nodeName == 'msg': + if lpart.firstChild: + self.msg = lpart.firstChild.nodeValue + else: + self.msg = '' + elif lpart.nodeName == 'date': + #2003-07-29T20:05:11.598637Z + timestr = lpart.firstChild.nodeValue + self.date = parse_apr_time(timestr) + elif lpart.nodeName == 'paths': + self.strpaths = [] + for ppart in filter(None, lpart.childNodes): + if ppart.nodeType == ppart.ELEMENT_NODE: + self.strpaths.append(PathEntry(ppart)) + def __repr__(self): + return '<Logentry rev=%d author=%s date=%s>' % ( + self.rev, self.author, self.date) + + diff --git a/python/py/py/_process/__init__.py b/python/py/py/_process/__init__.py new file mode 100644 index 000000000..86c714ad1 --- /dev/null +++ b/python/py/py/_process/__init__.py @@ -0,0 +1 @@ +""" high-level sub-process handling """ diff --git a/python/py/py/_process/cmdexec.py b/python/py/py/_process/cmdexec.py new file mode 100644 index 000000000..f83a24940 --- /dev/null +++ b/python/py/py/_process/cmdexec.py @@ -0,0 +1,49 @@ +import sys +import subprocess +import py +from subprocess import Popen, PIPE + +def cmdexec(cmd): + """ return unicode output of executing 'cmd' in a separate process. + + raise cmdexec.Error exeception if the command failed. + the exception will provide an 'err' attribute containing + the error-output from the command. + if the subprocess module does not provide a proper encoding/unicode strings + sys.getdefaultencoding() will be used, if that does not exist, 'UTF-8'. + """ + process = subprocess.Popen(cmd, shell=True, + universal_newlines=True, + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out, err = process.communicate() + if sys.version_info[0] < 3: # on py3 we get unicode strings, on py2 not + try: + default_encoding = sys.getdefaultencoding() # jython may not have it + except AttributeError: + default_encoding = sys.stdout.encoding or 'UTF-8' + out = unicode(out, process.stdout.encoding or default_encoding) + err = unicode(err, process.stderr.encoding or default_encoding) + status = process.poll() + if status: + raise ExecutionFailed(status, status, cmd, out, err) + return out + +class ExecutionFailed(py.error.Error): + def __init__(self, status, systemstatus, cmd, out, err): + Exception.__init__(self) + self.status = status + self.systemstatus = systemstatus + self.cmd = cmd + self.err = err + self.out = out + + def __str__(self): + return "ExecutionFailed: %d %s\n%s" %(self.status, self.cmd, self.err) + +# export the exception under the name 'py.process.cmdexec.Error' +cmdexec.Error = ExecutionFailed +try: + ExecutionFailed.__module__ = 'py.process.cmdexec' + ExecutionFailed.__name__ = 'Error' +except (AttributeError, TypeError): + pass diff --git a/python/py/py/_process/forkedfunc.py b/python/py/py/_process/forkedfunc.py new file mode 100644 index 000000000..1c2853068 --- /dev/null +++ b/python/py/py/_process/forkedfunc.py @@ -0,0 +1,120 @@ + +""" + ForkedFunc provides a way to run a function in a forked process + and get at its return value, stdout and stderr output as well + as signals and exitstatusus. +""" + +import py +import os +import sys +import marshal + + +def get_unbuffered_io(fd, filename): + f = open(str(filename), "w") + if fd != f.fileno(): + os.dup2(f.fileno(), fd) + class AutoFlush: + def write(self, data): + f.write(data) + f.flush() + def __getattr__(self, name): + return getattr(f, name) + return AutoFlush() + + +class ForkedFunc: + EXITSTATUS_EXCEPTION = 3 + + + def __init__(self, fun, args=None, kwargs=None, nice_level=0, + child_on_start=None, child_on_exit=None): + if args is None: + args = [] + if kwargs is None: + kwargs = {} + self.fun = fun + self.args = args + self.kwargs = kwargs + self.tempdir = tempdir = py.path.local.mkdtemp() + self.RETVAL = tempdir.ensure('retval') + self.STDOUT = tempdir.ensure('stdout') + self.STDERR = tempdir.ensure('stderr') + + pid = os.fork() + if pid: # in parent process + self.pid = pid + else: # in child process + self.pid = None + self._child(nice_level, child_on_start, child_on_exit) + + def _child(self, nice_level, child_on_start, child_on_exit): + # right now we need to call a function, but first we need to + # map all IO that might happen + sys.stdout = stdout = get_unbuffered_io(1, self.STDOUT) + sys.stderr = stderr = get_unbuffered_io(2, self.STDERR) + retvalf = self.RETVAL.open("wb") + EXITSTATUS = 0 + try: + if nice_level: + os.nice(nice_level) + try: + if child_on_start is not None: + child_on_start() + retval = self.fun(*self.args, **self.kwargs) + retvalf.write(marshal.dumps(retval)) + if child_on_exit is not None: + child_on_exit() + except: + excinfo = py.code.ExceptionInfo() + stderr.write(str(excinfo._getreprcrash())) + EXITSTATUS = self.EXITSTATUS_EXCEPTION + finally: + stdout.close() + stderr.close() + retvalf.close() + os.close(1) + os.close(2) + os._exit(EXITSTATUS) + + def waitfinish(self, waiter=os.waitpid): + pid, systemstatus = waiter(self.pid, 0) + if systemstatus: + if os.WIFSIGNALED(systemstatus): + exitstatus = os.WTERMSIG(systemstatus) + 128 + else: + exitstatus = os.WEXITSTATUS(systemstatus) + else: + exitstatus = 0 + signal = systemstatus & 0x7f + if not exitstatus and not signal: + retval = self.RETVAL.open('rb') + try: + retval_data = retval.read() + finally: + retval.close() + retval = marshal.loads(retval_data) + else: + retval = None + stdout = self.STDOUT.read() + stderr = self.STDERR.read() + self._removetemp() + return Result(exitstatus, signal, retval, stdout, stderr) + + def _removetemp(self): + if self.tempdir.check(): + self.tempdir.remove() + + def __del__(self): + if self.pid is not None: # only clean up in main process + self._removetemp() + + +class Result(object): + def __init__(self, exitstatus, signal, retval, stdout, stderr): + self.exitstatus = exitstatus + self.signal = signal + self.retval = retval + self.out = stdout + self.err = stderr diff --git a/python/py/py/_process/killproc.py b/python/py/py/_process/killproc.py new file mode 100644 index 000000000..18e8310b5 --- /dev/null +++ b/python/py/py/_process/killproc.py @@ -0,0 +1,23 @@ +import py +import os, sys + +if sys.platform == "win32" or getattr(os, '_name', '') == 'nt': + try: + import ctypes + except ImportError: + def dokill(pid): + py.process.cmdexec("taskkill /F /PID %d" %(pid,)) + else: + def dokill(pid): + PROCESS_TERMINATE = 1 + handle = ctypes.windll.kernel32.OpenProcess( + PROCESS_TERMINATE, False, pid) + ctypes.windll.kernel32.TerminateProcess(handle, -1) + ctypes.windll.kernel32.CloseHandle(handle) +else: + def dokill(pid): + os.kill(pid, 15) + +def kill(pid): + """ kill process by id. """ + dokill(pid) diff --git a/python/py/py/_std.py b/python/py/py/_std.py new file mode 100644 index 000000000..97a985332 --- /dev/null +++ b/python/py/py/_std.py @@ -0,0 +1,18 @@ +import sys + +class Std(object): + """ makes top-level python modules available as an attribute, + importing them on first access. + """ + + def __init__(self): + self.__dict__ = sys.modules + + def __getattr__(self, name): + try: + m = __import__(name) + except ImportError: + raise AttributeError("py.std: could not import %s" % name) + return m + +std = Std() diff --git a/python/py/py/_xmlgen.py b/python/py/py/_xmlgen.py new file mode 100644 index 000000000..2ffcaa14b --- /dev/null +++ b/python/py/py/_xmlgen.py @@ -0,0 +1,253 @@ +""" +module for generating and serializing xml and html structures +by using simple python objects. + +(c) holger krekel, holger at merlinux eu. 2009 +""" +import sys, re + +if sys.version_info >= (3,0): + def u(s): + return s + def unicode(x, errors=None): + if hasattr(x, '__unicode__'): + return x.__unicode__() + return str(x) +else: + def u(s): + return unicode(s) + unicode = unicode + + +class NamespaceMetaclass(type): + def __getattr__(self, name): + if name[:1] == '_': + raise AttributeError(name) + if self == Namespace: + raise ValueError("Namespace class is abstract") + tagspec = self.__tagspec__ + if tagspec is not None and name not in tagspec: + raise AttributeError(name) + classattr = {} + if self.__stickyname__: + classattr['xmlname'] = name + cls = type(name, (self.__tagclass__,), classattr) + setattr(self, name, cls) + return cls + +class Tag(list): + class Attr(object): + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + def __init__(self, *args, **kwargs): + super(Tag, self).__init__(args) + self.attr = self.Attr(**kwargs) + + def __unicode__(self): + return self.unicode(indent=0) + __str__ = __unicode__ + + def unicode(self, indent=2): + l = [] + SimpleUnicodeVisitor(l.append, indent).visit(self) + return u("").join(l) + + def __repr__(self): + name = self.__class__.__name__ + return "<%r tag object %d>" % (name, id(self)) + +Namespace = NamespaceMetaclass('Namespace', (object, ), { + '__tagspec__': None, + '__tagclass__': Tag, + '__stickyname__': False, +}) + +class HtmlTag(Tag): + def unicode(self, indent=2): + l = [] + HtmlVisitor(l.append, indent, shortempty=False).visit(self) + return u("").join(l) + +# exported plain html namespace +class html(Namespace): + __tagclass__ = HtmlTag + __stickyname__ = True + __tagspec__ = dict([(x,1) for x in ( + 'a,abbr,acronym,address,applet,area,b,bdo,big,blink,' + 'blockquote,body,br,button,caption,center,cite,code,col,' + 'colgroup,comment,dd,del,dfn,dir,div,dl,dt,em,embed,' + 'fieldset,font,form,frameset,h1,h2,h3,h4,h5,h6,head,html,' + 'i,iframe,img,input,ins,kbd,label,legend,li,link,listing,' + 'map,marquee,menu,meta,multicol,nobr,noembed,noframes,' + 'noscript,object,ol,optgroup,option,p,pre,q,s,script,' + 'select,small,span,strike,strong,style,sub,sup,table,' + 'tbody,td,textarea,tfoot,th,thead,title,tr,tt,u,ul,xmp,' + 'base,basefont,frame,hr,isindex,param,samp,var' + ).split(',') if x]) + + class Style(object): + def __init__(self, **kw): + for x, y in kw.items(): + x = x.replace('_', '-') + setattr(self, x, y) + + +class raw(object): + """just a box that can contain a unicode string that will be + included directly in the output""" + def __init__(self, uniobj): + self.uniobj = uniobj + +class SimpleUnicodeVisitor(object): + """ recursive visitor to write unicode. """ + def __init__(self, write, indent=0, curindent=0, shortempty=True): + self.write = write + self.cache = {} + self.visited = {} # for detection of recursion + self.indent = indent + self.curindent = curindent + self.parents = [] + self.shortempty = shortempty # short empty tags or not + + def visit(self, node): + """ dispatcher on node's class/bases name. """ + cls = node.__class__ + try: + visitmethod = self.cache[cls] + except KeyError: + for subclass in cls.__mro__: + visitmethod = getattr(self, subclass.__name__, None) + if visitmethod is not None: + break + else: + visitmethod = self.__object + self.cache[cls] = visitmethod + visitmethod(node) + + # the default fallback handler is marked private + # to avoid clashes with the tag name object + def __object(self, obj): + #self.write(obj) + self.write(escape(unicode(obj))) + + def raw(self, obj): + self.write(obj.uniobj) + + def list(self, obj): + assert id(obj) not in self.visited + self.visited[id(obj)] = 1 + for elem in obj: + self.visit(elem) + + def Tag(self, tag): + assert id(tag) not in self.visited + try: + tag.parent = self.parents[-1] + except IndexError: + tag.parent = None + self.visited[id(tag)] = 1 + tagname = getattr(tag, 'xmlname', tag.__class__.__name__) + if self.curindent and not self._isinline(tagname): + self.write("\n" + u(' ') * self.curindent) + if tag: + self.curindent += self.indent + self.write(u('<%s%s>') % (tagname, self.attributes(tag))) + self.parents.append(tag) + for x in tag: + self.visit(x) + self.parents.pop() + self.write(u('</%s>') % tagname) + self.curindent -= self.indent + else: + nameattr = tagname+self.attributes(tag) + if self._issingleton(tagname): + self.write(u('<%s/>') % (nameattr,)) + else: + self.write(u('<%s></%s>') % (nameattr, tagname)) + + def attributes(self, tag): + # serialize attributes + attrlist = dir(tag.attr) + attrlist.sort() + l = [] + for name in attrlist: + res = self.repr_attribute(tag.attr, name) + if res is not None: + l.append(res) + l.extend(self.getstyle(tag)) + return u("").join(l) + + def repr_attribute(self, attrs, name): + if name[:2] != '__': + value = getattr(attrs, name) + if name.endswith('_'): + name = name[:-1] + if isinstance(value, raw): + insert = value.uniobj + else: + insert = escape(unicode(value)) + return ' %s="%s"' % (name, insert) + + def getstyle(self, tag): + """ return attribute list suitable for styling. """ + try: + styledict = tag.style.__dict__ + except AttributeError: + return [] + else: + stylelist = [x+': ' + y for x,y in styledict.items()] + return [u(' style="%s"') % u('; ').join(stylelist)] + + def _issingleton(self, tagname): + """can (and will) be overridden in subclasses""" + return self.shortempty + + def _isinline(self, tagname): + """can (and will) be overridden in subclasses""" + return False + +class HtmlVisitor(SimpleUnicodeVisitor): + + single = dict([(x, 1) for x in + ('br,img,area,param,col,hr,meta,link,base,' + 'input,frame').split(',')]) + inline = dict([(x, 1) for x in + ('a abbr acronym b basefont bdo big br cite code dfn em font ' + 'i img input kbd label q s samp select small span strike ' + 'strong sub sup textarea tt u var'.split(' '))]) + + def repr_attribute(self, attrs, name): + if name == 'class_': + value = getattr(attrs, name) + if value is None: + return + return super(HtmlVisitor, self).repr_attribute(attrs, name) + + def _issingleton(self, tagname): + return tagname in self.single + + def _isinline(self, tagname): + return tagname in self.inline + + +class _escape: + def __init__(self): + self.escape = { + u('"') : u('"'), u('<') : u('<'), u('>') : u('>'), + u('&') : u('&'), u("'") : u('''), + } + self.charef_rex = re.compile(u("|").join(self.escape.keys())) + + def _replacer(self, match): + return self.escape[match.group(0)] + + def __call__(self, ustring): + """ xml-escape the given unicode string. """ + try: + ustring = unicode(ustring) + except UnicodeDecodeError: + ustring = unicode(ustring, 'utf-8', errors='replace') + return self.charef_rex.sub(self._replacer, ustring) + +escape = _escape() diff --git a/python/py/py/test.py b/python/py/py/test.py new file mode 100644 index 000000000..aa5beb178 --- /dev/null +++ b/python/py/py/test.py @@ -0,0 +1,10 @@ +import sys +if __name__ == '__main__': + import pytest + sys.exit(pytest.main()) +else: + import sys, pytest + sys.modules['py.test'] = pytest + +# for more API entry points see the 'tests' definition +# in __init__.py diff --git a/python/py/setup.cfg b/python/py/setup.cfg new file mode 100644 index 000000000..be0b2a5c8 --- /dev/null +++ b/python/py/setup.cfg @@ -0,0 +1,11 @@ +[wheel] +universal = 1 + +[devpi:upload] +formats = sdist.tgz,bdist_wheel + +[egg_info] +tag_build = +tag_date = 0 +tag_svn_revision = 0 + diff --git a/python/py/setup.py b/python/py/setup.py new file mode 100644 index 000000000..06f0885cd --- /dev/null +++ b/python/py/setup.py @@ -0,0 +1,38 @@ +import os, sys + +from setuptools import setup + +def main(): + setup( + name='py', + description='library with cross-python path, ini-parsing, io, code, log facilities', + long_description = open('README.txt').read(), + version='1.4.31', + url='http://pylib.readthedocs.org/', + license='MIT license', + platforms=['unix', 'linux', 'osx', 'cygwin', 'win32'], + author='holger krekel, Ronny Pfannschmidt, Benjamin Peterson and others', + author_email='pytest-dev@python.org', + classifiers=['Development Status :: 6 - Mature', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Operating System :: POSIX', + 'Operating System :: Microsoft :: Windows', + 'Operating System :: MacOS :: MacOS X', + 'Topic :: Software Development :: Testing', + 'Topic :: Software Development :: Libraries', + 'Topic :: Utilities', + 'Programming Language :: Python', + 'Programming Language :: Python :: 3'], + packages=['py', + 'py._code', + 'py._io', + 'py._log', + 'py._path', + 'py._process', + ], + zip_safe=False, + ) + +if __name__ == '__main__': + main() |