#!/usr/bin/env python

import os
import stat
import shutil
import threading
import time
import unittest
import errno
from contextlib import contextmanager

import mozfile
import mozinfo

import stubs


def mark_readonly(path):
    """Strip every write permission bit from a file or directory.

    The user, group and other write bits are cleared; all remaining mode
    bits are passed back to ``os.chmod`` unchanged.

    :param path: path of the file/directory whose mode is changed
    """
    write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
    current_mode = os.stat(path).st_mode
    os.chmod(path, current_mode & ~write_bits)


class FileOpenCloseThread(threading.Thread):
    """Helper thread which keeps a file open for a given amount of time.

    The thread opens ``path``, sets the ``file_opened`` event, sleeps for
    ``delay`` seconds while still holding the file open, and then closes it.
    When ``delete`` is true the file is removed afterwards.

    Note that ``file_opened`` is only set after a successful ``open()``; if
    the file cannot be opened the event is never set.

    :param path: path of the file to open
    :param delay: seconds to keep the file open
    :param delete: if True, try to remove the file after closing it
    """

    def __init__(self, path, delay, delete=False):
        threading.Thread.__init__(self)
        # Signals other threads that the file handle is now held open
        self.file_opened = threading.Event()
        self.delay = delay
        self.path = path
        self.delete = delete

    def run(self):
        with open(self.path):
            self.file_opened.set()
            time.sleep(self.delay)
        if self.delete:
            try:
                os.remove(self.path)
            except OSError:
                # Best effort only: the file may already have been removed
                # by someone else in the meantime. Only filesystem errors
                # are ignored, anything else is a real bug and propagates.
                pass


@contextmanager
def wait_file_opened_in_thread(*args, **kwargs):
    """Hold a file open in a background thread for the duration of a block.

    All arguments are forwarded to :class:`FileOpenCloseThread`. The body of
    the ``with`` statement only runs once the thread has signalled that the
    file is open, and the thread is joined when the block is left.

    :raises RuntimeError: if the helper thread exits without ever signalling
                          that the file was opened (e.g. ``open()`` failed)
    """
    thread = FileOpenCloseThread(*args, **kwargs)
    thread.start()

    # Wait in short slices instead of blocking unconditionally: if the thread
    # dies before setting the event an unbounded wait() would hang forever.
    while not thread.file_opened.is_set():
        thread.file_opened.wait(0.05)
        # Check liveness first: once the thread is dead the event state is
        # final, so a set-then-exit race cannot be reported as a failure.
        if not thread.is_alive() and not thread.file_opened.is_set():
            raise RuntimeError("helper thread exited without opening %r"
                               % (thread.path,))

    try:
        yield thread
    finally:
        thread.join()


class MozfileRemoveTestCase(unittest.TestCase):
    """Test our ability to remove directories and files"""

    def setUp(self):
        # Generate a stub
        self.tempdir = stubs.create_stub()

    def tearDown(self):
        # Several tests remove the stub themselves, so only clean up
        # whatever is still left over.
        if os.path.isdir(self.tempdir):
            shutil.rmtree(self.tempdir)

    def test_remove_directory(self):
        """Test the removal of a directory"""
        self.assertTrue(os.path.isdir(self.tempdir))
        mozfile.remove(self.tempdir)
        self.assertFalse(os.path.exists(self.tempdir))

    def test_remove_directory_with_open_file(self):
        """Test removing a directory with an open file"""
        # Open a file in the generated stub. The handle is deliberately kept
        # open while the removal is attempted and is closed when the block
        # is left, even if an assertion fails.
        filepath = os.path.join(self.tempdir, *stubs.files[1])
        with open(filepath, 'w') as f:
            f.write('foo-bar')

            # keep file open and then try removing the dir-tree
            if mozinfo.isWin:
                # On the Windows family WindowsError should be raised.
                self.assertRaises(OSError, mozfile.remove, self.tempdir)
                self.assertTrue(os.path.exists(self.tempdir))
            else:
                # Folder should be deleted on all other platforms
                mozfile.remove(self.tempdir)
                self.assertFalse(os.path.exists(self.tempdir))

    def test_remove_closed_file(self):
        """Test removing a closed file"""
        # Write to a file in the generated stub and close it again
        filepath = os.path.join(self.tempdir, *stubs.files[1])
        with open(filepath, 'w') as f:
            f.write('foo-bar')

        # Folder should be deleted on all platforms
        mozfile.remove(self.tempdir)
        self.assertFalse(os.path.exists(self.tempdir))

    def test_removing_open_file_with_retry(self):
        """Test removing a file in use with retry"""
        filepath = os.path.join(self.tempdir, *stubs.files[1])

        with wait_file_opened_in_thread(filepath, 0.2):
            # on windows the first attempt will fail,
            # and it will be retried until the thread releases the handle
            mozfile.remove(filepath)

        # Check deletion was successful
        self.assertFalse(os.path.exists(filepath))

    def test_removing_already_deleted_file_with_retry(self):
        """Test removing a meanwhile removed file with retry"""
        filepath = os.path.join(self.tempdir, *stubs.files[1])

        with wait_file_opened_in_thread(filepath, 0.2, True):
            # on windows the first attempt will fail, and before
            # the retry the opened file will be deleted in the thread
            mozfile.remove(filepath)

        # Check deletion was successful
        self.assertFalse(os.path.exists(filepath))

    def test_remove_readonly_tree(self):
        """Test removing a read-only directory"""

        dirpath = os.path.join(self.tempdir, "nested_tree")
        mark_readonly(dirpath)

        # However, mozfile should change write permissions and remove dir.
        mozfile.remove(dirpath)

        self.assertFalse(os.path.exists(dirpath))

    def test_remove_readonly_file(self):
        """Test removing read-only files"""
        filepath = os.path.join(self.tempdir, *stubs.files[1])
        mark_readonly(filepath)

        # However, mozfile should change write permission and then remove file.
        mozfile.remove(filepath)

        self.assertFalse(os.path.exists(filepath))

    @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
    def test_remove_symlink(self):
        """Test removing a symlink"""
        file_path = os.path.join(self.tempdir, *stubs.files[1])
        symlink_path = os.path.join(self.tempdir, 'symlink')

        os.symlink(file_path, symlink_path)
        self.assertTrue(os.path.islink(symlink_path))

        # Only the link itself must go away, the linked file must survive
        mozfile.remove(symlink_path)
        self.assertFalse(os.path.exists(symlink_path))
        self.assertTrue(os.path.exists(file_path))

    @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
    def test_remove_symlink_in_subfolder(self):
        """Test removing a folder with a contained symlink"""
        file_path = os.path.join(self.tempdir, *stubs.files[0])
        dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1]))
        symlink_path = os.path.join(dir_path, 'symlink')

        os.symlink(file_path, symlink_path)
        self.assertTrue(os.path.islink(symlink_path))

        # The folder with the contained symlink will be deleted but not the
        # original linked file
        mozfile.remove(dir_path)
        self.assertFalse(os.path.exists(dir_path))
        self.assertFalse(os.path.exists(symlink_path))
        self.assertTrue(os.path.exists(file_path))

    # os.geteuid() is only evaluated on non-Windows platforms thanks to the
    # short-circuiting `or`; an effective uid of 0 means we run as root.
    @unittest.skipIf(mozinfo.isWin or not os.geteuid(),
                     "Symlinks are not supported on Windows and cannot run test as root")
    def test_remove_symlink_for_system_path(self):
        """Test removing a symlink which points to a system folder"""
        target_path = os.path.dirname(self.tempdir)
        symlink_path = os.path.join(self.tempdir, 'symlink')

        os.symlink(target_path, symlink_path)
        self.assertTrue(os.path.islink(symlink_path))

        # Only the symlink itself must be removed; the folder it points to
        # has to stay untouched.
        mozfile.remove(symlink_path)
        self.assertFalse(os.path.exists(symlink_path))
        self.assertTrue(os.path.exists(target_path))

    def test_remove_path_that_does_not_exists(self):
        """Test that removing a non-existing path does not raise"""
        not_existing_path = os.path.join(self.tempdir, 'I_do_not_not_exists')
        try:
            mozfile.remove(not_existing_path)
        except OSError as exc:
            # ENOENT is the failure this test guards against; any other
            # OSError is unexpected and re-raised unchanged.
            if exc.errno == errno.ENOENT:
                self.fail("removing non existing path must not raise error")
            raise


class MozFileMoveTestCase(unittest.TestCase):
    """Test our ability to move files"""

    def setUp(self):
        # Build a fresh stub tree and make sure it gets removed afterwards
        stub_root = stubs.create_stub()
        self.tempdir = stub_root
        self.addCleanup(mozfile.rmtree, stub_root)

    def _move_paths(self):
        # Source file inside the stub plus the destination it is moved to
        source = os.path.join(self.tempdir, *stubs.files[1])
        return source, source + '.moved'

    def test_move_file(self):
        source, destination = self._move_paths()

        # Before the move only the source exists
        self.assertTrue(os.path.isfile(source))
        self.assertFalse(os.path.exists(destination))

        mozfile.move(source, destination)

        # Afterwards only the destination exists
        self.assertFalse(os.path.exists(source))
        self.assertTrue(os.path.isfile(destination))

    def test_move_file_with_retry(self):
        source, destination = self._move_paths()

        with wait_file_opened_in_thread(source, 0.2):
            # first move attempt should fail on windows and be retried
            mozfile.move(source, destination)
            self.assertFalse(os.path.exists(source))
            self.assertTrue(os.path.isfile(destination))


# Run the test cases of this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()