1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
|
#!/usr/bin/env python
import os
import stat
import shutil
import threading
import time
import unittest
import errno
from contextlib import contextmanager
import mozfile
import mozinfo
import stubs
def mark_readonly(path):
    """Strip the write permission bits from the given file/directory.

    The user, group and other write bits are cleared; every other mode
    bit is left as it was.

    :param path: path of the directory/file whose mode must be changed
    """
    write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
    current_mode = os.stat(path).st_mode
    os.chmod(path, current_mode & ~write_bits)
class FileOpenCloseThread(threading.Thread):
    """Helper thread for asynchronous file handling.

    When started, the thread opens ``path``, sets the ``file_opened``
    event, keeps the file open for ``delay`` seconds and then closes it.
    If ``delete`` is true it finally tries to remove the file itself.

    :param path: path of the file to open
    :param delay: number of seconds the file is kept open
    :param delete: whether to remove the file once it has been closed
    """

    def __init__(self, path, delay, delete=False):
        threading.Thread.__init__(self)
        # Set as soon as the file handle is open, so that other threads
        # can wait for that moment.
        self.file_opened = threading.Event()
        self.delay = delay
        self.path = path
        self.delete = delete

    def run(self):
        """Open the file, hold it for ``delay`` seconds, optionally delete."""
        with open(self.path):
            self.file_opened.set()
            time.sleep(self.delay)

        if self.delete:
            try:
                os.remove(self.path)
            except OSError:
                # Deliberate best effort: the file may already have been
                # removed by another thread. Only filesystem errors are
                # ignored; anything else is a real bug and must surface.
                pass
@contextmanager
def wait_file_opened_in_thread(*args, **kwargs):
    """Run the managed block while a helper thread has a file open.

    All arguments are forwarded to ``FileOpenCloseThread``. The thread is
    started and this function blocks until the thread reports that the
    file is open; only then is the thread yielded to the ``with`` body.
    On leaving the body (normally or through an exception) the thread is
    joined.
    """
    opener = FileOpenCloseThread(*args, **kwargs)
    opener.start()
    # Do not enter the managed block before the file handle exists.
    opener.file_opened.wait()
    try:
        yield opener
    finally:
        opener.join()
class MozfileRemoveTestCase(unittest.TestCase):
    """Test our ability to remove directories and files"""

    def setUp(self):
        # Generate a stub
        self.tempdir = stubs.create_stub()

    def tearDown(self):
        # Tests which removed the stub themselves leave nothing to clean up
        if os.path.isdir(self.tempdir):
            shutil.rmtree(self.tempdir)

    def test_remove_directory(self):
        """Test the removal of a directory"""
        self.assertTrue(os.path.isdir(self.tempdir))
        mozfile.remove(self.tempdir)
        self.assertFalse(os.path.exists(self.tempdir))

    def test_remove_directory_with_open_file(self):
        """Test removing a directory with an open file"""
        # Open a file in the generated stub
        filepath = os.path.join(self.tempdir, *stubs.files[1])

        # Use open() instead of the Python 2-only file() builtin, and a
        # with-block so the handle is released even if an assertion fails.
        # The removal happens inside the block, i.e. while the file is
        # still open.
        with open(filepath, 'w') as f:
            f.write('foo-bar')

            # keep file open and then try removing the dir-tree
            if mozinfo.isWin:
                # On the Windows family WindowsError should be raised.
                self.assertRaises(OSError, mozfile.remove, self.tempdir)
                self.assertTrue(os.path.exists(self.tempdir))
            else:
                # Folder should be deleted on all other platforms
                mozfile.remove(self.tempdir)
                self.assertFalse(os.path.exists(self.tempdir))

    def test_remove_closed_file(self):
        """Test removing a closed file"""
        # Open a file in the generated stub
        filepath = os.path.join(self.tempdir, *stubs.files[1])
        with open(filepath, 'w') as f:
            f.write('foo-bar')

        # Folder should be deleted on all platforms
        mozfile.remove(self.tempdir)
        self.assertFalse(os.path.exists(self.tempdir))

    def test_removing_open_file_with_retry(self):
        """Test removing a file in use with retry"""
        filepath = os.path.join(self.tempdir, *stubs.files[1])

        with wait_file_opened_in_thread(filepath, 0.2):
            # on windows first attempt will fail,
            # and it will be retried until the thread leaves the handle
            mozfile.remove(filepath)

        # Check deletion was successful
        self.assertFalse(os.path.exists(filepath))

    def test_removing_already_deleted_file_with_retry(self):
        """Test removing a meanwhile removed file with retry"""
        filepath = os.path.join(self.tempdir, *stubs.files[1])

        with wait_file_opened_in_thread(filepath, 0.2, True):
            # on windows first attempt will fail, and before
            # the retry the opened file will be deleted in the thread
            mozfile.remove(filepath)

        # Check deletion was successful
        self.assertFalse(os.path.exists(filepath))

    def test_remove_readonly_tree(self):
        """Test removing a read-only directory"""
        # presumably "nested_tree" is created by the stub; verify in stubs
        dirpath = os.path.join(self.tempdir, "nested_tree")
        mark_readonly(dirpath)

        # However, mozfile should change write permissions and remove dir.
        mozfile.remove(dirpath)

        self.assertFalse(os.path.exists(dirpath))

    def test_remove_readonly_file(self):
        """Test removing read-only files"""
        filepath = os.path.join(self.tempdir, *stubs.files[1])
        mark_readonly(filepath)

        # However, mozfile should change write permission and then remove file.
        mozfile.remove(filepath)

        self.assertFalse(os.path.exists(filepath))

    @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
    def test_remove_symlink(self):
        """Test removing a symlink"""
        file_path = os.path.join(self.tempdir, *stubs.files[1])
        symlink_path = os.path.join(self.tempdir, 'symlink')

        os.symlink(file_path, symlink_path)
        self.assertTrue(os.path.islink(symlink_path))

        # The linked folder and files should not be deleted
        mozfile.remove(symlink_path)
        self.assertFalse(os.path.exists(symlink_path))
        self.assertTrue(os.path.exists(file_path))

    @unittest.skipIf(mozinfo.isWin, "Symlinks are not supported on Windows")
    def test_remove_symlink_in_subfolder(self):
        """Test removing a folder with a contained symlink"""
        file_path = os.path.join(self.tempdir, *stubs.files[0])
        dir_path = os.path.dirname(os.path.join(self.tempdir, *stubs.files[1]))
        symlink_path = os.path.join(dir_path, 'symlink')

        os.symlink(file_path, symlink_path)
        self.assertTrue(os.path.islink(symlink_path))

        # The folder with the contained symlink will be deleted but not the
        # original linked file
        mozfile.remove(dir_path)
        self.assertFalse(os.path.exists(dir_path))
        self.assertFalse(os.path.exists(symlink_path))
        self.assertTrue(os.path.exists(file_path))

    @unittest.skipIf(mozinfo.isWin or not os.geteuid(),
                     "Symlinks are not supported on Windows and cannot run test as root")
    def test_remove_symlink_for_system_path(self):
        """Test removing a symlink which points to a system folder"""
        # The link target is the parent directory of the stub
        target_path = os.path.dirname(self.tempdir)
        symlink_path = os.path.join(self.tempdir, 'symlink')

        os.symlink(target_path, symlink_path)
        self.assertTrue(os.path.islink(symlink_path))

        # Only the symlink itself must be removed, never the folder it
        # points to
        mozfile.remove(symlink_path)
        self.assertFalse(os.path.exists(symlink_path))
        self.assertTrue(os.path.isdir(target_path))

    def test_remove_path_that_does_not_exists(self):
        """Test that removing a non-existing path does not raise"""
        not_existing_path = os.path.join(self.tempdir, 'I_do_not_not_exists')
        try:
            mozfile.remove(not_existing_path)
        except OSError as exc:
            # A "no such file or directory" error is reported as a test
            # failure; any other OSError is re-raised unchanged.
            if exc.errno == errno.ENOENT:
                self.fail("removing non existing path must not raise error")
            raise
class MozFileMoveTestCase(unittest.TestCase):
    """Test our ability to move files"""

    def setUp(self):
        # Generate a stub and register its removal as cleanup
        self.tempdir = stubs.create_stub()
        self.addCleanup(mozfile.rmtree, self.tempdir)

    def _source_and_target(self):
        # Path of a stub file and the path it is going to be moved to
        source = os.path.join(self.tempdir, *stubs.files[1])
        return source, source + '.moved'

    def _assert_moved(self, source, target):
        # After a move the source is gone and the target is a regular file
        self.assertFalse(os.path.exists(source))
        self.assertTrue(os.path.isfile(target))

    def test_move_file(self):
        """Test moving a file to a new path"""
        source, target = self._source_and_target()
        self.assertTrue(os.path.isfile(source))
        self.assertFalse(os.path.exists(target))

        mozfile.move(source, target)

        self._assert_moved(source, target)

    def test_move_file_with_retry(self):
        """Test moving a file in use with retry"""
        source, target = self._source_and_target()

        with wait_file_opened_in_thread(source, 0.2):
            # first move attempt should fail on windows and be retried
            mozfile.move(source, target)
            self._assert_moved(source, target)
# Run the tests of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
|