diff options
Diffstat (limited to 'testing/mozbase/mozfile')
-rw-r--r-- | testing/mozbase/mozfile/mozfile/__init__.py | 8 | ||||
-rw-r--r-- | testing/mozbase/mozfile/mozfile/mozfile.py | 449 | ||||
-rw-r--r-- | testing/mozbase/mozfile/setup.py | 25 | ||||
-rw-r--r-- | testing/mozbase/mozfile/tests/files/missing_file_attributes.zip | bin | 0 -> 442 bytes | |||
-rw-r--r-- | testing/mozbase/mozfile/tests/manifest.ini | 6 | ||||
-rw-r--r-- | testing/mozbase/mozfile/tests/stubs.py | 37 | ||||
-rw-r--r-- | testing/mozbase/mozfile/tests/test_extract.py | 154 | ||||
-rwxr-xr-x | testing/mozbase/mozfile/tests/test_load.py | 62 | ||||
-rw-r--r-- | testing/mozbase/mozfile/tests/test_move_remove.py | 232 | ||||
-rw-r--r-- | testing/mozbase/mozfile/tests/test_tempdir.py | 42 | ||||
-rw-r--r-- | testing/mozbase/mozfile/tests/test_tempfile.py | 102 | ||||
-rwxr-xr-x | testing/mozbase/mozfile/tests/test_url.py | 21 |
12 files changed, 1138 insertions, 0 deletions
diff --git a/testing/mozbase/mozfile/mozfile/__init__.py b/testing/mozbase/mozfile/mozfile/__init__.py new file mode 100644 index 000000000..a527f0ad6 --- /dev/null +++ b/testing/mozbase/mozfile/mozfile/__init__.py @@ -0,0 +1,8 @@ +# flake8: noqa +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from __future__ import absolute_import + +from .mozfile import * diff --git a/testing/mozbase/mozfile/mozfile/mozfile.py b/testing/mozbase/mozfile/mozfile/mozfile.py new file mode 100644 index 000000000..94805594e --- /dev/null +++ b/testing/mozbase/mozfile/mozfile/mozfile.py @@ -0,0 +1,449 @@ +# -*- coding: utf-8 -*- + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +# We don't import all modules at the top for performance reasons. See Bug 1008943 + +from __future__ import absolute_import + +from contextlib import contextmanager +import errno +import os +import stat +import time +import warnings + +__all__ = ['extract_tarball', + 'extract_zip', + 'extract', + 'is_url', + 'load', + 'move', + 'remove', + 'rmtree', + 'tree', + 'NamedTemporaryFile', + 'TemporaryDirectory'] + +# utilities for extracting archives + + +def extract_tarball(src, dest): + """extract a .tar file""" + + import tarfile + + bundle = tarfile.open(src) + namelist = bundle.getnames() + + for name in namelist: + bundle.extract(name, path=dest) + bundle.close() + return namelist + + +def extract_zip(src, dest): + """extract a zip file""" + + import zipfile + + if isinstance(src, zipfile.ZipFile): + bundle = src + else: + try: + bundle = zipfile.ZipFile(src) + except Exception: + print "src: %s" % src + raise + + namelist = bundle.namelist() + + for name in namelist: + filename = os.path.realpath(os.path.join(dest, name)) + if name.endswith('/'): + if not os.path.isdir(filename): + os.makedirs(filename) + else: + path = os.path.dirname(filename) + if not os.path.isdir(path): + os.makedirs(path) + _dest = open(filename, 'wb') + _dest.write(bundle.read(name)) + _dest.close() + mode = bundle.getinfo(name).external_attr >> 16 & 0x1FF + # Only update permissions if attributes are set. Otherwise fallback to the defaults. + if mode: + os.chmod(filename, mode) + bundle.close() + return namelist + + +def extract(src, dest=None): + """ + Takes in a tar or zip file and extracts it to dest + + If dest is not specified, extracts to os.path.dirname(src) + + Returns the list of top level files that were extracted + """ + + import zipfile + import tarfile + + assert os.path.exists(src), "'%s' does not exist" % src + + if dest is None: + dest = os.path.dirname(src) + elif not os.path.isdir(dest): + os.makedirs(dest) + assert not os.path.isfile(dest), "dest cannot be a file" + + if zipfile.is_zipfile(src): + namelist = extract_zip(src, dest) + elif tarfile.is_tarfile(src): + namelist = extract_tarball(src, dest) + else: + raise Exception("mozfile.extract: no archive format found for '%s'" % + src) + + # namelist returns paths with forward slashes even in windows + top_level_files = [os.path.join(dest, name.rstrip('/')) for name in namelist + if len(name.rstrip('/').split('/')) == 1] + + # namelist doesn't include folders, append these to the list + for name in namelist: + index = name.find('/') + if index != -1: + root = os.path.join(dest, name[:index]) + if root not in top_level_files: + top_level_files.append(root) + + return top_level_files + + +# utilities for removal of files and directories + +def rmtree(dir): + """Deprecated wrapper method to remove a directory tree. + + Ensure to update your code to use mozfile.remove() directly + + :param dir: directory to be removed + """ + + warnings.warn("mozfile.rmtree() is deprecated in favor of mozfile.remove()", + PendingDeprecationWarning, stacklevel=2) + return remove(dir) + + +def _call_windows_retry(func, args=(), retry_max=5, retry_delay=0.5): + """ + It's possible to see spurious errors on Windows due to various things + keeping a handle to the directory open (explorer, virus scanners, etc) + So we try a few times if it fails with a known error. + retry_delay is multiplied by the number of failed attempts to increase + the likelihood of success in subsequent attempts. + """ + retry_count = 0 + while True: + try: + func(*args) + except OSError as e: + # Error codes are defined in: + # http://docs.python.org/2/library/errno.html#module-errno + if e.errno not in (errno.EACCES, errno.ENOTEMPTY): + raise + + if retry_count == retry_max: + raise + + retry_count += 1 + + print '%s() failed for "%s". Reason: %s (%s). Retrying...' % \ + (func.__name__, args, e.strerror, e.errno) + time.sleep(retry_count * retry_delay) + else: + # If no exception has been thrown it should be done + break + + +def remove(path): + """Removes the specified file, link, or directory tree. + + This is a replacement for shutil.rmtree that works better under + windows. It does the following things: + + - check path access for the current user before trying to remove + - retry operations on some known errors due to various things keeping + a handle on file paths - like explorer, virus scanners, etc. The + known errors are errno.EACCES and errno.ENOTEMPTY, and it will + retry up to 5 five times with a delay of (failed_attempts * 0.5) seconds + between each attempt. + + Note that no error will be raised if the given path does not exists. + + :param path: path to be removed + """ + + import shutil + + def _call_with_windows_retry(*args, **kwargs): + try: + _call_windows_retry(*args, **kwargs) + except OSError as e: + # The file or directory to be removed doesn't exist anymore + if e.errno != errno.ENOENT: + raise + + def _update_permissions(path): + """Sets specified pemissions depending on filetype""" + if os.path.islink(path): + # Path is a symlink which we don't have to modify + # because it should already have all the needed permissions + return + + stats = os.stat(path) + + if os.path.isfile(path): + mode = stats.st_mode | stat.S_IWUSR + elif os.path.isdir(path): + mode = stats.st_mode | stat.S_IWUSR | stat.S_IXUSR + else: + # Not supported type + return + + _call_with_windows_retry(os.chmod, (path, mode)) + + if not os.path.exists(path): + return + + if os.path.isfile(path) or os.path.islink(path): + # Verify the file or link is read/write for the current user + _update_permissions(path) + _call_with_windows_retry(os.remove, (path,)) + + elif os.path.isdir(path): + # Verify the directory is read/write/execute for the current user + _update_permissions(path) + + # We're ensuring that every nested item has writable permission. + for root, dirs, files in os.walk(path): + for entry in dirs + files: + _update_permissions(os.path.join(root, entry)) + _call_with_windows_retry(shutil.rmtree, (path,)) + + +def move(src, dst): + """ + Move a file or directory path. + + This is a replacement for shutil.move that works better under windows, + retrying operations on some known errors due to various things keeping + a handle on file paths. + """ + import shutil + _call_windows_retry(shutil.move, (src, dst)) + + +def depth(directory): + """returns the integer depth of a directory or path relative to '/' """ + + directory = os.path.abspath(directory) + level = 0 + while True: + directory, remainder = os.path.split(directory) + level += 1 + if not remainder: + break + return level + + +# ASCII delimeters +ascii_delimeters = { + 'vertical_line': '|', + 'item_marker': '+', + 'last_child': '\\' +} + +# unicode delimiters +unicode_delimeters = { + 'vertical_line': '│', + 'item_marker': '├', + 'last_child': '└' +} + + +def tree(directory, + item_marker=unicode_delimeters['item_marker'], + vertical_line=unicode_delimeters['vertical_line'], + last_child=unicode_delimeters['last_child'], + sort_key=lambda x: x.lower()): + """ + display tree directory structure for `directory` + """ + + retval = [] + indent = [] + last = {} + top = depth(directory) + + for dirpath, dirnames, filenames in os.walk(directory, topdown=True): + + abspath = os.path.abspath(dirpath) + basename = os.path.basename(abspath) + parent = os.path.dirname(abspath) + level = depth(abspath) - top + + # sort articles of interest + for resource in (dirnames, filenames): + resource[:] = sorted(resource, key=sort_key) + + if level > len(indent): + indent.append(vertical_line) + indent = indent[:level] + + if dirnames: + files_end = item_marker + last[abspath] = dirnames[-1] + else: + files_end = last_child + + if last.get(parent) == os.path.basename(abspath): + # last directory of parent + dirpath_mark = last_child + indent[-1] = ' ' + elif not indent: + dirpath_mark = '' + else: + dirpath_mark = item_marker + + # append the directory and piece of tree structure + # if the top-level entry directory, print as passed + retval.append('%s%s%s' % (''.join(indent[:-1]), + dirpath_mark, + basename if retval else directory)) + # add the files + if filenames: + last_file = filenames[-1] + retval.extend([('%s%s%s' % (''.join(indent), + files_end if filename == last_file else item_marker, + filename)) + for index, filename in enumerate(filenames)]) + + return '\n'.join(retval) + + +# utilities for temporary resources + +class NamedTemporaryFile(object): + """ + Like tempfile.NamedTemporaryFile except it works on Windows + in the case where you open the created file a second time. + + This behaves very similarly to tempfile.NamedTemporaryFile but may + not behave exactly the same. For example, this function does not + prevent fd inheritance by children. + + Example usage: + + with NamedTemporaryFile() as fh: + fh.write(b'foobar') + + print('Filename: %s' % fh.name) + + see https://bugzilla.mozilla.org/show_bug.cgi?id=821362 + """ + + def __init__(self, mode='w+b', bufsize=-1, suffix='', prefix='tmp', + dir=None, delete=True): + + import tempfile + fd, path = tempfile.mkstemp(suffix, prefix, dir, 't' in mode) + os.close(fd) + + self.file = open(path, mode) + self._path = path + self._delete = delete + self._unlinked = False + + def __getattr__(self, k): + return getattr(self.__dict__['file'], k) + + def __iter__(self): + return self.__dict__['file'] + + def __enter__(self): + self.file.__enter__() + return self + + def __exit__(self, exc, value, tb): + self.file.__exit__(exc, value, tb) + if self.__dict__['_delete']: + os.unlink(self.__dict__['_path']) + self._unlinked = True + + def __del__(self): + if self.__dict__['_unlinked']: + return + self.file.__exit__(None, None, None) + if self.__dict__['_delete']: + os.unlink(self.__dict__['_path']) + + +@contextmanager +def TemporaryDirectory(): + """ + create a temporary directory using tempfile.mkdtemp, and then clean it up. + + Example usage: + with TemporaryDirectory() as tmp: + open(os.path.join(tmp, "a_temp_file"), "w").write("data") + + """ + + import tempfile + import shutil + + tempdir = tempfile.mkdtemp() + try: + yield tempdir + finally: + shutil.rmtree(tempdir) + + +# utilities dealing with URLs + +def is_url(thing): + """ + Return True if thing looks like a URL. + """ + + import urlparse + + parsed = urlparse.urlparse(thing) + if 'scheme' in parsed: + return len(parsed.scheme) >= 2 + else: + return len(parsed[0]) >= 2 + + +def load(resource): + """ + open a file or URL for reading. If the passed resource string is not a URL, + or begins with 'file://', return a ``file``. Otherwise, return the + result of urllib2.urlopen() + """ + + import urllib2 + + # handle file URLs separately due to python stdlib limitations + if resource.startswith('file://'): + resource = resource[len('file://'):] + + if not is_url(resource): + # if no scheme is given, it is a file path + return file(resource) + + return urllib2.urlopen(resource) diff --git a/testing/mozbase/mozfile/setup.py b/testing/mozbase/mozfile/setup.py new file mode 100644 index 000000000..277ff7b52 --- /dev/null +++ b/testing/mozbase/mozfile/setup.py @@ -0,0 +1,25 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this file, +# You can obtain one at http://mozilla.org/MPL/2.0/. + +from setuptools import setup + +PACKAGE_NAME = 'mozfile' +PACKAGE_VERSION = '1.2' + +setup(name=PACKAGE_NAME, + version=PACKAGE_VERSION, + description="Library of file utilities for use in Mozilla testing", + long_description="see http://mozbase.readthedocs.org/", + classifiers=[], # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers + keywords='mozilla', + author='Mozilla Automation and Tools team', + author_email='tools@lists.mozilla.org', + url='https://wiki.mozilla.org/Auto-tools/Projects/Mozbase', + license='MPL', + packages=['mozfile'], + include_package_data=True, + zip_safe=False, + install_requires=[], + tests_require=['mozhttpd'] + ) diff --git a/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip Binary files differnew file mode 100644 index 000000000..2b5409e89 --- /dev/null +++ b/testing/mozbase/mozfile/tests/files/missing_file_attributes.zip diff --git a/testing/mozbase/mozfile/tests/manifest.ini b/testing/mozbase/mozfile/tests/manifest.ini new file mode 100644 index 000000000..c7889beca --- /dev/null +++ b/testing/mozbase/mozfile/tests/manifest.ini @@ -0,0 +1,6 @@ +[test_extract.py] +[test_load.py] +[test_move_remove.py] +[test_tempdir.py] +[test_tempfile.py] +[test_url.py] diff --git a/testing/mozbase/mozfile/tests/stubs.py b/testing/mozbase/mozfile/tests/stubs.py new file mode 100644 index 000000000..06d79e7af --- /dev/null +++ b/testing/mozbase/mozfile/tests/stubs.py @@ -0,0 +1,37 @@ +import os +import shutil +import tempfile + + +# stub file paths +files = [('foo.txt',), + ('foo', 'bar.txt',), + ('foo', 'bar', 'fleem.txt',), + ('foobar', 'fleem.txt',), + ('bar.txt',), + ('nested_tree', 'bar', 'fleem.txt',), + ('readonly.txt',), + ] + + +def create_stub(): + """create a stub directory""" + + tempdir = tempfile.mkdtemp() + try: + for path in files: + fullpath = os.path.join(tempdir, *path) + dirname = os.path.dirname(fullpath) + if not os.path.exists(dirname): + os.makedirs(dirname) + contents = path[-1] + f = file(fullpath, 'w') + f.write(contents) + f.close() + return tempdir + except Exception: + try: + shutil.rmtree(tempdir) + except: + pass + raise diff --git a/testing/mozbase/mozfile/tests/test_extract.py b/testing/mozbase/mozfile/tests/test_extract.py new file mode 100644 index 000000000..e91f52349 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_extract.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python + +import os +import shutil +import tarfile +import tempfile +import unittest +import zipfile + +import mozfile + +import stubs + + +class TestExtract(unittest.TestCase): + """test extracting archives""" + + def ensure_directory_contents(self, directory): + """ensure the directory contents match""" + for f in stubs.files: + path = os.path.join(directory, *f) + exists = os.path.exists(path) + if not exists: + print "%s does not exist" % (os.path.join(f)) + self.assertTrue(exists) + if exists: + contents = file(path).read().strip() + self.assertTrue(contents == f[-1]) + + def test_extract_zipfile(self): + """test extracting a zipfile""" + _zipfile = self.create_zip() + self.assertTrue(os.path.exists(_zipfile)) + try: + dest = tempfile.mkdtemp() + try: + mozfile.extract_zip(_zipfile, dest) + self.ensure_directory_contents(dest) + finally: + shutil.rmtree(dest) + finally: + os.remove(_zipfile) + + def test_extract_zipfile_missing_file_attributes(self): + """if files do not have attributes set the default permissions have to be inherited.""" + _zipfile = os.path.join(os.path.dirname(__file__), 'files', 'missing_file_attributes.zip') + self.assertTrue(os.path.exists(_zipfile)) + dest = tempfile.mkdtemp() + try: + # Get the default file permissions for the user + fname = os.path.join(dest, 'foo') + with open(fname, 'w'): + pass + default_stmode = os.stat(fname).st_mode + + files = mozfile.extract_zip(_zipfile, dest) + for filename in files: + self.assertEqual(os.stat(os.path.join(dest, filename)).st_mode, + default_stmode) + finally: + shutil.rmtree(dest) + + def test_extract_tarball(self): + """test extracting a tarball""" + tarball = self.create_tarball() + self.assertTrue(os.path.exists(tarball)) + try: + dest = tempfile.mkdtemp() + try: + mozfile.extract_tarball(tarball, dest) + self.ensure_directory_contents(dest) + finally: + shutil.rmtree(dest) + finally: + os.remove(tarball) + + def test_extract(self): + """test the generalized extract function""" + + # test extracting a tarball + tarball = self.create_tarball() + self.assertTrue(os.path.exists(tarball)) + try: + dest = tempfile.mkdtemp() + try: + mozfile.extract(tarball, dest) + self.ensure_directory_contents(dest) + finally: + shutil.rmtree(dest) + finally: + os.remove(tarball) + + # test extracting a zipfile + _zipfile = self.create_zip() + self.assertTrue(os.path.exists(_zipfile)) + try: + dest = tempfile.mkdtemp() + try: + mozfile.extract_zip(_zipfile, dest) + self.ensure_directory_contents(dest) + finally: + shutil.rmtree(dest) + finally: + os.remove(_zipfile) + + # test extracting some non-archive; this should fail + fd, filename = tempfile.mkstemp() + os.write(fd, 'This is not a zipfile or tarball') + os.close(fd) + exception = None + try: + dest = tempfile.mkdtemp() + mozfile.extract(filename, dest) + except Exception as exception: + pass + finally: + os.remove(filename) + os.rmdir(dest) + self.assertTrue(isinstance(exception, Exception)) + + # utility functions + + def create_tarball(self): + """create a stub tarball for testing""" + tempdir = stubs.create_stub() + filename = tempfile.mktemp(suffix='.tar') + archive = tarfile.TarFile(filename, mode='w') + try: + for path in stubs.files: + archive.add(os.path.join(tempdir, *path), arcname=os.path.join(*path)) + except: + os.remove(archive) + raise + finally: + shutil.rmtree(tempdir) + archive.close() + return filename + + def create_zip(self): + """create a stub zipfile for testing""" + + tempdir = stubs.create_stub() + filename = tempfile.mktemp(suffix='.zip') + archive = zipfile.ZipFile(filename, mode='w') + try: + for path in stubs.files: + archive.write(os.path.join(tempdir, *path), arcname=os.path.join(*path)) + except: + os.remove(filename) + raise + finally: + shutil.rmtree(tempdir) + archive.close() + return filename diff --git a/testing/mozbase/mozfile/tests/test_load.py b/testing/mozbase/mozfile/tests/test_load.py new file mode 100755 index 000000000..13a5b519c --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_load.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +""" +tests for mozfile.load +""" + +import mozhttpd +import os +import tempfile +import unittest +from mozfile import load + + +class TestLoad(unittest.TestCase): + """test the load function""" + + def test_http(self): + """test with mozhttpd and a http:// URL""" + + def example(request): + """example request handler""" + body = 'example' + return (200, {'Content-type': 'text/plain', + 'Content-length': len(body) + }, body) + + host = '127.0.0.1' + httpd = mozhttpd.MozHttpd(host=host, + urlhandlers=[{'method': 'GET', + 'path': '.*', + 'function': example}]) + try: + httpd.start(block=False) + content = load(httpd.get_url()).read() + self.assertEqual(content, 'example') + finally: + httpd.stop() + + def test_file_path(self): + """test loading from file path""" + try: + # create a temporary file + tmp = tempfile.NamedTemporaryFile(delete=False) + tmp.write('foo bar') + tmp.close() + + # read the file + contents = file(tmp.name).read() + self.assertEqual(contents, 'foo bar') + + # read the file with load and a file path + self.assertEqual(load(tmp.name).read(), contents) + + # read the file with load and a file URL + self.assertEqual(load('file://%s' % tmp.name).read(), contents) + finally: + # remove the tempfile + if os.path.exists(tmp.name): + os.remove(tmp.name) + +if __name__ == '__main__': + unittest.main() diff --git a/testing/mozbase/mozfile/tests/test_move_remove.py b/testing/mozbase/mozfile/tests/test_move_remove.py new file mode 100644 index 000000000..e9d0cd434 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_move_remove.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python + +import os +import stat +import shutil +import threading +import time +import unittest +import errno +from contextlib import contextmanager + +import mozfile +import mozinfo + +import stubs + + +def mark_readonly(path): + """Removes all write permissions from given file/directory. + + :param path: path of directory/file of which modes must be changed + """ + mode = os.stat(path)[stat.ST_MODE] + os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) + + +class FileOpenCloseThread(threading.Thread): + """Helper thread for asynchronous file handling""" + + def __init__(self, path, delay, delete=False): + threading.Thread.__init__(self) + self.file_opened = threading.Event() + self.delay = delay + self.path = path + self.delete = delete + + def run(self): + with open(self.path): + self.file_opened.set() + time.sleep(self.delay) + if self.delete: + try: + os.remove(self.path) + except: + pass + + +@contextmanager +def wait_file_opened_in_thread(*args, **kwargs): + thread = FileOpenCloseThread(*args, **kwargs) + thread.start() + thread.file_opened.wait() + try: + yield thread + finally: + thread.join() + + +class MozfileRemoveTestCase(unittest.TestCase): + """Test our ability to remove directories and files""" + + def setUp(self): + # Generate a stub + self.tempdir = stubs.create_stub() + + def tearDown(self): + if os.path.isdir(self.tempdir): + shutil.rmtree(self.tempdir) + + def test_remove_directory(self): + """Test the removal of a directory""" + self.assertTrue(os.path.isdir(self.tempdir)) + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_remove_directory_with_open_file(self): + """Test removing a directory with an open file""" + # Open a file in the generated stub + filepath = os.path.join(self.tempdir, *stubs.files[1]) + f = file(filepath, 'w') + f.write('foo-bar') + + # keep file open and then try removing the dir-tree + if mozinfo.isWin: + # On the Windows family WindowsError should be raised. + self.assertRaises(OSError, mozfile.remove, self.tempdir) + self.assertTrue(os.path.exists(self.tempdir)) + else: + # Folder should be deleted on all other platforms + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_remove_closed_file(self): + """Test removing a closed file""" + # Open a file in the generated stub + filepath = os.path.join(self.tempdir, *stubs.files[1]) + with open(filepath, 'w') as f: + f.write('foo-bar') + + # Folder should be deleted on all platforms + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_removing_open_file_with_retry(self): + """Test removing a file in use with retry""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + + with wait_file_opened_in_thread(filepath, 0.2): + # on windows first attempt will fail, + # and it will be retried until the thread leave the handle + mozfile.remove(filepath) + + # Check deletion was successful + self.assertFalse(os.path.exists(filepath)) + + def test_removing_already_deleted_file_with_retry(self): + """Test removing a meanwhile removed file with retry""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + + with wait_file_opened_in_thread(filepath, 0.2, True): + # on windows first attempt will fail, and before + # the retry the opened file will be deleted in the thread + mozfile.remove(filepath) + + # Check deletion was successful + self.assertFalse(os.path.exists(filepath)) + + def test_remove_readonly_tree(self): + """Test removing a read-only directory""" + + dirpath = os.path.join(self.tempdir, "nested_tree") + mark_readonly(dirpath) + + # However, mozfile should change write permissions and remove dir. + mozfile.remove(dirpath) + + self.assertFalse(os.path.exists(dirpath)) + + def test_remove_readonly_file(self): + """Test removing read-only files""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + mark_readonly(filepath) + + # However, mozfile should change write permission and then remove file. + mozfile.remove(filepath) + + self.assertFalse(os.path.exists(filepath)) + + @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") + def test_remove_symlink(self): + """Test removing a symlink""" + file_path = os.path.join(self.tempdir, *stubs.files[1]) + symlink_path = os.path.join(self.tempdir, 'symlink') + + os.symlink(file_path, symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The linked folder and files should not be deleted + mozfile.remove(symlink_path) + self.assertFalse(os.path.exists(symlink_path)) + self.assertTrue(os.path.exists(file_path)) + + @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") + def test_remove_symlink_in_subfolder(self): + """Test removing a folder with an contained symlink""" + file_path = os.path.join(self.tempdir, *stubs.files[0]) + dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1])) + symlink_path = os.path.join(dir_path, 'symlink') + + os.symlink(file_path, symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The folder with the contained symlink will be deleted but not the + # original linked file + mozfile.remove(dir_path) + self.assertFalse(os.path.exists(dir_path)) + self.assertFalse(os.path.exists(symlink_path)) + self.assertTrue(os.path.exists(file_path)) + + @unittest.skipIf(mozinfo.isWin or not os.geteuid(), + "Symlinks are not supported on Windows and cannot run test as root") + def test_remove_symlink_for_system_path(self): + """Test removing a symlink which points to a system folder""" + symlink_path = os.path.join(self.tempdir, 'symlink') + + os.symlink(os.path.dirname(self.tempdir), symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The folder with the contained symlink will be deleted but not the + # original linked file + mozfile.remove(symlink_path) + self.assertFalse(os.path.exists(symlink_path)) + + def test_remove_path_that_does_not_exists(self): + not_existing_path = os.path.join(self.tempdir, 'I_do_not_not_exists') + try: + mozfile.remove(not_existing_path) + except OSError as exc: + if exc.errno == errno.ENOENT: + self.fail("removing non existing path must not raise error") + raise + + +class MozFileMoveTestCase(unittest.TestCase): + + def setUp(self): + # Generate a stub + self.tempdir = stubs.create_stub() + self.addCleanup(mozfile.rmtree, self.tempdir) + + def test_move_file(self): + file_path = os.path.join(self.tempdir, *stubs.files[1]) + moved_path = file_path + '.moved' + self.assertTrue(os.path.isfile(file_path)) + self.assertFalse(os.path.exists(moved_path)) + mozfile.move(file_path, moved_path) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue(os.path.isfile(moved_path)) + + def test_move_file_with_retry(self): + file_path = os.path.join(self.tempdir, *stubs.files[1]) + moved_path = file_path + '.moved' + + with wait_file_opened_in_thread(file_path, 0.2): + # first move attempt should fail on windows and be retried + mozfile.move(file_path, moved_path) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue(os.path.isfile(moved_path)) + + +if __name__ == '__main__': + unittest.main() diff --git a/testing/mozbase/mozfile/tests/test_tempdir.py b/testing/mozbase/mozfile/tests/test_tempdir.py new file mode 100644 index 000000000..81f03d095 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_tempdir.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +""" +tests for mozfile.TemporaryDirectory +""" + +from mozfile import TemporaryDirectory +import os +import unittest + + +class TestTemporaryDirectory(unittest.TestCase): + + def test_removed(self): + """ensure that a TemporaryDirectory gets removed""" + path = None + with TemporaryDirectory() as tmp: + path = tmp + self.assertTrue(os.path.isdir(tmp)) + tmpfile = os.path.join(tmp, "a_temp_file") + open(tmpfile, "w").write("data") + self.assertTrue(os.path.isfile(tmpfile)) + self.assertFalse(os.path.isdir(path)) + self.assertFalse(os.path.exists(path)) + + def test_exception(self): + """ensure that TemporaryDirectory handles exceptions""" + path = None + with self.assertRaises(Exception): + with TemporaryDirectory() as tmp: + path = tmp + self.assertTrue(os.path.isdir(tmp)) + raise Exception("oops") + self.assertFalse(os.path.isdir(path)) + self.assertFalse(os.path.exists(path)) + +if __name__ == '__main__': + unittest.main() diff --git a/testing/mozbase/mozfile/tests/test_tempfile.py b/testing/mozbase/mozfile/tests/test_tempfile.py new file mode 100644 index 000000000..3c3d26d5d --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_tempfile.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +""" +tests for mozfile.NamedTemporaryFile +""" + +import mozfile +import os +import unittest + + +class TestNamedTemporaryFile(unittest.TestCase): + """test our fix for NamedTemporaryFile""" + + def test_named_temporary_file(self): + """ Ensure the fix for re-opening a NamedTemporaryFile works + + Refer to https://bugzilla.mozilla.org/show_bug.cgi?id=818777 + and https://bugzilla.mozilla.org/show_bug.cgi?id=821362 + """ + + test_string = "A simple test" + with mozfile.NamedTemporaryFile() as temp: + # Test we can write to file + temp.write(test_string) + # Forced flush, so that we can read later + temp.flush() + + # Test we can open the file again on all platforms + self.assertEqual(open(temp.name).read(), test_string) + + def test_iteration(self): + """ensure the line iterator works""" + + # make a file and write to it + tf = mozfile.NamedTemporaryFile() + notes = ['doe', 'rae', 'mi'] + for note in notes: + tf.write('%s\n' % note) + tf.flush() + + # now read from it + tf.seek(0) + lines = [line.rstrip('\n') for line in tf.readlines()] + self.assertEqual(lines, notes) + + # now read from it iteratively + lines = [] + for line in tf: + lines.append(line.strip()) + self.assertEqual(lines, []) # because we did not seek(0) + tf.seek(0) + lines = [] + for line in tf: + lines.append(line.strip()) + self.assertEqual(lines, notes) + + def test_delete(self): + """ensure ``delete=True/False`` works as expected""" + + # make a deleteable file; ensure it gets cleaned up + path = None + with mozfile.NamedTemporaryFile(delete=True) as tf: + path = tf.name + self.assertTrue(isinstance(path, basestring)) + self.assertFalse(os.path.exists(path)) + + # it is also deleted when __del__ is called + # here we will do so explicitly + tf = mozfile.NamedTemporaryFile(delete=True) + path = tf.name + self.assertTrue(os.path.exists(path)) + del tf + self.assertFalse(os.path.exists(path)) + + # Now the same thing but we won't delete the file + path = None + try: + with mozfile.NamedTemporaryFile(delete=False) as tf: + path = tf.name + self.assertTrue(os.path.exists(path)) + finally: + if path and os.path.exists(path): + os.remove(path) + + path = None + try: + tf = mozfile.NamedTemporaryFile(delete=False) + path = tf.name + self.assertTrue(os.path.exists(path)) + del tf + self.assertTrue(os.path.exists(path)) + finally: + if path and os.path.exists(path): + os.remove(path) + +if __name__ == '__main__': + unittest.main() diff --git a/testing/mozbase/mozfile/tests/test_url.py b/testing/mozbase/mozfile/tests/test_url.py new file mode 100755 index 000000000..7d2b12b39 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_url.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +""" +tests for is_url +""" + +import unittest +from mozfile import is_url + + +class TestIsUrl(unittest.TestCase): + """test the is_url function""" + + def test_is_url(self): + self.assertTrue(is_url('http://mozilla.org')) + self.assertFalse(is_url('/usr/bin/mozilla.org')) + self.assertTrue(is_url('file:///usr/bin/mozilla.org')) + self.assertFalse(is_url('c:\foo\bar')) + +if __name__ == '__main__': + unittest.main() |