summaryrefslogtreecommitdiffstats
path: root/python/mozbuild/mozpack/test/test_copier.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/mozbuild/mozpack/test/test_copier.py')
-rw-r--r--python/mozbuild/mozpack/test/test_copier.py529
1 files changed, 529 insertions, 0 deletions
diff --git a/python/mozbuild/mozpack/test/test_copier.py b/python/mozbuild/mozpack/test/test_copier.py
new file mode 100644
index 000000000..6688b3d5e
--- /dev/null
+++ b/python/mozbuild/mozpack/test/test_copier.py
@@ -0,0 +1,529 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from mozpack.copier import (
+ FileCopier,
+ FileRegistry,
+ FileRegistrySubtree,
+ Jarrer,
+)
+from mozpack.files import (
+ GeneratedFile,
+ ExistingFile,
+)
+from mozpack.mozjar import JarReader
+import mozpack.path as mozpath
+import unittest
+import mozunit
+import os
+import stat
+from mozpack.errors import ErrorMessage
+from mozpack.test.test_files import (
+ MockDest,
+ MatchTestTemplate,
+ TestWithTmpDir,
+)
+
+
+class BaseTestFileRegistry(MatchTestTemplate):
+ def add(self, path):
+ self.registry.add(path, GeneratedFile(path))
+
+ def do_check(self, pattern, result):
+ self.checked = True
+ if result:
+ self.assertTrue(self.registry.contains(pattern))
+ else:
+ self.assertFalse(self.registry.contains(pattern))
+ self.assertEqual(self.registry.match(pattern), result)
+
+ def do_test_file_registry(self, registry):
+ self.registry = registry
+ self.registry.add('foo', GeneratedFile('foo'))
+ bar = GeneratedFile('bar')
+ self.registry.add('bar', bar)
+ self.assertEqual(self.registry.paths(), ['foo', 'bar'])
+ self.assertEqual(self.registry['bar'], bar)
+
+ self.assertRaises(ErrorMessage, self.registry.add, 'foo',
+ GeneratedFile('foo2'))
+
+ self.assertRaises(ErrorMessage, self.registry.remove, 'qux')
+
+ self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
+ GeneratedFile('foobar'))
+ self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar/baz',
+ GeneratedFile('foobar'))
+
+ self.assertEqual(self.registry.paths(), ['foo', 'bar'])
+
+ self.registry.remove('foo')
+ self.assertEqual(self.registry.paths(), ['bar'])
+ self.registry.remove('bar')
+ self.assertEqual(self.registry.paths(), [])
+
+ self.prepare_match_test()
+ self.do_match_test()
+ self.assertTrue(self.checked)
+ self.assertEqual(self.registry.paths(), [
+ 'bar',
+ 'foo/bar',
+ 'foo/baz',
+ 'foo/qux/1',
+ 'foo/qux/bar',
+ 'foo/qux/2/test',
+ 'foo/qux/2/test2',
+ ])
+
+ self.registry.remove('foo/qux')
+ self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz'])
+
+ self.registry.add('foo/qux', GeneratedFile('fooqux'))
+ self.assertEqual(self.registry.paths(), ['bar', 'foo/bar', 'foo/baz',
+ 'foo/qux'])
+ self.registry.remove('foo/b*')
+ self.assertEqual(self.registry.paths(), ['bar', 'foo/qux'])
+
+ self.assertEqual([f for f, c in self.registry], ['bar', 'foo/qux'])
+ self.assertEqual(len(self.registry), 2)
+
+ self.add('foo/.foo')
+ self.assertTrue(self.registry.contains('foo/.foo'))
+
+ def do_test_registry_paths(self, registry):
+ self.registry = registry
+
+ # Can't add a file if it requires a directory in place of a
+ # file we also require.
+ self.registry.add('foo', GeneratedFile('foo'))
+ self.assertRaises(ErrorMessage, self.registry.add, 'foo/bar',
+ GeneratedFile('foobar'))
+
+ # Can't add a file if we already have a directory there.
+ self.registry.add('bar/baz', GeneratedFile('barbaz'))
+ self.assertRaises(ErrorMessage, self.registry.add, 'bar',
+ GeneratedFile('bar'))
+
+ # Bump the count of things that require bar/ to 2.
+ self.registry.add('bar/zot', GeneratedFile('barzot'))
+ self.assertRaises(ErrorMessage, self.registry.add, 'bar',
+ GeneratedFile('bar'))
+
+ # Drop the count of things that require bar/ to 1.
+ self.registry.remove('bar/baz')
+ self.assertRaises(ErrorMessage, self.registry.add, 'bar',
+ GeneratedFile('bar'))
+
+ # Drop the count of things that require bar/ to 0.
+ self.registry.remove('bar/zot')
+ self.registry.add('bar/zot', GeneratedFile('barzot'))
+
+class TestFileRegistry(BaseTestFileRegistry, unittest.TestCase):
+ def test_partial_paths(self):
+ cases = {
+ 'foo/bar/baz/zot': ['foo/bar/baz', 'foo/bar', 'foo'],
+ 'foo/bar': ['foo'],
+ 'bar': [],
+ }
+ reg = FileRegistry()
+ for path, parts in cases.iteritems():
+ self.assertEqual(reg._partial_paths(path), parts)
+
+ def test_file_registry(self):
+ self.do_test_file_registry(FileRegistry())
+
+ def test_registry_paths(self):
+ self.do_test_registry_paths(FileRegistry())
+
+ def test_required_directories(self):
+ self.registry = FileRegistry()
+
+ self.registry.add('foo', GeneratedFile('foo'))
+ self.assertEqual(self.registry.required_directories(), set())
+
+ self.registry.add('bar/baz', GeneratedFile('barbaz'))
+ self.assertEqual(self.registry.required_directories(), {'bar'})
+
+ self.registry.add('bar/zot', GeneratedFile('barzot'))
+ self.assertEqual(self.registry.required_directories(), {'bar'})
+
+ self.registry.add('bar/zap/zot', GeneratedFile('barzapzot'))
+ self.assertEqual(self.registry.required_directories(), {'bar', 'bar/zap'})
+
+ self.registry.remove('bar/zap/zot')
+ self.assertEqual(self.registry.required_directories(), {'bar'})
+
+ self.registry.remove('bar/baz')
+ self.assertEqual(self.registry.required_directories(), {'bar'})
+
+ self.registry.remove('bar/zot')
+ self.assertEqual(self.registry.required_directories(), set())
+
+ self.registry.add('x/y/z', GeneratedFile('xyz'))
+ self.assertEqual(self.registry.required_directories(), {'x', 'x/y'})
+
+
+class TestFileRegistrySubtree(BaseTestFileRegistry, unittest.TestCase):
+ def test_file_registry_subtree_base(self):
+ registry = FileRegistry()
+ self.assertEqual(registry, FileRegistrySubtree('', registry))
+ self.assertNotEqual(registry, FileRegistrySubtree('base', registry))
+
+ def create_registry(self):
+ registry = FileRegistry()
+ registry.add('foo/bar', GeneratedFile('foo/bar'))
+ registry.add('baz/qux', GeneratedFile('baz/qux'))
+ return FileRegistrySubtree('base/root', registry)
+
+ def test_file_registry_subtree(self):
+ self.do_test_file_registry(self.create_registry())
+
+ def test_registry_paths_subtree(self):
+ registry = FileRegistry()
+ self.do_test_registry_paths(self.create_registry())
+
+
+class TestFileCopier(TestWithTmpDir):
+ def all_dirs(self, base):
+ all_dirs = set()
+ for root, dirs, files in os.walk(base):
+ if not dirs:
+ all_dirs.add(mozpath.relpath(root, base))
+ return all_dirs
+
+ def all_files(self, base):
+ all_files = set()
+ for root, dirs, files in os.walk(base):
+ for f in files:
+ all_files.add(
+ mozpath.join(mozpath.relpath(root, base), f))
+ return all_files
+
+ def test_file_copier(self):
+ copier = FileCopier()
+ copier.add('foo/bar', GeneratedFile('foobar'))
+ copier.add('foo/qux', GeneratedFile('fooqux'))
+ copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
+ copier.add('bar', GeneratedFile('bar'))
+ copier.add('qux/foo', GeneratedFile('quxfoo'))
+ copier.add('qux/bar', GeneratedFile(''))
+
+ result = copier.copy(self.tmpdir)
+ self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
+ self.assertEqual(self.all_dirs(self.tmpdir),
+ set(['foo/deep/nested/directory', 'qux']))
+
+ self.assertEqual(result.updated_files, set(self.tmppath(p) for p in
+ self.all_files(self.tmpdir)))
+ self.assertEqual(result.existing_files, set())
+ self.assertEqual(result.removed_files, set())
+ self.assertEqual(result.removed_directories, set())
+
+ copier.remove('foo')
+ copier.add('test', GeneratedFile('test'))
+ result = copier.copy(self.tmpdir)
+ self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
+ self.assertEqual(self.all_dirs(self.tmpdir), set(['qux']))
+ self.assertEqual(result.removed_files, set(self.tmppath(p) for p in
+ ('foo/bar', 'foo/qux', 'foo/deep/nested/directory/file')))
+
+ def test_symlink_directory_replaced(self):
+ """Directory symlinks in destination are replaced if they need to be
+ real directories."""
+ if not self.symlink_supported:
+ return
+
+ dest = self.tmppath('dest')
+
+ copier = FileCopier()
+ copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
+
+ os.makedirs(self.tmppath('dest/foo'))
+ dummy = self.tmppath('dummy')
+ os.mkdir(dummy)
+ link = self.tmppath('dest/foo/bar')
+ os.symlink(dummy, link)
+
+ result = copier.copy(dest)
+
+ st = os.lstat(link)
+ self.assertFalse(stat.S_ISLNK(st.st_mode))
+ self.assertTrue(stat.S_ISDIR(st.st_mode))
+
+ self.assertEqual(self.all_files(dest), set(copier.paths()))
+
+ self.assertEqual(result.removed_directories, set())
+ self.assertEqual(len(result.updated_files), 1)
+
+ def test_remove_unaccounted_directory_symlinks(self):
+ """Directory symlinks in destination that are not in the way are
+ deleted according to remove_unaccounted and
+ remove_all_directory_symlinks.
+ """
+ if not self.symlink_supported:
+ return
+
+ dest = self.tmppath('dest')
+
+ copier = FileCopier()
+ copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
+
+ os.makedirs(self.tmppath('dest/foo'))
+ dummy = self.tmppath('dummy')
+ os.mkdir(dummy)
+
+ os.mkdir(self.tmppath('dest/zot'))
+ link = self.tmppath('dest/zot/zap')
+ os.symlink(dummy, link)
+
+ # If not remove_unaccounted but remove_empty_directories, then
+ # the symlinked directory remains (as does its containing
+ # directory).
+ result = copier.copy(dest, remove_unaccounted=False,
+ remove_empty_directories=True,
+ remove_all_directory_symlinks=False)
+
+ st = os.lstat(link)
+ self.assertTrue(stat.S_ISLNK(st.st_mode))
+ self.assertFalse(stat.S_ISDIR(st.st_mode))
+
+ self.assertEqual(self.all_files(dest), set(copier.paths()))
+ self.assertEqual(self.all_dirs(dest), set(['foo/bar']))
+
+ self.assertEqual(result.removed_directories, set())
+ self.assertEqual(len(result.updated_files), 1)
+
+ # If remove_unaccounted but not remove_empty_directories, then
+ # only the symlinked directory is removed.
+ result = copier.copy(dest, remove_unaccounted=True,
+ remove_empty_directories=False,
+ remove_all_directory_symlinks=False)
+
+ st = os.lstat(self.tmppath('dest/zot'))
+ self.assertFalse(stat.S_ISLNK(st.st_mode))
+ self.assertTrue(stat.S_ISDIR(st.st_mode))
+
+ self.assertEqual(result.removed_files, set([link]))
+ self.assertEqual(result.removed_directories, set())
+
+ self.assertEqual(self.all_files(dest), set(copier.paths()))
+ self.assertEqual(self.all_dirs(dest), set(['foo/bar', 'zot']))
+
+ # If remove_unaccounted and remove_empty_directories, then
+ # both the symlink and its containing directory are removed.
+ link = self.tmppath('dest/zot/zap')
+ os.symlink(dummy, link)
+
+ result = copier.copy(dest, remove_unaccounted=True,
+ remove_empty_directories=True,
+ remove_all_directory_symlinks=False)
+
+ self.assertEqual(result.removed_files, set([link]))
+ self.assertEqual(result.removed_directories, set([self.tmppath('dest/zot')]))
+
+ self.assertEqual(self.all_files(dest), set(copier.paths()))
+ self.assertEqual(self.all_dirs(dest), set(['foo/bar']))
+
+ def test_permissions(self):
+ """Ensure files without write permission can be deleted."""
+ with open(self.tmppath('dummy'), 'a'):
+ pass
+
+ p = self.tmppath('no_perms')
+ with open(p, 'a'):
+ pass
+
+ # Make file and directory unwritable. Reminder: making a directory
+ # unwritable prevents modifications (including deletes) from the list
+ # of files in that directory.
+ os.chmod(p, 0o400)
+ os.chmod(self.tmpdir, 0o400)
+
+ copier = FileCopier()
+ copier.add('dummy', GeneratedFile('content'))
+ result = copier.copy(self.tmpdir)
+ self.assertEqual(result.removed_files_count, 1)
+ self.assertFalse(os.path.exists(p))
+
+ def test_no_remove(self):
+ copier = FileCopier()
+ copier.add('foo', GeneratedFile('foo'))
+
+ with open(self.tmppath('bar'), 'a'):
+ pass
+
+ os.mkdir(self.tmppath('emptydir'))
+ d = self.tmppath('populateddir')
+ os.mkdir(d)
+
+ with open(self.tmppath('populateddir/foo'), 'a'):
+ pass
+
+ result = copier.copy(self.tmpdir, remove_unaccounted=False)
+
+ self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
+ 'populateddir/foo']))
+ self.assertEqual(self.all_dirs(self.tmpdir), set(['populateddir']))
+ self.assertEqual(result.removed_files, set())
+ self.assertEqual(result.removed_directories,
+ set([self.tmppath('emptydir')]))
+
+ def test_no_remove_empty_directories(self):
+ copier = FileCopier()
+ copier.add('foo', GeneratedFile('foo'))
+
+ with open(self.tmppath('bar'), 'a'):
+ pass
+
+ os.mkdir(self.tmppath('emptydir'))
+ d = self.tmppath('populateddir')
+ os.mkdir(d)
+
+ with open(self.tmppath('populateddir/foo'), 'a'):
+ pass
+
+ result = copier.copy(self.tmpdir, remove_unaccounted=False,
+ remove_empty_directories=False)
+
+ self.assertEqual(self.all_files(self.tmpdir), set(['foo', 'bar',
+ 'populateddir/foo']))
+ self.assertEqual(self.all_dirs(self.tmpdir), set(['emptydir',
+ 'populateddir']))
+ self.assertEqual(result.removed_files, set())
+ self.assertEqual(result.removed_directories, set())
+
+ def test_optional_exists_creates_unneeded_directory(self):
+ """Demonstrate that a directory not strictly required, but specified
+ as the path to an optional file, will be unnecessarily created.
+
+ This behaviour is wrong; fixing it is tracked by Bug 972432;
+ and this test exists to guard against unexpected changes in
+ behaviour.
+ """
+
+ dest = self.tmppath('dest')
+
+ copier = FileCopier()
+ copier.add('foo/bar', ExistingFile(required=False))
+
+ result = copier.copy(dest)
+
+ st = os.lstat(self.tmppath('dest/foo'))
+ self.assertFalse(stat.S_ISLNK(st.st_mode))
+ self.assertTrue(stat.S_ISDIR(st.st_mode))
+
+ # What's worse, we have no record that dest was created.
+ self.assertEquals(len(result.updated_files), 0)
+
+ # But we do have an erroneous record of an optional file
+ # existing when it does not.
+ self.assertIn(self.tmppath('dest/foo/bar'), result.existing_files)
+
+ def test_remove_unaccounted_file_registry(self):
+ """Test FileCopier.copy(remove_unaccounted=FileRegistry())"""
+
+ dest = self.tmppath('dest')
+
+ copier = FileCopier()
+ copier.add('foo/bar/baz', GeneratedFile('foobarbaz'))
+ copier.add('foo/bar/qux', GeneratedFile('foobarqux'))
+ copier.add('foo/hoge/fuga', GeneratedFile('foohogefuga'))
+ copier.add('foo/toto/tata', GeneratedFile('footototata'))
+
+ os.makedirs(os.path.join(dest, 'bar'))
+ with open(os.path.join(dest, 'bar', 'bar'), 'w') as fh:
+ fh.write('barbar');
+ os.makedirs(os.path.join(dest, 'foo', 'toto'))
+ with open(os.path.join(dest, 'foo', 'toto', 'toto'), 'w') as fh:
+ fh.write('foototototo');
+
+ result = copier.copy(dest, remove_unaccounted=False)
+
+ self.assertEqual(self.all_files(dest),
+ set(copier.paths()) | { 'foo/toto/toto', 'bar/bar'})
+ self.assertEqual(self.all_dirs(dest),
+ {'foo/bar', 'foo/hoge', 'foo/toto', 'bar'})
+
+ copier2 = FileCopier()
+ copier2.add('foo/hoge/fuga', GeneratedFile('foohogefuga'))
+
+ # We expect only files copied from the first copier to be removed,
+ # not the extra file that was there beforehand.
+ result = copier2.copy(dest, remove_unaccounted=copier)
+
+ self.assertEqual(self.all_files(dest),
+ set(copier2.paths()) | { 'foo/toto/toto', 'bar/bar'})
+ self.assertEqual(self.all_dirs(dest),
+ {'foo/hoge', 'foo/toto', 'bar'})
+ self.assertEqual(result.updated_files,
+ {self.tmppath('dest/foo/hoge/fuga')})
+ self.assertEqual(result.existing_files, set())
+ self.assertEqual(result.removed_files, {self.tmppath(p) for p in
+ ('dest/foo/bar/baz', 'dest/foo/bar/qux', 'dest/foo/toto/tata')})
+ self.assertEqual(result.removed_directories,
+ {self.tmppath('dest/foo/bar')})
+
+
+class TestJarrer(unittest.TestCase):
+ def check_jar(self, dest, copier):
+ jar = JarReader(fileobj=dest)
+ self.assertEqual([f.filename for f in jar], copier.paths())
+ for f in jar:
+ self.assertEqual(f.uncompressed_data.read(),
+ copier[f.filename].content)
+
+ def test_jarrer(self):
+ copier = Jarrer()
+ copier.add('foo/bar', GeneratedFile('foobar'))
+ copier.add('foo/qux', GeneratedFile('fooqux'))
+ copier.add('foo/deep/nested/directory/file', GeneratedFile('fooz'))
+ copier.add('bar', GeneratedFile('bar'))
+ copier.add('qux/foo', GeneratedFile('quxfoo'))
+ copier.add('qux/bar', GeneratedFile(''))
+
+ dest = MockDest()
+ copier.copy(dest)
+ self.check_jar(dest, copier)
+
+ copier.remove('foo')
+ copier.add('test', GeneratedFile('test'))
+ copier.copy(dest)
+ self.check_jar(dest, copier)
+
+ copier.remove('test')
+ copier.add('test', GeneratedFile('replaced-content'))
+ copier.copy(dest)
+ self.check_jar(dest, copier)
+
+ copier.copy(dest)
+ self.check_jar(dest, copier)
+
+ preloaded = ['qux/bar', 'bar']
+ copier.preload(preloaded)
+ copier.copy(dest)
+
+ dest.seek(0)
+ jar = JarReader(fileobj=dest)
+ self.assertEqual([f.filename for f in jar], preloaded +
+ [p for p in copier.paths() if not p in preloaded])
+ self.assertEqual(jar.last_preloaded, preloaded[-1])
+
+
+ def test_jarrer_compress(self):
+ copier = Jarrer()
+ copier.add('foo/bar', GeneratedFile('ffffff'))
+ copier.add('foo/qux', GeneratedFile('ffffff'), compress=False)
+
+ dest = MockDest()
+ copier.copy(dest)
+ self.check_jar(dest, copier)
+
+ dest.seek(0)
+ jar = JarReader(fileobj=dest)
+ self.assertTrue(jar['foo/bar'].compressed)
+ self.assertFalse(jar['foo/qux'].compressed)
+
+
+if __name__ == '__main__':
+ mozunit.main()