diff options
Diffstat (limited to 'testing/mozbase/mozfile/tests/test_move_remove.py')
-rw-r--r-- | testing/mozbase/mozfile/tests/test_move_remove.py | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/testing/mozbase/mozfile/tests/test_move_remove.py b/testing/mozbase/mozfile/tests/test_move_remove.py new file mode 100644 index 000000000..e9d0cd434 --- /dev/null +++ b/testing/mozbase/mozfile/tests/test_move_remove.py @@ -0,0 +1,232 @@ +#!/usr/bin/env python + +import os +import stat +import shutil +import threading +import time +import unittest +import errno +from contextlib import contextmanager + +import mozfile +import mozinfo + +import stubs + + +def mark_readonly(path): + """Removes all write permissions from given file/directory. + + :param path: path of directory/file of which modes must be changed + """ + mode = os.stat(path)[stat.ST_MODE] + os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH) + + +class FileOpenCloseThread(threading.Thread): + """Helper thread for asynchronous file handling""" + + def __init__(self, path, delay, delete=False): + threading.Thread.__init__(self) + self.file_opened = threading.Event() + self.delay = delay + self.path = path + self.delete = delete + + def run(self): + with open(self.path): + self.file_opened.set() + time.sleep(self.delay) + if self.delete: + try: + os.remove(self.path) + except: + pass + + +@contextmanager +def wait_file_opened_in_thread(*args, **kwargs): + thread = FileOpenCloseThread(*args, **kwargs) + thread.start() + thread.file_opened.wait() + try: + yield thread + finally: + thread.join() + + +class MozfileRemoveTestCase(unittest.TestCase): + """Test our ability to remove directories and files""" + + def setUp(self): + # Generate a stub + self.tempdir = stubs.create_stub() + + def tearDown(self): + if os.path.isdir(self.tempdir): + shutil.rmtree(self.tempdir) + + def test_remove_directory(self): + """Test the removal of a directory""" + self.assertTrue(os.path.isdir(self.tempdir)) + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_remove_directory_with_open_file(self): + """Test removing a directory with an open file""" + # Open a file in the generated stub + filepath = os.path.join(self.tempdir, *stubs.files[1]) + f = file(filepath, 'w') + f.write('foo-bar') + + # keep file open and then try removing the dir-tree + if mozinfo.isWin: + # On the Windows family WindowsError should be raised. + self.assertRaises(OSError, mozfile.remove, self.tempdir) + self.assertTrue(os.path.exists(self.tempdir)) + else: + # Folder should be deleted on all other platforms + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_remove_closed_file(self): + """Test removing a closed file""" + # Open a file in the generated stub + filepath = os.path.join(self.tempdir, *stubs.files[1]) + with open(filepath, 'w') as f: + f.write('foo-bar') + + # Folder should be deleted on all platforms + mozfile.remove(self.tempdir) + self.assertFalse(os.path.exists(self.tempdir)) + + def test_removing_open_file_with_retry(self): + """Test removing a file in use with retry""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + + with wait_file_opened_in_thread(filepath, 0.2): + # on windows first attempt will fail, + # and it will be retried until the thread leave the handle + mozfile.remove(filepath) + + # Check deletion was successful + self.assertFalse(os.path.exists(filepath)) + + def test_removing_already_deleted_file_with_retry(self): + """Test removing a meanwhile removed file with retry""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + + with wait_file_opened_in_thread(filepath, 0.2, True): + # on windows first attempt will fail, and before + # the retry the opened file will be deleted in the thread + mozfile.remove(filepath) + + # Check deletion was successful + self.assertFalse(os.path.exists(filepath)) + + def test_remove_readonly_tree(self): + """Test removing a read-only directory""" + + dirpath = os.path.join(self.tempdir, "nested_tree") + mark_readonly(dirpath) + + # However, mozfile should change write permissions and remove dir. + mozfile.remove(dirpath) + + self.assertFalse(os.path.exists(dirpath)) + + def test_remove_readonly_file(self): + """Test removing read-only files""" + filepath = os.path.join(self.tempdir, *stubs.files[1]) + mark_readonly(filepath) + + # However, mozfile should change write permission and then remove file. + mozfile.remove(filepath) + + self.assertFalse(os.path.exists(filepath)) + + @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") + def test_remove_symlink(self): + """Test removing a symlink""" + file_path = os.path.join(self.tempdir, *stubs.files[1]) + symlink_path = os.path.join(self.tempdir, 'symlink') + + os.symlink(file_path, symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The linked folder and files should not be deleted + mozfile.remove(symlink_path) + self.assertFalse(os.path.exists(symlink_path)) + self.assertTrue(os.path.exists(file_path)) + + @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows") + def test_remove_symlink_in_subfolder(self): + """Test removing a folder with an contained symlink""" + file_path = os.path.join(self.tempdir, *stubs.files[0]) + dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1])) + symlink_path = os.path.join(dir_path, 'symlink') + + os.symlink(file_path, symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The folder with the contained symlink will be deleted but not the + # original linked file + mozfile.remove(dir_path) + self.assertFalse(os.path.exists(dir_path)) + self.assertFalse(os.path.exists(symlink_path)) + self.assertTrue(os.path.exists(file_path)) + + @unittest.skipIf(mozinfo.isWin or not os.geteuid(), + "Symlinks are not supported on Windows and cannot run test as root") + def test_remove_symlink_for_system_path(self): + """Test removing a symlink which points to a system folder""" + symlink_path = os.path.join(self.tempdir, 'symlink') + + os.symlink(os.path.dirname(self.tempdir), symlink_path) + self.assertTrue(os.path.islink(symlink_path)) + + # The folder with the contained symlink will be deleted but not the + # original linked file + mozfile.remove(symlink_path) + self.assertFalse(os.path.exists(symlink_path)) + + def test_remove_path_that_does_not_exists(self): + not_existing_path = os.path.join(self.tempdir, 'I_do_not_not_exists') + try: + mozfile.remove(not_existing_path) + except OSError as exc: + if exc.errno == errno.ENOENT: + self.fail("removing non existing path must not raise error") + raise + + +class MozFileMoveTestCase(unittest.TestCase): + + def setUp(self): + # Generate a stub + self.tempdir = stubs.create_stub() + self.addCleanup(mozfile.rmtree, self.tempdir) + + def test_move_file(self): + file_path = os.path.join(self.tempdir, *stubs.files[1]) + moved_path = file_path + '.moved' + self.assertTrue(os.path.isfile(file_path)) + self.assertFalse(os.path.exists(moved_path)) + mozfile.move(file_path, moved_path) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue(os.path.isfile(moved_path)) + + def test_move_file_with_retry(self): + file_path = os.path.join(self.tempdir, *stubs.files[1]) + moved_path = file_path + '.moved' + + with wait_file_opened_in_thread(file_path, 0.2): + # first move attempt should fail on windows and be retried + mozfile.move(file_path, moved_path) + self.assertFalse(os.path.exists(file_path)) + self.assertTrue(os.path.isfile(moved_path)) + + +if __name__ == '__main__': + unittest.main() |