summaryrefslogtreecommitdiffstats
path: root/testing/mozbase/mozfile
diff options
context:
space:
mode:
Diffstat (limited to 'testing/mozbase/mozfile')
-rw-r--r--testing/mozbase/mozfile/mozfile/__init__.py8
-rw-r--r--testing/mozbase/mozfile/mozfile/mozfile.py449
-rw-r--r--testing/mozbase/mozfile/setup.py25
-rw-r--r--testing/mozbase/mozfile/tests/files/missing_file_attributes.zipbin0 -> 442 bytes
-rw-r--r--testing/mozbase/mozfile/tests/manifest.ini6
-rw-r--r--testing/mozbase/mozfile/tests/stubs.py37
-rw-r--r--testing/mozbase/mozfile/tests/test_extract.py154
-rwxr-xr-xtesting/mozbase/mozfile/tests/test_load.py62
-rw-r--r--testing/mozbase/mozfile/tests/test_move_remove.py232
-rw-r--r--testing/mozbase/mozfile/tests/test_tempdir.py42
-rw-r--r--testing/mozbase/mozfile/tests/test_tempfile.py102
-rwxr-xr-xtesting/mozbase/mozfile/tests/test_url.py21
12 files changed, 1138 insertions, 0 deletions
diff --git a/testing/mozbase/mozfile/mozfile/__init__.py b/testing/mozbase/mozfile/mozfile/__init__.py
new file mode 100644
index 000000000..a527f0ad6
--- /dev/null
+++ b/testing/mozbase/mozfile/mozfile/__init__.py
@@ -0,0 +1,8 @@
+# flake8: noqa
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import
+
+from .mozfile import *
diff --git a/testing/mozbase/mozfile/mozfile/mozfile.py b/testing/mozbase/mozfile/mozfile/mozfile.py
new file mode 100644
index 000000000..94805594e
--- /dev/null
+++ b/testing/mozbase/mozfile/mozfile/mozfile.py
@@ -0,0 +1,449 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+# We don't import all modules at the top for performance reasons. See Bug 1008943
+
+from __future__ import absolute_import
+
+from contextlib import contextmanager
+import errno
+import os
+import stat
+import time
+import warnings
+
+__all__ = ['extract_tarball',
+ 'extract_zip',
+ 'extract',
+ 'is_url',
+ 'load',
+ 'move',
+ 'remove',
+ 'rmtree',
+ 'tree',
+ 'NamedTemporaryFile',
+ 'TemporaryDirectory']
+
+# utilities for extracting archives
+
+
+def extract_tarball(src, dest):
+ """extract a .tar file"""
+
+ import tarfile
+
+ bundle = tarfile.open(src)
+ namelist = bundle.getnames()
+
+ for name in namelist:
+ bundle.extract(name, path=dest)
+ bundle.close()
+ return namelist
+
+
+def extract_zip(src, dest):
+ """extract a zip file"""
+
+ import zipfile
+
+ if isinstance(src, zipfile.ZipFile):
+ bundle = src
+ else:
+ try:
+ bundle = zipfile.ZipFile(src)
+ except Exception:
+ print "src: %s" % src
+ raise
+
+ namelist = bundle.namelist()
+
+ for name in namelist:
+ filename = os.path.realpath(os.path.join(dest, name))
+ if name.endswith('/'):
+ if not os.path.isdir(filename):
+ os.makedirs(filename)
+ else:
+ path = os.path.dirname(filename)
+ if not os.path.isdir(path):
+ os.makedirs(path)
+ _dest = open(filename, 'wb')
+ _dest.write(bundle.read(name))
+ _dest.close()
+ mode = bundle.getinfo(name).external_attr >> 16 & 0x1FF
+ # Only update permissions if attributes are set. Otherwise fallback to the defaults.
+ if mode:
+ os.chmod(filename, mode)
+ bundle.close()
+ return namelist
+
+
+def extract(src, dest=None):
+ """
+ Takes in a tar or zip file and extracts it to dest
+
+ If dest is not specified, extracts to os.path.dirname(src)
+
+ Returns the list of top level files that were extracted
+ """
+
+ import zipfile
+ import tarfile
+
+ assert os.path.exists(src), "'%s' does not exist" % src
+
+ if dest is None:
+ dest = os.path.dirname(src)
+ elif not os.path.isdir(dest):
+ os.makedirs(dest)
+ assert not os.path.isfile(dest), "dest cannot be a file"
+
+ if zipfile.is_zipfile(src):
+ namelist = extract_zip(src, dest)
+ elif tarfile.is_tarfile(src):
+ namelist = extract_tarball(src, dest)
+ else:
+ raise Exception("mozfile.extract: no archive format found for '%s'" %
+ src)
+
+ # namelist returns paths with forward slashes even in windows
+ top_level_files = [os.path.join(dest, name.rstrip('/')) for name in namelist
+ if len(name.rstrip('/').split('/')) == 1]
+
+ # namelist doesn't include folders, append these to the list
+ for name in namelist:
+ index = name.find('/')
+ if index != -1:
+ root = os.path.join(dest, name[:index])
+ if root not in top_level_files:
+ top_level_files.append(root)
+
+ return top_level_files
+
+
+# utilities for removal of files and directories
+
+def rmtree(dir):
+ """Deprecated wrapper method to remove a directory tree.
+
+ Ensure to update your code to use mozfile.remove() directly
+
+ :param dir: directory to be removed
+ """
+
+ warnings.warn("mozfile.rmtree() is deprecated in favor of mozfile.remove()",
+ PendingDeprecationWarning, stacklevel=2)
+ return remove(dir)
+
+
+def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5):
+ """
+ It's possible to see spurious errors on Windows due to various things
+ keeping a handle to the directory open (explorer, virus scanners, etc)
+ So we try a few times if it fails with a known error.
+ retry_delay is multiplied by the number of failed attempts to increase
+ the likelihood of success in subsequent attempts.
+ """
+ retry_count = 0
+ while True:
+ try:
+ func(*args)
+ except OSError as e:
+ # Error codes are defined in:
+ # http://docs.python.org/2/library/errno.html#module-errno
+ if e.errno not in (errno.EACCES, errno.ENOTEMPTY):
+ raise
+
+ if retry_count == retry_max:
+ raise
+
+ retry_count += 1
+
+ print '%s() failed for "%s". Reason: %s (%s). Retrying...' % \
+ (func.__name__, args, e.strerror, e.errno)
+ time.sleep(retry_count * retry_delay)
+ else:
+ # If no exception has been thrown it should be done
+ break
+
+
+def remove(path):
+ """Removes the specified file, link, or directory tree.
+
+ This is a replacement for shutil.rmtree that works better under
+ windows. It does the following things:
+
+ - check path access for the current user before trying to remove
+ - retry operations on some known errors due to various things keeping
+ a handle on file paths - like explorer, virus scanners, etc. The
+ known errors are errno.EACCES and errno.ENOTEMPTY, and it will
+ retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds
+ between each attempt.
+
+ Note that no error will be raised if the given path does not exists.
+
+ :param path: path to be removed
+ """
+
+ import shutil
+
+ def _call_with_windows_retry(*args, **kwargs):
+ try:
+ _call_windows_retry(*args, **kwargs)
+ except OSError as e:
+ # The file or directory to be removed doesn't exist anymore
+ if e.errno != errno.ENOENT:
+ raise
+
+ def _update_permissions(path):
+ """Sets specified pemissions depending on filetype"""
+ if os.path.islink(path):
+ # Path is a symlink which we don't have to modify
+ # because it should already have all the needed permissions
+ return
+
+ stats = os.stat(path)
+
+ if os.path.isfile(path):
+ mode = stats.st_mode | stat.S_IWUSR
+ elif os.path.isdir(path):
+ mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR
+ else:
+ # Not supported type
+ return
+
+ _call_with_windows_retry(os.chmod, (path, mode))
+
+ if not os.path.exists(path):
+ return
+
+ if os.path.isfile(path) or os.path.islink(path):
+ # Verify the file or link is read/write for the current user
+ _update_permissions(path)
+ _call_with_windows_retry(os.remove, (path,))
+
+ elif os.path.isdir(path):
+ # Verify the directory is read/write/execute for the current user
+ _update_permissions(path)
+
+ # We're ensuring that every nested item has writable permission.
+ for root, dirs, files in os.walk(path):
+ for entry in dirs + files:
+ _update_permissions(os.path.join(root, entry))
+ _call_with_windows_retry(shutil.rmtree, (path,))
+
+
+def move(src, dst):
+ """
+ Move a file or directory path.
+
+ This is a replacement for shutil.move that works better under windows,
+ retrying operations on some known errors due to various things keeping
+ a handle on file paths.
+ """
+ import shutil
+ _call_windows_retry(shutil.move, (src, dst))
+
+
+def depth(directory):
+ """returns the integer depth of a directory or path relative to '/' """
+
+ directory = os.path.abspath(directory)
+ level = 0
+ while True:
+ directory, remainder = os.path.split(directory)
+ level += 1
+ if not remainder:
+ break
+ return level
+
+
+# ASCII delimeters
+ascii_delimeters = {
+ 'vertical_line': '|',
+ 'item_marker': '+',
+ 'last_child': '\\'
+}
+
+# unicode delimiters
+unicode_delimeters = {
+ 'vertical_line': '│',
+ 'item_marker': '├',
+ 'last_child': '└'
+}
+
+
+def tree(directory,
+ item_marker=unicode_delimeters['item_marker'],
+ vertical_line=unicode_delimeters['vertical_line'],
+ last_child=unicode_delimeters['last_child'],
+ sort_key=lambda x: x.lower()):
+ """
+ display tree directory structure for `directory`
+ """
+
+ retval = []
+ indent = []
+ last = {}
+ top = depth(directory)
+
+ for dirpath, dirnames, filenames in os.walk(directory, topdown=True):
+
+ abspath = os.path.abspath(dirpath)
+ basename = os.path.basename(abspath)
+ parent = os.path.dirname(abspath)
+ level = depth(abspath) - top
+
+ # sort articles of interest
+ for resource in (dirnames, filenames):
+ resource[:] = sorted(resource, key=sort_key)
+
+ if level > len(indent):
+ indent.append(vertical_line)
+ indent = indent[:level]
+
+ if dirnames:
+ files_end = item_marker
+ last[abspath] = dirnames[-1]
+ else:
+ files_end = last_child
+
+ if last.get(parent) == os.path.basename(abspath):
+ # last directory of parent
+ dirpath_mark = last_child
+ indent[-1] = ' '
+ elif not indent:
+ dirpath_mark = ''
+ else:
+ dirpath_mark = item_marker
+
+ # append the directory and piece of tree structure
+ # if the top-level entry directory, print as passed
+ retval.append('%s%s%s' % (''.join(indent[:-1]),
+ dirpath_mark,
+ basename if retval else directory))
+ # add the files
+ if filenames:
+ last_file = filenames[-1]
+ retval.extend([('%s%s%s' % (''.join(indent),
+ files_end if filename == last_file else item_marker,
+ filename))
+ for index, filename in enumerate(filenames)])
+
+ return '\n'.join(retval)
+
+
+# utilities for temporary resources
+
+class NamedTemporaryFile(object):
+ """
+ Like tempfile.NamedTemporaryFile except it works on Windows
+ in the case where you open the created file a second time.
+
+ This behaves very similarly to tempfile.NamedTemporaryFile but may
+ not behave exactly the same. For example, this function does not
+ prevent fd inheritance by children.
+
+ Example usage:
+
+ with NamedTemporaryFile() as fh:
+ fh.write(b'foobar')
+
+ print('Filename: %s' % fh.name)
+
+ see https://bugzilla.mozilla.org/show_bug.cgi?id=821362
+ """
+
+ def __init__(self, mode='w+b', bufsize=-1, suffix='', prefix='tmp',
+ dir=None, delete=True):
+
+ import tempfile
+ fd, path = tempfile.mkstemp(suffix, prefix, dir, 't' in mode)
+ os.close(fd)
+
+ self.file = open(path, mode)
+ self._path = path
+ self._delete = delete
+ self._unlinked = False
+
+ def __getattr__(self, k):
+ return getattr(self.__dict__['file'], k)
+
+ def __iter__(self):
+ return self.__dict__['file']
+
+ def __enter__(self):
+ self.file.__enter__()
+ return self
+
+ def __exit__(self, exc, value, tb):
+ self.file.__exit__(exc, value, tb)
+ if self.__dict__['_delete']:
+ os.unlink(self.__dict__['_path'])
+ self._unlinked = True
+
+ def __del__(self):
+ if self.__dict__['_unlinked']:
+ return
+ self.file.__exit__(None, None, None)
+ if self.__dict__['_delete']:
+ os.unlink(self.__dict__['_path'])
+
+
+@contextmanager
+def TemporaryDirectory():
+ """
+ create a temporary directory using tempfile.mkdtemp, and then clean it up.
+
+ Example usage:
+ with TemporaryDirectory() as tmp:
+ open(os.path.join(tmp, "a_temp_file"), "w").write("data")
+
+ """
+
+ import tempfile
+ import shutil
+
+ tempdir = tempfile.mkdtemp()
+ try:
+ yield tempdir
+ finally:
+ shutil.rmtree(tempdir)
+
+
+# utilities dealing with URLs
+
+def is_url(thing):
+ """
+ Return True if thing looks like a URL.
+ """
+
+ import urlparse
+
+ parsed = urlparse.urlparse(thing)
+ if 'scheme' in parsed:
+ return len(parsed.scheme) >= 2
+ else:
+ return len(parsed[0]) >= 2
+
+
+def load(resource):
+ """
+ open a file or URL for reading. If the passed resource string is not a URL,
+ or begins with 'file://', return a ``file``. Otherwise, return the
+ result of urllib2.urlopen()
+ """
+
+ import urllib2
+
+ # handle file URLs separately due to python stdlib limitations
+ if resource.startswith('file://'):
+ resource = resource[len('file://'):]
+
+ if not is_url(resource):
+ # if no scheme is given, it is a file path
+ return file(resource)
+
+ return urllib2.urlopen(resource)
diff --git a/testing/mozbase/mozfile/setup.py b/testing/mozbase/mozfile/setup.py
new file mode 100644
index 000000000..277ff7b52
--- /dev/null
+++ b/testing/mozbase/mozfile/setup.py
@@ -0,0 +1,25 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from setuptools import setup
+
+PACKAGE_NAME = 'mozfile'
+PACKAGE_VERSION = '1.2'
+
+setup(name=PACKAGE_NAME,
+ version=PACKAGE_VERSION,
+ description="Library of file utilities for use in Mozilla testing",
+ long_description="see http://mozbase.readthedocs.org/",
+ classifiers=[], # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
+ keywords='mozilla',
+ author='Mozilla Automation and Tools team',
+ author_email='tools@lists.mozilla.org',
+ url='https://wiki.mozilla.org/Auto-tools/Projects/Mozbase',
+ license='MPL',
+ packages=['mozfile'],
+ include_package_data=True,
+ zip_safe=False,
+ install_requires=[],
+ tests_require=['mozhttpd']
+ )
diff --git a/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip
new file mode 100644
index 000000000..2b5409e89
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip
Binary files differ
diff --git a/testing/mozbase/mozfile/tests/manifest.ini b/testing/mozbase/mozfile/tests/manifest.ini
new file mode 100644
index 000000000..c7889beca
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/manifest.ini
@@ -0,0 +1,6 @@
+[test_extract.py]
+[test_load.py]
+[test_move_remove.py]
+[test_tempdir.py]
+[test_tempfile.py]
+[test_url.py]
diff --git a/testing/mozbase/mozfile/tests/stubs.py b/testing/mozbase/mozfile/tests/stubs.py
new file mode 100644
index 000000000..06d79e7af
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/stubs.py
@@ -0,0 +1,37 @@
+import os
+import shutil
+import tempfile
+
+
+# stub file paths
+files = [('foo.txt',),
+ ('foo', 'bar.txt',),
+ ('foo', 'bar', 'fleem.txt',),
+ ('foobar', 'fleem.txt',),
+ ('bar.txt',),
+ ('nested_tree', 'bar', 'fleem.txt',),
+ ('readonly.txt',),
+ ]
+
+
+def create_stub():
+ """create a stub directory"""
+
+ tempdir = tempfile.mkdtemp()
+ try:
+ for path in files:
+ fullpath = os.path.join(tempdir, *path)
+ dirname = os.path.dirname(fullpath)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+ contents = path[-1]
+ f = file(fullpath, 'w')
+ f.write(contents)
+ f.close()
+ return tempdir
+ except Exception:
+ try:
+ shutil.rmtree(tempdir)
+ except:
+ pass
+ raise
diff --git a/testing/mozbase/mozfile/tests/test_extract.py b/testing/mozbase/mozfile/tests/test_extract.py
new file mode 100644
index 000000000..e91f52349
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_extract.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python
+
+import os
+import shutil
+import tarfile
+import tempfile
+import unittest
+import zipfile
+
+import mozfile
+
+import stubs
+
+
+class TestExtract(unittest.TestCase):
+ """test extracting archives"""
+
+ def ensure_directory_contents(self, directory):
+ """ensure the directory contents match"""
+ for f in stubs.files:
+ path = os.path.join(directory, *f)
+ exists = os.path.exists(path)
+ if not exists:
+ print "%s does not exist" % (os.path.join(f))
+ self.assertTrue(exists)
+ if exists:
+ contents = file(path).read().strip()
+ self.assertTrue(contents == f[-1])
+
+ def test_extract_zipfile(self):
+ """test extracting a zipfile"""
+ _zipfile = self.create_zip()
+ self.assertTrue(os.path.exists(_zipfile))
+ try:
+ dest = tempfile.mkdtemp()
+ try:
+ mozfile.extract_zip(_zipfile, dest)
+ self.ensure_directory_contents(dest)
+ finally:
+ shutil.rmtree(dest)
+ finally:
+ os.remove(_zipfile)
+
+ def test_extract_zipfile_missing_file_attributes(self):
+ """if files do not have attributes set the default permissions have to be inherited."""
+ _zipfile = os.path.join(os.path.dirname(__file__), 'files', 'missing_file_attributes.zip')
+ self.assertTrue(os.path.exists(_zipfile))
+ dest = tempfile.mkdtemp()
+ try:
+ # Get the default file permissions for the user
+ fname = os.path.join(dest, 'foo')
+ with open(fname, 'w'):
+ pass
+ default_stmode = os.stat(fname).st_mode
+
+ files = mozfile.extract_zip(_zipfile, dest)
+ for filename in files:
+ self.assertEqual(os.stat(os.path.join(dest, filename)).st_mode,
+ default_stmode)
+ finally:
+ shutil.rmtree(dest)
+
+ def test_extract_tarball(self):
+ """test extracting a tarball"""
+ tarball = self.create_tarball()
+ self.assertTrue(os.path.exists(tarball))
+ try:
+ dest = tempfile.mkdtemp()
+ try:
+ mozfile.extract_tarball(tarball, dest)
+ self.ensure_directory_contents(dest)
+ finally:
+ shutil.rmtree(dest)
+ finally:
+ os.remove(tarball)
+
+ def test_extract(self):
+ """test the generalized extract function"""
+
+ # test extracting a tarball
+ tarball = self.create_tarball()
+ self.assertTrue(os.path.exists(tarball))
+ try:
+ dest = tempfile.mkdtemp()
+ try:
+ mozfile.extract(tarball, dest)
+ self.ensure_directory_contents(dest)
+ finally:
+ shutil.rmtree(dest)
+ finally:
+ os.remove(tarball)
+
+ # test extracting a zipfile
+ _zipfile = self.create_zip()
+ self.assertTrue(os.path.exists(_zipfile))
+ try:
+ dest = tempfile.mkdtemp()
+ try:
+ mozfile.extract_zip(_zipfile, dest)
+ self.ensure_directory_contents(dest)
+ finally:
+ shutil.rmtree(dest)
+ finally:
+ os.remove(_zipfile)
+
+ # test extracting some non-archive; this should fail
+ fd, filename = tempfile.mkstemp()
+ os.write(fd, 'This is not a zipfile or tarball')
+ os.close(fd)
+ exception = None
+ try:
+ dest = tempfile.mkdtemp()
+ mozfile.extract(filename, dest)
+ except Exception as exception:
+ pass
+ finally:
+ os.remove(filename)
+ os.rmdir(dest)
+ self.assertTrue(isinstance(exception, Exception))
+
+ # utility functions
+
+ def create_tarball(self):
+ """create a stub tarball for testing"""
+ tempdir = stubs.create_stub()
+ filename = tempfile.mktemp(suffix='.tar')
+ archive = tarfile.TarFile(filename, mode='w')
+ try:
+ for path in stubs.files:
+ archive.add(os.path.join(tempdir, *path), arcname=os.path.join(*path))
+ except:
+ os.remove(archive)
+ raise
+ finally:
+ shutil.rmtree(tempdir)
+ archive.close()
+ return filename
+
+ def create_zip(self):
+ """create a stub zipfile for testing"""
+
+ tempdir = stubs.create_stub()
+ filename = tempfile.mktemp(suffix='.zip')
+ archive = zipfile.ZipFile(filename, mode='w')
+ try:
+ for path in stubs.files:
+ archive.write(os.path.join(tempdir, *path), arcname=os.path.join(*path))
+ except:
+ os.remove(filename)
+ raise
+ finally:
+ shutil.rmtree(tempdir)
+ archive.close()
+ return filename
diff --git a/testing/mozbase/mozfile/tests/test_load.py b/testing/mozbase/mozfile/tests/test_load.py
new file mode 100755
index 000000000..13a5b519c
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_load.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+
+"""
+tests for mozfile.load
+"""
+
+import mozhttpd
+import os
+import tempfile
+import unittest
+from mozfile import load
+
+
+class TestLoad(unittest.TestCase):
+ """test the load function"""
+
+ def test_http(self):
+ """test with mozhttpd and a http:// URL"""
+
+ def example(request):
+ """example request handler"""
+ body = 'example'
+ return (200, {'Content-type': 'text/plain',
+ 'Content-length': len(body)
+ }, body)
+
+ host = '127.0.0.1'
+ httpd = mozhttpd.MozHttpd(host=host,
+ urlhandlers=[{'method': 'GET',
+ 'path': '.*',
+ 'function': example}])
+ try:
+ httpd.start(block=False)
+ content = load(httpd.get_url()).read()
+ self.assertEqual(content, 'example')
+ finally:
+ httpd.stop()
+
+ def test_file_path(self):
+ """test loading from file path"""
+ try:
+ # create a temporary file
+ tmp = tempfile.NamedTemporaryFile(delete=False)
+ tmp.write('foo bar')
+ tmp.close()
+
+ # read the file
+ contents = file(tmp.name).read()
+ self.assertEqual(contents, 'foo bar')
+
+ # read the file with load and a file path
+ self.assertEqual(load(tmp.name).read(), contents)
+
+ # read the file with load and a file URL
+ self.assertEqual(load('file://%s' % tmp.name).read(), contents)
+ finally:
+ # remove the tempfile
+ if os.path.exists(tmp.name):
+ os.remove(tmp.name)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testing/mozbase/mozfile/tests/test_move_remove.py b/testing/mozbase/mozfile/tests/test_move_remove.py
new file mode 100644
index 000000000..e9d0cd434
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_move_remove.py
@@ -0,0 +1,232 @@
+#!/usr/bin/env python
+
+import os
+import stat
+import shutil
+import threading
+import time
+import unittest
+import errno
+from contextlib import contextmanager
+
+import mozfile
+import mozinfo
+
+import stubs
+
+
+def mark_readonly(path):
+ """Removes all write permissions from given file/directory.
+
+ :param path: path of directory/file of which modes must be changed
+ """
+ mode = os.stat(path)[stat.ST_MODE]
+ os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
+
+
+class FileOpenCloseThread(threading.Thread):
+ """Helper thread for asynchronous file handling"""
+
+ def __init__(self, path, delay, delete=False):
+ threading.Thread.__init__(self)
+ self.file_opened = threading.Event()
+ self.delay = delay
+ self.path = path
+ self.delete = delete
+
+ def run(self):
+ with open(self.path):
+ self.file_opened.set()
+ time.sleep(self.delay)
+ if self.delete:
+ try:
+ os.remove(self.path)
+ except:
+ pass
+
+
+@contextmanager
+def wait_file_opened_in_thread(*args, **kwargs):
+ thread = FileOpenCloseThread(*args, **kwargs)
+ thread.start()
+ thread.file_opened.wait()
+ try:
+ yield thread
+ finally:
+ thread.join()
+
+
+class MozfileRemoveTestCase(unittest.TestCase):
+ """Test our ability to remove directories and files"""
+
+ def setUp(self):
+ # Generate a stub
+ self.tempdir = stubs.create_stub()
+
+ def tearDown(self):
+ if os.path.isdir(self.tempdir):
+ shutil.rmtree(self.tempdir)
+
+ def test_remove_directory(self):
+ """Test the removal of a directory"""
+ self.assertTrue(os.path.isdir(self.tempdir))
+ mozfile.remove(self.tempdir)
+ self.assertFalse(os.path.exists(self.tempdir))
+
+ def test_remove_directory_with_open_file(self):
+ """Test removing a directory with an open file"""
+ # Open a file in the generated stub
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+ f = file(filepath, 'w')
+ f.write('foo-bar')
+
+ # keep file open and then try removing the dir-tree
+ if mozinfo.isWin:
+ # On the Windows family WindowsError should be raised.
+ self.assertRaises(OSError, mozfile.remove, self.tempdir)
+ self.assertTrue(os.path.exists(self.tempdir))
+ else:
+ # Folder should be deleted on all other platforms
+ mozfile.remove(self.tempdir)
+ self.assertFalse(os.path.exists(self.tempdir))
+
+ def test_remove_closed_file(self):
+ """Test removing a closed file"""
+ # Open a file in the generated stub
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+ with open(filepath, 'w') as f:
+ f.write('foo-bar')
+
+ # Folder should be deleted on all platforms
+ mozfile.remove(self.tempdir)
+ self.assertFalse(os.path.exists(self.tempdir))
+
+ def test_removing_open_file_with_retry(self):
+ """Test removing a file in use with retry"""
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+
+ with wait_file_opened_in_thread(filepath, 0.2):
+ # on windows first attempt will fail,
+ # and it will be retried until the thread leave the handle
+ mozfile.remove(filepath)
+
+ # Check deletion was successful
+ self.assertFalse(os.path.exists(filepath))
+
+ def test_removing_already_deleted_file_with_retry(self):
+ """Test removing a meanwhile removed file with retry"""
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+
+ with wait_file_opened_in_thread(filepath, 0.2, True):
+ # on windows first attempt will fail, and before
+ # the retry the opened file will be deleted in the thread
+ mozfile.remove(filepath)
+
+ # Check deletion was successful
+ self.assertFalse(os.path.exists(filepath))
+
+ def test_remove_readonly_tree(self):
+ """Test removing a read-only directory"""
+
+ dirpath = os.path.join(self.tempdir, "nested_tree")
+ mark_readonly(dirpath)
+
+ # However, mozfile should change write permissions and remove dir.
+ mozfile.remove(dirpath)
+
+ self.assertFalse(os.path.exists(dirpath))
+
+ def test_remove_readonly_file(self):
+ """Test removing read-only files"""
+ filepath = os.path.join(self.tempdir, *stubs.files[1])
+ mark_readonly(filepath)
+
+ # However, mozfile should change write permission and then remove file.
+ mozfile.remove(filepath)
+
+ self.assertFalse(os.path.exists(filepath))
+
+ @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
+ def test_remove_symlink(self):
+ """Test removing a symlink"""
+ file_path = os.path.join(self.tempdir, *stubs.files[1])
+ symlink_path = os.path.join(self.tempdir, 'symlink')
+
+ os.symlink(file_path, symlink_path)
+ self.assertTrue(os.path.islink(symlink_path))
+
+ # The linked folder and files should not be deleted
+ mozfile.remove(symlink_path)
+ self.assertFalse(os.path.exists(symlink_path))
+ self.assertTrue(os.path.exists(file_path))
+
+ @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
+ def test_remove_symlink_in_subfolder(self):
+ """Test removing a folder with an contained symlink"""
+ file_path = os.path.join(self.tempdir, *stubs.files[0])
+ dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1]))
+ symlink_path = os.path.join(dir_path, 'symlink')
+
+ os.symlink(file_path, symlink_path)
+ self.assertTrue(os.path.islink(symlink_path))
+
+ # The folder with the contained symlink will be deleted but not the
+ # original linked file
+ mozfile.remove(dir_path)
+ self.assertFalse(os.path.exists(dir_path))
+ self.assertFalse(os.path.exists(symlink_path))
+ self.assertTrue(os.path.exists(file_path))
+
+ @unittest.skipIf(mozinfo.isWin or not os.geteuid(),
+ "Symlinks are not supported on Windows and cannot run test as root")
+ def test_remove_symlink_for_system_path(self):
+ """Test removing a symlink which points to a system folder"""
+ symlink_path = os.path.join(self.tempdir, 'symlink')
+
+ os.symlink(os.path.dirname(self.tempdir), symlink_path)
+ self.assertTrue(os.path.islink(symlink_path))
+
+ # The folder with the contained symlink will be deleted but not the
+ # original linked file
+ mozfile.remove(symlink_path)
+ self.assertFalse(os.path.exists(symlink_path))
+
+ def test_remove_path_that_does_not_exists(self):
+ not_existing_path = os.path.join(self.tempdir, 'I_do_not_not_exists')
+ try:
+ mozfile.remove(not_existing_path)
+ except OSError as exc:
+ if exc.errno == errno.ENOENT:
+ self.fail("removing non existing path must not raise error")
+ raise
+
+
+class MozFileMoveTestCase(unittest.TestCase):
+
+ def setUp(self):
+ # Generate a stub
+ self.tempdir = stubs.create_stub()
+ self.addCleanup(mozfile.rmtree, self.tempdir)
+
+ def test_move_file(self):
+ file_path = os.path.join(self.tempdir, *stubs.files[1])
+ moved_path = file_path + '.moved'
+ self.assertTrue(os.path.isfile(file_path))
+ self.assertFalse(os.path.exists(moved_path))
+ mozfile.move(file_path, moved_path)
+ self.assertFalse(os.path.exists(file_path))
+ self.assertTrue(os.path.isfile(moved_path))
+
+ def test_move_file_with_retry(self):
+ file_path = os.path.join(self.tempdir, *stubs.files[1])
+ moved_path = file_path + '.moved'
+
+ with wait_file_opened_in_thread(file_path, 0.2):
+ # first move attempt should fail on windows and be retried
+ mozfile.move(file_path, moved_path)
+ self.assertFalse(os.path.exists(file_path))
+ self.assertTrue(os.path.isfile(moved_path))
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testing/mozbase/mozfile/tests/test_tempdir.py b/testing/mozbase/mozfile/tests/test_tempdir.py
new file mode 100644
index 000000000..81f03d095
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_tempdir.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""
+tests for mozfile.TemporaryDirectory
+"""
+
+from mozfile import TemporaryDirectory
+import os
+import unittest
+
+
+class TestTemporaryDirectory(unittest.TestCase):
+
+ def test_removed(self):
+ """ensure that a TemporaryDirectory gets removed"""
+ path = None
+ with TemporaryDirectory() as tmp:
+ path = tmp
+ self.assertTrue(os.path.isdir(tmp))
+ tmpfile = os.path.join(tmp, "a_temp_file")
+ open(tmpfile, "w").write("data")
+ self.assertTrue(os.path.isfile(tmpfile))
+ self.assertFalse(os.path.isdir(path))
+ self.assertFalse(os.path.exists(path))
+
+ def test_exception(self):
+ """ensure that TemporaryDirectory handles exceptions"""
+ path = None
+ with self.assertRaises(Exception):
+ with TemporaryDirectory() as tmp:
+ path = tmp
+ self.assertTrue(os.path.isdir(tmp))
+ raise Exception("oops")
+ self.assertFalse(os.path.isdir(path))
+ self.assertFalse(os.path.exists(path))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testing/mozbase/mozfile/tests/test_tempfile.py b/testing/mozbase/mozfile/tests/test_tempfile.py
new file mode 100644
index 000000000..3c3d26d5d
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_tempfile.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+"""
+tests for mozfile.NamedTemporaryFile
+"""
+
+import mozfile
+import os
+import unittest
+
+
+class TestNamedTemporaryFile(unittest.TestCase):
+ """test our fix for NamedTemporaryFile"""
+
+ def test_named_temporary_file(self):
+ """ Ensure the fix for re-opening a NamedTemporaryFile works
+
+ Refer to https://bugzilla.mozilla.org/show_bug.cgi?id=818777
+ and https://bugzilla.mozilla.org/show_bug.cgi?id=821362
+ """
+
+ test_string = "A simple test"
+ with mozfile.NamedTemporaryFile() as temp:
+ # Test we can write to file
+ temp.write(test_string)
+ # Forced flush, so that we can read later
+ temp.flush()
+
+ # Test we can open the file again on all platforms
+ self.assertEqual(open(temp.name).read(), test_string)
+
+ def test_iteration(self):
+ """ensure the line iterator works"""
+
+ # make a file and write to it
+ tf = mozfile.NamedTemporaryFile()
+ notes = ['doe', 'rae', 'mi']
+ for note in notes:
+ tf.write('%s\n' % note)
+ tf.flush()
+
+ # now read from it
+ tf.seek(0)
+ lines = [line.rstrip('\n') for line in tf.readlines()]
+ self.assertEqual(lines, notes)
+
+ # now read from it iteratively
+ lines = []
+ for line in tf:
+ lines.append(line.strip())
+ self.assertEqual(lines, []) # because we did not seek(0)
+ tf.seek(0)
+ lines = []
+ for line in tf:
+ lines.append(line.strip())
+ self.assertEqual(lines, notes)
+
+ def test_delete(self):
+ """ensure ``delete=True/False`` works as expected"""
+
+ # make a deleteable file; ensure it gets cleaned up
+ path = None
+ with mozfile.NamedTemporaryFile(delete=True) as tf:
+ path = tf.name
+ self.assertTrue(isinstance(path, basestring))
+ self.assertFalse(os.path.exists(path))
+
+ # it is also deleted when __del__ is called
+ # here we will do so explicitly
+ tf = mozfile.NamedTemporaryFile(delete=True)
+ path = tf.name
+ self.assertTrue(os.path.exists(path))
+ del tf
+ self.assertFalse(os.path.exists(path))
+
+ # Now the same thing but we won't delete the file
+ path = None
+ try:
+ with mozfile.NamedTemporaryFile(delete=False) as tf:
+ path = tf.name
+ self.assertTrue(os.path.exists(path))
+ finally:
+ if path and os.path.exists(path):
+ os.remove(path)
+
+ path = None
+ try:
+ tf = mozfile.NamedTemporaryFile(delete=False)
+ path = tf.name
+ self.assertTrue(os.path.exists(path))
+ del tf
+ self.assertTrue(os.path.exists(path))
+ finally:
+ if path and os.path.exists(path):
+ os.remove(path)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/testing/mozbase/mozfile/tests/test_url.py b/testing/mozbase/mozfile/tests/test_url.py
new file mode 100755
index 000000000..7d2b12b39
--- /dev/null
+++ b/testing/mozbase/mozfile/tests/test_url.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python
+
+"""
+tests for is_url
+"""
+
+import unittest
+from mozfile import is_url
+
+
+class TestIsUrl(unittest.TestCase):
+ """test the is_url function"""
+
+ def test_is_url(self):
+ self.assertTrue(is_url('http://mozilla.org'))
+ self.assertFalse(is_url('/usr/bin/mozilla.org'))
+ self.assertTrue(is_url('file:///usr/bin/mozilla.org'))
+ self.assertFalse(is_url('c:\foo\bar'))
+
+if __name__ == '__main__':
+ unittest.main()