diff options
Diffstat (limited to 'python/mozbuild/mozpack/test/test_files.py')
-rw-r--r-- | python/mozbuild/mozpack/test/test_files.py | 1160 |
1 files changed, 1160 insertions, 0 deletions
diff --git a/python/mozbuild/mozpack/test/test_files.py b/python/mozbuild/mozpack/test/test_files.py new file mode 100644 index 000000000..6fd617828 --- /dev/null +++ b/python/mozbuild/mozpack/test/test_files.py @@ -0,0 +1,1160 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. + +from mozbuild.util import ensureParentDir + +from mozpack.errors import ( + ErrorMessage, + errors, +) +from mozpack.files import ( + AbsoluteSymlinkFile, + ComposedFinder, + DeflatedFile, + Dest, + ExistingFile, + ExtractedTarFile, + FileFinder, + File, + GeneratedFile, + JarFinder, + TarFinder, + ManifestFile, + MercurialFile, + MercurialRevisionFinder, + MinifiedJavaScript, + MinifiedProperties, + PreprocessedFile, + XPTFile, +) + +# We don't have hglib installed everywhere. +try: + import hglib +except ImportError: + hglib = None + +try: + from mozpack.hg import MercurialNativeRevisionFinder +except ImportError: + MercurialNativeRevisionFinder = None + +from mozpack.mozjar import ( + JarReader, + JarWriter, +) +from mozpack.chrome.manifest import ( + ManifestContent, + ManifestResource, + ManifestLocale, + ManifestOverride, +) +import unittest +import mozfile +import mozunit +import os +import random +import string +import sys +import tarfile +import mozpack.path as mozpath +from tempfile import mkdtemp +from io import BytesIO +from StringIO import StringIO +from xpt import Typelib + + +class TestWithTmpDir(unittest.TestCase): + def setUp(self): + self.tmpdir = mkdtemp() + + self.symlink_supported = False + + if not hasattr(os, 'symlink'): + return + + dummy_path = self.tmppath('dummy_file') + with open(dummy_path, 'a'): + pass + + try: + os.symlink(dummy_path, self.tmppath('dummy_symlink')) + os.remove(self.tmppath('dummy_symlink')) + except EnvironmentError: + pass + finally: + os.remove(dummy_path) + + self.symlink_supported = True + + + def tearDown(self): + mozfile.rmtree(self.tmpdir) + + def tmppath(self, relpath): + return os.path.normpath(os.path.join(self.tmpdir, relpath)) + + +class MockDest(BytesIO, Dest): + def __init__(self): + BytesIO.__init__(self) + self.mode = None + + def read(self, length=-1): + if self.mode != 'r': + self.seek(0) + self.mode = 'r' + return BytesIO.read(self, length) + + def write(self, data): + if self.mode != 'w': + self.seek(0) + self.truncate(0) + self.mode = 'w' + return BytesIO.write(self, data) + + def exists(self): + return True + + def close(self): + if self.mode: + self.mode = None + + +class DestNoWrite(Dest): + def write(self, data): + raise RuntimeError + + +class TestDest(TestWithTmpDir): + def test_dest(self): + dest = Dest(self.tmppath('dest')) + self.assertFalse(dest.exists()) + dest.write('foo') + self.assertTrue(dest.exists()) + dest.write('foo') + self.assertEqual(dest.read(4), 'foof') + self.assertEqual(dest.read(), 'oo') + self.assertEqual(dest.read(), '') + dest.write('bar') + self.assertEqual(dest.read(4), 'bar') + dest.close() + self.assertEqual(dest.read(), 'bar') + dest.write('foo') + dest.close() + dest.write('qux') + self.assertEqual(dest.read(), 'qux') + +rand = ''.join(random.choice(string.letters) for i in xrange(131597)) +samples = [ + '', + 'test', + 'fooo', + 'same', + 'same', + 'Different and longer', + rand, + rand, + rand[:-1] + '_', + 'test' +] + + +class TestFile(TestWithTmpDir): + def test_file(self): + ''' + Check that File.copy yields the proper content in the destination file + in all situations that trigger different code paths: + - different content + - different content of the same size + - same content + - long content + ''' + src = self.tmppath('src') + dest = self.tmppath('dest') + + for content in samples: + with open(src, 'wb') as tmp: + tmp.write(content) + # Ensure the destination file, when it exists, is older than the + # source + if os.path.exists(dest): + time = os.path.getmtime(src) - 1 + os.utime(dest, (time, time)) + f = File(src) + f.copy(dest) + self.assertEqual(content, open(dest, 'rb').read()) + self.assertEqual(content, f.open().read()) + self.assertEqual(content, f.open().read()) + + def test_file_dest(self): + ''' + Similar to test_file, but for a destination object instead of + a destination file. This ensures the destination object is being + used properly by File.copy, ensuring that other subclasses of Dest + will work. + ''' + src = self.tmppath('src') + dest = MockDest() + + for content in samples: + with open(src, 'wb') as tmp: + tmp.write(content) + f = File(src) + f.copy(dest) + self.assertEqual(content, dest.getvalue()) + + def test_file_open(self): + ''' + Test whether File.open returns an appropriately reset file object. + ''' + src = self.tmppath('src') + content = ''.join(samples) + with open(src, 'wb') as tmp: + tmp.write(content) + + f = File(src) + self.assertEqual(content[:42], f.open().read(42)) + self.assertEqual(content, f.open().read()) + + def test_file_no_write(self): + ''' + Test various conditions where File.copy is expected not to write + in the destination file. + ''' + src = self.tmppath('src') + dest = self.tmppath('dest') + + with open(src, 'wb') as tmp: + tmp.write('test') + + # Initial copy + f = File(src) + f.copy(dest) + + # Ensure subsequent copies won't trigger writes + f.copy(DestNoWrite(dest)) + self.assertEqual('test', open(dest, 'rb').read()) + + # When the source file is newer, but with the same content, no copy + # should occur + time = os.path.getmtime(src) - 1 + os.utime(dest, (time, time)) + f.copy(DestNoWrite(dest)) + self.assertEqual('test', open(dest, 'rb').read()) + + # When the source file is older than the destination file, even with + # different content, no copy should occur. + with open(src, 'wb') as tmp: + tmp.write('fooo') + time = os.path.getmtime(dest) - 1 + os.utime(src, (time, time)) + f.copy(DestNoWrite(dest)) + self.assertEqual('test', open(dest, 'rb').read()) + + # Double check that under conditions where a copy occurs, we would get + # an exception. + time = os.path.getmtime(src) - 1 + os.utime(dest, (time, time)) + self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest)) + + # skip_if_older=False is expected to force a copy in this situation. + f.copy(dest, skip_if_older=False) + self.assertEqual('fooo', open(dest, 'rb').read()) + + +class TestAbsoluteSymlinkFile(TestWithTmpDir): + def test_absolute_relative(self): + AbsoluteSymlinkFile('/foo') + + with self.assertRaisesRegexp(ValueError, 'Symlink target not absolute'): + AbsoluteSymlinkFile('./foo') + + def test_symlink_file(self): + source = self.tmppath('test_path') + with open(source, 'wt') as fh: + fh.write('Hello world') + + s = AbsoluteSymlinkFile(source) + dest = self.tmppath('symlink') + self.assertTrue(s.copy(dest)) + + if self.symlink_supported: + self.assertTrue(os.path.islink(dest)) + link = os.readlink(dest) + self.assertEqual(link, source) + else: + self.assertTrue(os.path.isfile(dest)) + content = open(dest).read() + self.assertEqual(content, 'Hello world') + + def test_replace_file_with_symlink(self): + # If symlinks are supported, an existing file should be replaced by a + # symlink. + source = self.tmppath('test_path') + with open(source, 'wt') as fh: + fh.write('source') + + dest = self.tmppath('dest') + with open(dest, 'a'): + pass + + s = AbsoluteSymlinkFile(source) + s.copy(dest, skip_if_older=False) + + if self.symlink_supported: + self.assertTrue(os.path.islink(dest)) + link = os.readlink(dest) + self.assertEqual(link, source) + else: + self.assertTrue(os.path.isfile(dest)) + content = open(dest).read() + self.assertEqual(content, 'source') + + def test_replace_symlink(self): + if not self.symlink_supported: + return + + source = self.tmppath('source') + with open(source, 'a'): + pass + + dest = self.tmppath('dest') + + os.symlink(self.tmppath('bad'), dest) + self.assertTrue(os.path.islink(dest)) + + s = AbsoluteSymlinkFile(source) + self.assertTrue(s.copy(dest)) + + self.assertTrue(os.path.islink(dest)) + link = os.readlink(dest) + self.assertEqual(link, source) + + def test_noop(self): + if not hasattr(os, 'symlink'): + return + + source = self.tmppath('source') + dest = self.tmppath('dest') + + with open(source, 'a'): + pass + + os.symlink(source, dest) + link = os.readlink(dest) + self.assertEqual(link, source) + + s = AbsoluteSymlinkFile(source) + self.assertFalse(s.copy(dest)) + + link = os.readlink(dest) + self.assertEqual(link, source) + +class TestPreprocessedFile(TestWithTmpDir): + def test_preprocess(self): + ''' + Test that copying the file invokes the preprocessor + ''' + src = self.tmppath('src') + dest = self.tmppath('dest') + + with open(src, 'wb') as tmp: + tmp.write('#ifdef FOO\ntest\n#endif') + + f = PreprocessedFile(src, depfile_path=None, marker='#', defines={'FOO': True}) + self.assertTrue(f.copy(dest)) + + self.assertEqual('test\n', open(dest, 'rb').read()) + + def test_preprocess_file_no_write(self): + ''' + Test various conditions where PreprocessedFile.copy is expected not to + write in the destination file. + ''' + src = self.tmppath('src') + dest = self.tmppath('dest') + depfile = self.tmppath('depfile') + + with open(src, 'wb') as tmp: + tmp.write('#ifdef FOO\ntest\n#endif') + + # Initial copy + f = PreprocessedFile(src, depfile_path=depfile, marker='#', defines={'FOO': True}) + self.assertTrue(f.copy(dest)) + + # Ensure subsequent copies won't trigger writes + self.assertFalse(f.copy(DestNoWrite(dest))) + self.assertEqual('test\n', open(dest, 'rb').read()) + + # When the source file is older than the destination file, even with + # different content, no copy should occur. + with open(src, 'wb') as tmp: + tmp.write('#ifdef FOO\nfooo\n#endif') + time = os.path.getmtime(dest) - 1 + os.utime(src, (time, time)) + self.assertFalse(f.copy(DestNoWrite(dest))) + self.assertEqual('test\n', open(dest, 'rb').read()) + + # skip_if_older=False is expected to force a copy in this situation. + self.assertTrue(f.copy(dest, skip_if_older=False)) + self.assertEqual('fooo\n', open(dest, 'rb').read()) + + def test_preprocess_file_dependencies(self): + ''' + Test that the preprocess runs if the dependencies of the source change + ''' + src = self.tmppath('src') + dest = self.tmppath('dest') + incl = self.tmppath('incl') + deps = self.tmppath('src.pp') + + with open(src, 'wb') as tmp: + tmp.write('#ifdef FOO\ntest\n#endif') + + with open(incl, 'wb') as tmp: + tmp.write('foo bar') + + # Initial copy + f = PreprocessedFile(src, depfile_path=deps, marker='#', defines={'FOO': True}) + self.assertTrue(f.copy(dest)) + + # Update the source so it #includes the include file. + with open(src, 'wb') as tmp: + tmp.write('#include incl\n') + time = os.path.getmtime(dest) + 1 + os.utime(src, (time, time)) + self.assertTrue(f.copy(dest)) + self.assertEqual('foo bar', open(dest, 'rb').read()) + + # If one of the dependencies changes, the file should be updated. The + # mtime of the dependency is set after the destination file, to avoid + # both files having the same time. + with open(incl, 'wb') as tmp: + tmp.write('quux') + time = os.path.getmtime(dest) + 1 + os.utime(incl, (time, time)) + self.assertTrue(f.copy(dest)) + self.assertEqual('quux', open(dest, 'rb').read()) + + # Perform one final copy to confirm that we don't run the preprocessor + # again. We update the mtime of the destination so it's newer than the + # input files. This would "just work" if we weren't changing + time = os.path.getmtime(incl) + 1 + os.utime(dest, (time, time)) + self.assertFalse(f.copy(DestNoWrite(dest))) + + def test_replace_symlink(self): + ''' + Test that if the destination exists, and is a symlink, the target of + the symlink is not overwritten by the preprocessor output. + ''' + if not self.symlink_supported: + return + + source = self.tmppath('source') + dest = self.tmppath('dest') + pp_source = self.tmppath('pp_in') + deps = self.tmppath('deps') + + with open(source, 'a'): + pass + + os.symlink(source, dest) + self.assertTrue(os.path.islink(dest)) + + with open(pp_source, 'wb') as tmp: + tmp.write('#define FOO\nPREPROCESSED') + + f = PreprocessedFile(pp_source, depfile_path=deps, marker='#', + defines={'FOO': True}) + self.assertTrue(f.copy(dest)) + + self.assertEqual('PREPROCESSED', open(dest, 'rb').read()) + self.assertFalse(os.path.islink(dest)) + self.assertEqual('', open(source, 'rb').read()) + +class TestExistingFile(TestWithTmpDir): + def test_required_missing_dest(self): + with self.assertRaisesRegexp(ErrorMessage, 'Required existing file'): + f = ExistingFile(required=True) + f.copy(self.tmppath('dest')) + + def test_required_existing_dest(self): + p = self.tmppath('dest') + with open(p, 'a'): + pass + + f = ExistingFile(required=True) + f.copy(p) + + def test_optional_missing_dest(self): + f = ExistingFile(required=False) + f.copy(self.tmppath('dest')) + + def test_optional_existing_dest(self): + p = self.tmppath('dest') + with open(p, 'a'): + pass + + f = ExistingFile(required=False) + f.copy(p) + + +class TestGeneratedFile(TestWithTmpDir): + def test_generated_file(self): + ''' + Check that GeneratedFile.copy yields the proper content in the + destination file in all situations that trigger different code paths + (see TestFile.test_file) + ''' + dest = self.tmppath('dest') + + for content in samples: + f = GeneratedFile(content) + f.copy(dest) + self.assertEqual(content, open(dest, 'rb').read()) + + def test_generated_file_open(self): + ''' + Test whether GeneratedFile.open returns an appropriately reset file + object. + ''' + content = ''.join(samples) + f = GeneratedFile(content) + self.assertEqual(content[:42], f.open().read(42)) + self.assertEqual(content, f.open().read()) + + def test_generated_file_no_write(self): + ''' + Test various conditions where GeneratedFile.copy is expected not to + write in the destination file. + ''' + dest = self.tmppath('dest') + + # Initial copy + f = GeneratedFile('test') + f.copy(dest) + + # Ensure subsequent copies won't trigger writes + f.copy(DestNoWrite(dest)) + self.assertEqual('test', open(dest, 'rb').read()) + + # When using a new instance with the same content, no copy should occur + f = GeneratedFile('test') + f.copy(DestNoWrite(dest)) + self.assertEqual('test', open(dest, 'rb').read()) + + # Double check that under conditions where a copy occurs, we would get + # an exception. + f = GeneratedFile('fooo') + self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest)) + + +class TestDeflatedFile(TestWithTmpDir): + def test_deflated_file(self): + ''' + Check that DeflatedFile.copy yields the proper content in the + destination file in all situations that trigger different code paths + (see TestFile.test_file) + ''' + src = self.tmppath('src.jar') + dest = self.tmppath('dest') + + contents = {} + with JarWriter(src) as jar: + for content in samples: + name = ''.join(random.choice(string.letters) + for i in xrange(8)) + jar.add(name, content, compress=True) + contents[name] = content + + for j in JarReader(src): + f = DeflatedFile(j) + f.copy(dest) + self.assertEqual(contents[j.filename], open(dest, 'rb').read()) + + def test_deflated_file_open(self): + ''' + Test whether DeflatedFile.open returns an appropriately reset file + object. + ''' + src = self.tmppath('src.jar') + content = ''.join(samples) + with JarWriter(src) as jar: + jar.add('content', content) + + f = DeflatedFile(JarReader(src)['content']) + self.assertEqual(content[:42], f.open().read(42)) + self.assertEqual(content, f.open().read()) + + def test_deflated_file_no_write(self): + ''' + Test various conditions where DeflatedFile.copy is expected not to + write in the destination file. + ''' + src = self.tmppath('src.jar') + dest = self.tmppath('dest') + + with JarWriter(src) as jar: + jar.add('test', 'test') + jar.add('test2', 'test') + jar.add('fooo', 'fooo') + + jar = JarReader(src) + # Initial copy + f = DeflatedFile(jar['test']) + f.copy(dest) + + # Ensure subsequent copies won't trigger writes + f.copy(DestNoWrite(dest)) + self.assertEqual('test', open(dest, 'rb').read()) + + # When using a different file with the same content, no copy should + # occur + f = DeflatedFile(jar['test2']) + f.copy(DestNoWrite(dest)) + self.assertEqual('test', open(dest, 'rb').read()) + + # Double check that under conditions where a copy occurs, we would get + # an exception. + f = DeflatedFile(jar['fooo']) + self.assertRaises(RuntimeError, f.copy, DestNoWrite(dest)) + + +class TestManifestFile(TestWithTmpDir): + def test_manifest_file(self): + f = ManifestFile('chrome') + f.add(ManifestContent('chrome', 'global', 'toolkit/content/global/')) + f.add(ManifestResource('chrome', 'gre-resources', 'toolkit/res/')) + f.add(ManifestResource('chrome/pdfjs', 'pdfjs', './')) + f.add(ManifestContent('chrome/pdfjs', 'pdfjs', 'pdfjs')) + f.add(ManifestLocale('chrome', 'browser', 'en-US', + 'en-US/locale/browser/')) + + f.copy(self.tmppath('chrome.manifest')) + self.assertEqual(open(self.tmppath('chrome.manifest')).readlines(), [ + 'content global toolkit/content/global/\n', + 'resource gre-resources toolkit/res/\n', + 'resource pdfjs pdfjs/\n', + 'content pdfjs pdfjs/pdfjs\n', + 'locale browser en-US en-US/locale/browser/\n', + ]) + + self.assertRaises( + ValueError, + f.remove, + ManifestContent('', 'global', 'toolkit/content/global/') + ) + self.assertRaises( + ValueError, + f.remove, + ManifestOverride('chrome', 'chrome://global/locale/netError.dtd', + 'chrome://browser/locale/netError.dtd') + ) + + f.remove(ManifestContent('chrome', 'global', + 'toolkit/content/global/')) + self.assertRaises( + ValueError, + f.remove, + ManifestContent('chrome', 'global', 'toolkit/content/global/') + ) + + f.copy(self.tmppath('chrome.manifest')) + content = open(self.tmppath('chrome.manifest')).read() + self.assertEqual(content[:42], f.open().read(42)) + self.assertEqual(content, f.open().read()) + +# Compiled typelib for the following IDL: +# interface foo; +# [scriptable, uuid(5f70da76-519c-4858-b71e-e3c92333e2d6)] +# interface bar { +# void bar(in foo f); +# }; +# We need to make this [scriptable] so it doesn't get deleted from the +# typelib. We don't need to make the foo interfaces below [scriptable], +# because they will be automatically included by virtue of being an +# argument to a method of |bar|. +bar_xpt = GeneratedFile( + b'\x58\x50\x43\x4F\x4D\x0A\x54\x79\x70\x65\x4C\x69\x62\x0D\x0A\x1A' + + b'\x01\x02\x00\x02\x00\x00\x00\x7B\x00\x00\x00\x24\x00\x00\x00\x5C' + + b'\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + + b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x5F' + + b'\x70\xDA\x76\x51\x9C\x48\x58\xB7\x1E\xE3\xC9\x23\x33\xE2\xD6\x00' + + b'\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x0D\x00\x66\x6F\x6F\x00' + + b'\x62\x61\x72\x00\x62\x61\x72\x00\x00\x00\x00\x01\x00\x00\x00\x00' + + b'\x09\x01\x80\x92\x00\x01\x80\x06\x00\x00\x80' +) + +# Compiled typelib for the following IDL: +# [uuid(3271bebc-927e-4bef-935e-44e0aaf3c1e5)] +# interface foo { +# void foo(); +# }; +foo_xpt = GeneratedFile( + b'\x58\x50\x43\x4F\x4D\x0A\x54\x79\x70\x65\x4C\x69\x62\x0D\x0A\x1A' + + b'\x01\x02\x00\x01\x00\x00\x00\x57\x00\x00\x00\x24\x00\x00\x00\x40' + + b'\x80\x00\x00\x32\x71\xBE\xBC\x92\x7E\x4B\xEF\x93\x5E\x44\xE0\xAA' + + b'\xF3\xC1\xE5\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x09\x00' + + b'\x66\x6F\x6F\x00\x66\x6F\x6F\x00\x00\x00\x00\x01\x00\x00\x00\x00' + + b'\x05\x00\x80\x06\x00\x00\x00' +) + +# Compiled typelib for the following IDL: +# [uuid(7057f2aa-fdc2-4559-abde-08d939f7e80d)] +# interface foo { +# void foo(); +# }; +foo2_xpt = GeneratedFile( + b'\x58\x50\x43\x4F\x4D\x0A\x54\x79\x70\x65\x4C\x69\x62\x0D\x0A\x1A' + + b'\x01\x02\x00\x01\x00\x00\x00\x57\x00\x00\x00\x24\x00\x00\x00\x40' + + b'\x80\x00\x00\x70\x57\xF2\xAA\xFD\xC2\x45\x59\xAB\xDE\x08\xD9\x39' + + b'\xF7\xE8\x0D\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x09\x00' + + b'\x66\x6F\x6F\x00\x66\x6F\x6F\x00\x00\x00\x00\x01\x00\x00\x00\x00' + + b'\x05\x00\x80\x06\x00\x00\x00' +) + + +def read_interfaces(file): + return dict((i.name, i) for i in Typelib.read(file).interfaces) + + +class TestXPTFile(TestWithTmpDir): + def test_xpt_file(self): + x = XPTFile() + x.add(foo_xpt) + x.add(bar_xpt) + x.copy(self.tmppath('interfaces.xpt')) + + foo = read_interfaces(foo_xpt.open()) + foo2 = read_interfaces(foo2_xpt.open()) + bar = read_interfaces(bar_xpt.open()) + linked = read_interfaces(self.tmppath('interfaces.xpt')) + self.assertEqual(foo['foo'], linked['foo']) + self.assertEqual(bar['bar'], linked['bar']) + + x.remove(foo_xpt) + x.copy(self.tmppath('interfaces2.xpt')) + linked = read_interfaces(self.tmppath('interfaces2.xpt')) + self.assertEqual(bar['foo'], linked['foo']) + self.assertEqual(bar['bar'], linked['bar']) + + x.add(foo_xpt) + x.copy(DestNoWrite(self.tmppath('interfaces.xpt'))) + linked = read_interfaces(self.tmppath('interfaces.xpt')) + self.assertEqual(foo['foo'], linked['foo']) + self.assertEqual(bar['bar'], linked['bar']) + + x = XPTFile() + x.add(foo2_xpt) + x.add(bar_xpt) + x.copy(self.tmppath('interfaces.xpt')) + linked = read_interfaces(self.tmppath('interfaces.xpt')) + self.assertEqual(foo2['foo'], linked['foo']) + self.assertEqual(bar['bar'], linked['bar']) + + x = XPTFile() + x.add(foo_xpt) + x.add(foo2_xpt) + x.add(bar_xpt) + from xpt import DataError + self.assertRaises(DataError, x.copy, self.tmppath('interfaces.xpt')) + + +class TestMinifiedProperties(TestWithTmpDir): + def test_minified_properties(self): + propLines = [ + '# Comments are removed', + 'foo = bar', + '', + '# Another comment', + ] + prop = GeneratedFile('\n'.join(propLines)) + self.assertEqual(MinifiedProperties(prop).open().readlines(), + ['foo = bar\n', '\n']) + open(self.tmppath('prop'), 'wb').write('\n'.join(propLines)) + MinifiedProperties(File(self.tmppath('prop'))) \ + .copy(self.tmppath('prop2')) + self.assertEqual(open(self.tmppath('prop2')).readlines(), + ['foo = bar\n', '\n']) + + +class TestMinifiedJavaScript(TestWithTmpDir): + orig_lines = [ + '// Comment line', + 'let foo = "bar";', + 'var bar = true;', + '', + '// Another comment', + ] + + def test_minified_javascript(self): + orig_f = GeneratedFile('\n'.join(self.orig_lines)) + min_f = MinifiedJavaScript(orig_f) + + mini_lines = min_f.open().readlines() + self.assertTrue(mini_lines) + self.assertTrue(len(mini_lines) < len(self.orig_lines)) + + def _verify_command(self, code): + our_dir = os.path.abspath(os.path.dirname(__file__)) + return [ + sys.executable, + os.path.join(our_dir, 'support', 'minify_js_verify.py'), + code, + ] + + def test_minified_verify_success(self): + orig_f = GeneratedFile('\n'.join(self.orig_lines)) + min_f = MinifiedJavaScript(orig_f, + verify_command=self._verify_command('0')) + + mini_lines = min_f.open().readlines() + self.assertTrue(mini_lines) + self.assertTrue(len(mini_lines) < len(self.orig_lines)) + + def test_minified_verify_failure(self): + orig_f = GeneratedFile('\n'.join(self.orig_lines)) + errors.out = StringIO() + min_f = MinifiedJavaScript(orig_f, + verify_command=self._verify_command('1')) + + mini_lines = min_f.open().readlines() + output = errors.out.getvalue() + errors.out = sys.stderr + self.assertEqual(output, + 'Warning: JS minification verification failed for <unknown>:\n' + 'Warning: Error message\n') + self.assertEqual(mini_lines, orig_f.open().readlines()) + + +class MatchTestTemplate(object): + def prepare_match_test(self, with_dotfiles=False): + self.add('bar') + self.add('foo/bar') + self.add('foo/baz') + self.add('foo/qux/1') + self.add('foo/qux/bar') + self.add('foo/qux/2/test') + self.add('foo/qux/2/test2') + if with_dotfiles: + self.add('foo/.foo') + self.add('foo/.bar/foo') + + def do_match_test(self): + self.do_check('', [ + 'bar', 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', + 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('*', [ + 'bar', 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', + 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('foo/qux', [ + 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('foo/b*', ['foo/bar', 'foo/baz']) + self.do_check('baz', []) + self.do_check('foo/foo', []) + self.do_check('foo/*ar', ['foo/bar']) + self.do_check('*ar', ['bar']) + self.do_check('*/bar', ['foo/bar']) + self.do_check('foo/*ux', [ + 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('foo/q*ux', [ + 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('foo/*/2/test*', ['foo/qux/2/test', 'foo/qux/2/test2']) + self.do_check('**/bar', ['bar', 'foo/bar', 'foo/qux/bar']) + self.do_check('foo/**/test', ['foo/qux/2/test']) + self.do_check('foo', [ + 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', + 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('foo/**', [ + 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', + 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('**/2/test*', ['foo/qux/2/test', 'foo/qux/2/test2']) + self.do_check('**/foo', [ + 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', + 'foo/qux/2/test', 'foo/qux/2/test2' + ]) + self.do_check('**/barbaz', []) + self.do_check('f**/bar', ['foo/bar']) + + def do_finder_test(self, finder): + self.assertTrue(finder.contains('foo/.foo')) + self.assertTrue(finder.contains('foo/.bar')) + self.assertTrue('foo/.foo' in [f for f, c in + finder.find('foo/.foo')]) + self.assertTrue('foo/.bar/foo' in [f for f, c in + finder.find('foo/.bar')]) + self.assertEqual(sorted([f for f, c in finder.find('foo/.*')]), + ['foo/.bar/foo', 'foo/.foo']) + for pattern in ['foo', '**', '**/*', '**/foo', 'foo/*']: + self.assertFalse('foo/.foo' in [f for f, c in + finder.find(pattern)]) + self.assertFalse('foo/.bar/foo' in [f for f, c in + finder.find(pattern)]) + self.assertEqual(sorted([f for f, c in finder.find(pattern)]), + sorted([f for f, c in finder + if mozpath.match(f, pattern)])) + + +def do_check(test, finder, pattern, result): + if result: + test.assertTrue(finder.contains(pattern)) + else: + test.assertFalse(finder.contains(pattern)) + test.assertEqual(sorted(list(f for f, c in finder.find(pattern))), + sorted(result)) + + +class TestFileFinder(MatchTestTemplate, TestWithTmpDir): + def add(self, path): + ensureParentDir(self.tmppath(path)) + open(self.tmppath(path), 'wb').write(path) + + def do_check(self, pattern, result): + do_check(self, self.finder, pattern, result) + + def test_file_finder(self): + self.prepare_match_test(with_dotfiles=True) + self.finder = FileFinder(self.tmpdir) + self.do_match_test() + self.do_finder_test(self.finder) + + def test_get(self): + self.prepare_match_test() + finder = FileFinder(self.tmpdir) + + self.assertIsNone(finder.get('does-not-exist')) + res = finder.get('bar') + self.assertIsInstance(res, File) + self.assertEqual(mozpath.normpath(res.path), + mozpath.join(self.tmpdir, 'bar')) + + def test_ignored_dirs(self): + """Ignored directories should not have results returned.""" + self.prepare_match_test() + self.add('fooz') + + # Present to ensure prefix matching doesn't exclude. + self.add('foo/quxz') + + self.finder = FileFinder(self.tmpdir, ignore=['foo/qux']) + + self.do_check('**', ['bar', 'foo/bar', 'foo/baz', 'foo/quxz', 'fooz']) + self.do_check('foo/*', ['foo/bar', 'foo/baz', 'foo/quxz']) + self.do_check('foo/**', ['foo/bar', 'foo/baz', 'foo/quxz']) + self.do_check('foo/qux/**', []) + self.do_check('foo/qux/*', []) + self.do_check('foo/qux/bar', []) + self.do_check('foo/quxz', ['foo/quxz']) + self.do_check('fooz', ['fooz']) + + def test_ignored_files(self): + """Ignored files should not have results returned.""" + self.prepare_match_test() + + # Be sure prefix match doesn't get ignored. + self.add('barz') + + self.finder = FileFinder(self.tmpdir, ignore=['foo/bar', 'bar']) + self.do_check('**', ['barz', 'foo/baz', 'foo/qux/1', 'foo/qux/2/test', + 'foo/qux/2/test2', 'foo/qux/bar']) + self.do_check('foo/**', ['foo/baz', 'foo/qux/1', 'foo/qux/2/test', + 'foo/qux/2/test2', 'foo/qux/bar']) + + def test_ignored_patterns(self): + """Ignore entries with patterns should be honored.""" + self.prepare_match_test() + + self.add('foo/quxz') + + self.finder = FileFinder(self.tmpdir, ignore=['foo/qux/*']) + self.do_check('**', ['foo/bar', 'foo/baz', 'foo/quxz', 'bar']) + self.do_check('foo/**', ['foo/bar', 'foo/baz', 'foo/quxz']) + + def test_dotfiles(self): + """Finder can find files beginning with . is configured.""" + self.prepare_match_test(with_dotfiles=True) + self.finder = FileFinder(self.tmpdir, find_dotfiles=True) + self.do_check('**', ['bar', 'foo/.foo', 'foo/.bar/foo', + 'foo/bar', 'foo/baz', 'foo/qux/1', 'foo/qux/bar', + 'foo/qux/2/test', 'foo/qux/2/test2']) + + def test_dotfiles_plus_ignore(self): + self.prepare_match_test(with_dotfiles=True) + self.finder = FileFinder(self.tmpdir, find_dotfiles=True, + ignore=['foo/.bar/**']) + self.do_check('foo/**', ['foo/.foo', 'foo/bar', 'foo/baz', + 'foo/qux/1', 'foo/qux/bar', 'foo/qux/2/test', 'foo/qux/2/test2']) + + +class TestJarFinder(MatchTestTemplate, TestWithTmpDir): + def add(self, path): + self.jar.add(path, path, compress=True) + + def do_check(self, pattern, result): + do_check(self, self.finder, pattern, result) + + def test_jar_finder(self): + self.jar = JarWriter(file=self.tmppath('test.jar')) + self.prepare_match_test() + self.jar.finish() + reader = JarReader(file=self.tmppath('test.jar')) + self.finder = JarFinder(self.tmppath('test.jar'), reader) + self.do_match_test() + + self.assertIsNone(self.finder.get('does-not-exist')) + self.assertIsInstance(self.finder.get('bar'), DeflatedFile) + +class TestTarFinder(MatchTestTemplate, TestWithTmpDir): + def add(self, path): + self.tar.addfile(tarfile.TarInfo(name=path)) + + def do_check(self, pattern, result): + do_check(self, self.finder, pattern, result) + + def test_tar_finder(self): + self.tar = tarfile.open(name=self.tmppath('test.tar.bz2'), + mode='w:bz2') + self.prepare_match_test() + self.tar.close() + with tarfile.open(name=self.tmppath('test.tar.bz2'), + mode='r:bz2') as tarreader: + self.finder = TarFinder(self.tmppath('test.tar.bz2'), tarreader) + self.do_match_test() + + self.assertIsNone(self.finder.get('does-not-exist')) + self.assertIsInstance(self.finder.get('bar'), ExtractedTarFile) + + +class TestComposedFinder(MatchTestTemplate, TestWithTmpDir): + def add(self, path, content=None): + # Put foo/qux files under $tmp/b. + if path.startswith('foo/qux/'): + real_path = mozpath.join('b', path[8:]) + else: + real_path = mozpath.join('a', path) + ensureParentDir(self.tmppath(real_path)) + if not content: + content = path + open(self.tmppath(real_path), 'wb').write(content) + + def do_check(self, pattern, result): + if '*' in pattern: + return + do_check(self, self.finder, pattern, result) + + def test_composed_finder(self): + self.prepare_match_test() + # Also add files in $tmp/a/foo/qux because ComposedFinder is + # expected to mask foo/qux entirely with content from $tmp/b. + ensureParentDir(self.tmppath('a/foo/qux/hoge')) + open(self.tmppath('a/foo/qux/hoge'), 'wb').write('hoge') + open(self.tmppath('a/foo/qux/bar'), 'wb').write('not the right content') + self.finder = ComposedFinder({ + '': FileFinder(self.tmppath('a')), + 'foo/qux': FileFinder(self.tmppath('b')), + }) + self.do_match_test() + + self.assertIsNone(self.finder.get('does-not-exist')) + self.assertIsInstance(self.finder.get('bar'), File) + + +@unittest.skipUnless(hglib, 'hglib not available') +class TestMercurialRevisionFinder(MatchTestTemplate, TestWithTmpDir): + def setUp(self): + super(TestMercurialRevisionFinder, self).setUp() + hglib.init(self.tmpdir) + + def add(self, path): + c = hglib.open(self.tmpdir) + ensureParentDir(self.tmppath(path)) + with open(self.tmppath(path), 'wb') as fh: + fh.write(path) + c.add(self.tmppath(path)) + + def do_check(self, pattern, result): + do_check(self, self.finder, pattern, result) + + def _get_finder(self, *args, **kwargs): + return MercurialRevisionFinder(*args, **kwargs) + + def test_default_revision(self): + self.prepare_match_test() + c = hglib.open(self.tmpdir) + c.commit('initial commit') + self.finder = self._get_finder(self.tmpdir) + self.do_match_test() + + self.assertIsNone(self.finder.get('does-not-exist')) + self.assertIsInstance(self.finder.get('bar'), MercurialFile) + + def test_old_revision(self): + c = hglib.open(self.tmpdir) + with open(self.tmppath('foo'), 'wb') as fh: + fh.write('foo initial') + c.add(self.tmppath('foo')) + c.commit('initial') + + with open(self.tmppath('foo'), 'wb') as fh: + fh.write('foo second') + with open(self.tmppath('bar'), 'wb') as fh: + fh.write('bar second') + c.add(self.tmppath('bar')) + c.commit('second') + # This wipes out the working directory, ensuring the finder isn't + # finding anything from the filesystem. + c.rawcommand(['update', 'null']) + + finder = self._get_finder(self.tmpdir, 0) + f = finder.get('foo') + self.assertEqual(f.read(), 'foo initial') + self.assertEqual(f.read(), 'foo initial', 'read again for good measure') + self.assertIsNone(finder.get('bar')) + + finder = MercurialRevisionFinder(self.tmpdir, rev=1) + f = finder.get('foo') + self.assertEqual(f.read(), 'foo second') + f = finder.get('bar') + self.assertEqual(f.read(), 'bar second') + + def test_recognize_repo_paths(self): + c = hglib.open(self.tmpdir) + with open(self.tmppath('foo'), 'wb') as fh: + fh.write('initial') + c.add(self.tmppath('foo')) + c.commit('initial') + c.rawcommand(['update', 'null']) + + finder = self._get_finder(self.tmpdir, 0, + recognize_repo_paths=True) + with self.assertRaises(NotImplementedError): + list(finder.find('')) + + with self.assertRaises(ValueError): + finder.get('foo') + with self.assertRaises(ValueError): + finder.get('') + + f = finder.get(self.tmppath('foo')) + self.assertIsInstance(f, MercurialFile) + self.assertEqual(f.read(), 'initial') + + +@unittest.skipUnless(MercurialNativeRevisionFinder, 'hgnative not available') +class TestMercurialNativeRevisionFinder(TestMercurialRevisionFinder): + def _get_finder(self, *args, **kwargs): + return MercurialNativeRevisionFinder(*args, **kwargs) + + +if __name__ == '__main__': + mozunit.main() |