diff options
Diffstat (limited to 'testing/web-platform/harness/wptrunner/testloader.py')
-rw-r--r-- | testing/web-platform/harness/wptrunner/testloader.py | 637 |
1 file changed, 637 insertions, 0 deletions
import hashlib
import json
import os
import urlparse
from abc import ABCMeta, abstractmethod
from Queue import Empty
from collections import defaultdict, OrderedDict, deque
from multiprocessing import Queue

import manifestinclude
import manifestexpected
import wpttest
from mozlog import structured

# Populated lazily by do_delayed_imports(); the "manifest" package is only
# importable once the embedder has set up sys.path.
manifest = None
manifest_update = None

def do_delayed_imports():
    # This relies on an already loaded module having set the sys.path correctly :(
    global manifest, manifest_update
    from manifest import manifest
    from manifest import update as manifest_update

class TestChunker(object):
    """Base class for strategies that select one chunk of a test manifest.

    :param total_chunks: total number of chunks the manifest is split into
    :param chunk_number: 1-based index of the chunk this instance yields
    """
    def __init__(self, total_chunks, chunk_number):
        self.total_chunks = total_chunks
        self.chunk_number = chunk_number
        # Reject 0 or negative chunk numbers as well as overflow; the
        # original check only caught chunk_number > total_chunks, so a
        # chunk_number of 0 would silently select no tests.
        assert 0 < self.chunk_number <= self.total_chunks
        self.logger = structured.get_default_logger()

    def __call__(self, manifest):
        raise NotImplementedError


class Unchunked(TestChunker):
    """Trivial chunker used when the manifest is not split at all."""
    def __init__(self, *args, **kwargs):
        TestChunker.__init__(self, *args, **kwargs)
        assert self.total_chunks == 1

    def __call__(self, manifest):
        for item in manifest:
            yield item


class HashChunker(TestChunker):
    """Assign each test path to a chunk by hashing the path.

    Produces chunks of roughly equal size but takes no account of
    expected runtime.
    """
    def __call__(self, manifest):
        chunk_index = self.chunk_number - 1
        for test_path, tests in manifest:
            h = int(hashlib.md5(test_path).hexdigest(), 16)
            if h % self.total_chunks == chunk_index:
                yield test_path, tests


class DirectoryHashChunker(TestChunker):
    """Like HashChunker except the directory is hashed.

    This ensures that all tests in the same directory end up in the same
    chunk.
    """
    def __call__(self, manifest):
        chunk_index = self.chunk_number - 1
        for test_path, tests in manifest:
            h = int(hashlib.md5(os.path.dirname(test_path)).hexdigest(), 16)
            if h % self.total_chunks == chunk_index:
                yield test_path, tests


class EqualTimeChunker(TestChunker):
    """Chunker that attempts to make the estimated runtime of each chunk equal,
    while keeping tests that share a subdirectory (to depth 3) in the same chunk."""

    def _group_by_directory(self, manifest_items):
        """Split the list of manifest items into a ordered dict that groups tests in
        so that anything in the same subdirectory beyond a depth of 3 is in the same
        group. So all tests in a/b/c, a/b/c/d and a/b/c/e will be grouped together
        and separate to tests in a/b/f

        Returns: tuple (ordered dict of {test_dir: PathData}, total estimated runtime)
        """

        class PathData(object):
            def __init__(self, path):
                self.path = path
                self.time = 0
                self.tests = []

        by_dir = OrderedDict()
        total_time = 0

        for test_path, tests in manifest_items:
            test_dir = tuple(os.path.split(test_path)[0].split(os.path.sep)[:3])

            if test_dir not in by_dir:
                by_dir[test_dir] = PathData(test_dir)

            data = by_dir[test_dir]
            # Estimate runtime from the per-test timeout class
            time = sum(wpttest.DEFAULT_TIMEOUT if test.timeout !=
                       "long" else wpttest.LONG_TIMEOUT for test in tests)
            data.time += time
            total_time += time
            data.tests.append((test_path, tests))

        return by_dir, total_time

    def _maybe_remove(self, chunks, i, direction):
        """Trial removing a chunk from one chunk to an adjacent one.

        :param chunks: - the list of all chunks
        :param i: - the chunk index in the list of chunks to try removing from
        :param direction: either "next" if we are going to move from the end to
                          the subsequent chunk, or "prev" if we are going to move
                          from the start into the previous chunk.

        :returns bool: Did a chunk get moved?"""
        source_chunk = chunks[i]
        if direction == "next":
            target_chunk = chunks[i+1]
            path_index = -1
            move_func = lambda: target_chunk.appendleft(source_chunk.pop())
        elif direction == "prev":
            target_chunk = chunks[i-1]
            path_index = 0
            move_func = lambda: target_chunk.append(source_chunk.popleft())
        else:
            raise ValueError("Unexpected move direction %s" % direction)

        return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

    def _maybe_add(self, chunks, i, direction):
        """Trial adding a chunk from one chunk to an adjacent one.

        :param chunks: - the list of all chunks
        :param i: - the chunk index in the list of chunks to try adding to
        :param direction: either "next" if we are going to remove from the
                          the subsequent chunk, or "prev" if we are going to remove
                          from the the previous chunk.

        :returns bool: Did a chunk get moved?"""
        target_chunk = chunks[i]
        if direction == "next":
            source_chunk = chunks[i+1]
            path_index = 0
            move_func = lambda: target_chunk.append(source_chunk.popleft())
        elif direction == "prev":
            source_chunk = chunks[i-1]
            path_index = -1
            move_func = lambda: target_chunk.appendleft(source_chunk.pop())
        else:
            raise ValueError("Unexpected move direction %s" % direction)

        return self._maybe_move(source_chunk, target_chunk, path_index, move_func)

    def _maybe_move(self, source_chunk, target_chunk, path_index, move_func):
        """Move from one chunk to another, assess the change in badness,
        and keep the move iff it decreases the badness score.

        :param source_chunk: chunk to move from
        :param target_chunk: chunk to move to
        :param path_index: 0 if we are moving from the start or -1 if we are moving from the
                           end
        :param move_func: Function that actually moves between chunks"""
        if len(source_chunk.paths) <= 1:
            # Never empty a chunk completely
            return False

        move_time = source_chunk.paths[path_index].time

        new_source_badness = self._badness(source_chunk.time - move_time)
        new_target_badness = self._badness(target_chunk.time + move_time)

        delta_badness = ((new_source_badness + new_target_badness) -
                         (source_chunk.badness + target_chunk.badness))
        if delta_badness < 0:
            move_func()
            return True

        return False

    def _badness(self, time):
        """Metric of badness for a specific chunk

        :param time: the time for a specific chunk"""
        # Squared deviation from the ideal per-chunk runtime
        return (time - self.expected_time)**2

    def _get_chunk(self, manifest_items):
        """Compute the balanced chunking and return the tests for this chunk.

        :param manifest_items: list of (test_path, tests) manifest entries
        :raises ValueError: if there are fewer directory groups than chunks"""
        by_dir, total_time = self._group_by_directory(manifest_items)

        if len(by_dir) < self.total_chunks:
            raise ValueError("Tried to split into %i chunks, but only %i subdirectories included" % (
                self.total_chunks, len(by_dir)))

        self.expected_time = float(total_time) / self.total_chunks

        chunks = self._create_initial_chunks(by_dir)

        while True:
            # Move a test from one chunk to the next until doing so no longer
            # reduces the badness
            got_improvement = self._update_chunks(chunks)
            if not got_improvement:
                break

        self.logger.debug(self.expected_time)
        for i, chunk in chunks.iteritems():
            self.logger.debug("%i: %i, %i" % (i + 1, chunk.time, chunk.badness))

        # Sanity check: rebalancing must neither lose nor duplicate tests
        assert self._all_tests(by_dir) == self._chunked_tests(chunks)

        return self._get_tests(chunks)

    @staticmethod
    def _all_tests(by_dir):
        """Return a set of all tests in the manifest from a grouping by directory"""
        return set(x[0] for item in by_dir.itervalues()
                   for x in item.tests)

    @staticmethod
    def _chunked_tests(chunks):
        """Return a set of all tests in the manifest from the chunk list"""
        return set(x[0] for chunk in chunks.itervalues()
                   for path in chunk.paths
                   for x in path.tests)

    def _create_initial_chunks(self, by_dir):
        """Create an initial unbalanced list of chunks.

        :param by_dir: All tests in the manifest grouped by subdirectory
        :returns list: A list of Chunk objects"""

        class Chunk(object):
            def __init__(self, paths, index):
                """List of PathData objects that together form a single chunk of
                tests"""
                self.paths = deque(paths)
                self.time = sum(item.time for item in paths)
                self.index = index

            def appendleft(self, path):
                """Add a PathData object to the start of the chunk"""
                self.paths.appendleft(path)
                self.time += path.time

            def append(self, path):
                """Add a PathData object to the end of the chunk"""
                self.paths.append(path)
                self.time += path.time

            def pop(self):
                """Remove PathData object from the end of the chunk"""
                assert len(self.paths) > 1
                self.time -= self.paths[-1].time
                return self.paths.pop()

            def popleft(self):
                """Remove PathData object from the start of the chunk"""
                assert len(self.paths) > 1
                self.time -= self.paths[0].time
                return self.paths.popleft()

            @property
            def badness(self_):
                """Badness metric for this chunk"""
                # "self_" so the closure can reach the enclosing chunker's
                # _badness; the outer "self" is the EqualTimeChunker.
                return self._badness(self_.time)

        initial_size = len(by_dir) / self.total_chunks
        chunk_boundaries = [initial_size * i
                            for i in xrange(self.total_chunks)] + [len(by_dir)]

        chunks = OrderedDict()
        for i, lower in enumerate(chunk_boundaries[:-1]):
            upper = chunk_boundaries[i + 1]
            paths = by_dir.values()[lower:upper]
            chunks[i] = Chunk(paths, i)

        assert self._all_tests(by_dir) == self._chunked_tests(chunks)

        return chunks

    def _update_chunks(self, chunks):
        """Run a single iteration of the chunk update algorithm.

        :param chunks: - List of chunks
        """
        #TODO: consider replacing this with a heap
        sorted_chunks = sorted(chunks.values(), key=lambda x:-x.badness)
        got_improvement = False
        for chunk in sorted_chunks:
            if chunk.time < self.expected_time:
                f = self._maybe_add
            else:
                f = self._maybe_remove

            if chunk.index == 0:
                order = ["next"]
            elif chunk.index == self.total_chunks - 1:
                order = ["prev"]
            else:
                if chunk.time < self.expected_time:
                    # First try to add a test from the neighboring chunk with the
                    # greatest total time
                    if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
                        order = ["next", "prev"]
                    else:
                        order = ["prev", "next"]
                else:
                    # First try to remove a test and add to the neighboring chunk with the
                    # lowest total time
                    if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time:
                        order = ["prev", "next"]
                    else:
                        order = ["next", "prev"]

            for direction in order:
                if f(chunks, chunk.index, direction):
                    got_improvement = True
                    break

            if got_improvement:
                break

        return got_improvement

    def _get_tests(self, chunks):
        """Return the list of tests corresponding to the chunk number we are running.

        :param chunks: List of chunks"""
        tests = []
        for path in chunks[self.chunk_number - 1].paths:
            tests.extend(path.tests)

        return tests

    def __call__(self, manifest_iter):
        manifest = list(manifest_iter)
        tests = self._get_chunk(manifest)
        for item in tests:
            yield item


class TestFilter(object):
    """Filter manifest entries by an include/exclude manifest.

    :param test_manifests: dict of test manifests in use
    :param include: iterable of URL prefixes to include (overrides manifest_path)
    :param exclude: iterable of URL prefixes to exclude
    :param manifest_path: path to an include manifest file on disk"""
    def __init__(self, test_manifests, include=None, exclude=None, manifest_path=None):
        if manifest_path is not None and include is None:
            self.manifest = manifestinclude.get_manifest(manifest_path)
        else:
            self.manifest = manifestinclude.IncludeManifest.create()

        if include:
            # An explicit include list means everything else is skipped
            self.manifest.set("skip", "true")
            for item in include:
                self.manifest.add_include(test_manifests, item)

        if exclude:
            for item in exclude:
                self.manifest.add_exclude(test_manifests, item)

    def __call__(self, manifest_iter):
        for test_path, tests in manifest_iter:
            include_tests = set()
            for test in tests:
                if self.manifest.include(test):
                    include_tests.add(test)

            if include_tests:
                yield test_path, include_tests

class TagFilter(object):
    """Filter tests so that only those carrying at least one of the given tags pass."""
    def __init__(self, tags):
        self.tags = set(tags)

    def __call__(self, test_iter):
        for test in test_iter:
            if test.tags & self.tags:
                yield test

class ManifestLoader(object):
    """Load (and, if necessary, create or update) MANIFEST.json files.

    :param test_paths: dict mapping url_base to {tests_path, metadata_path}
    :param force_manifest_update: always regenerate the manifest from disk"""
    def __init__(self, test_paths, force_manifest_update=False):
        do_delayed_imports()
        self.test_paths = test_paths
        self.force_manifest_update = force_manifest_update
        self.logger = structured.get_default_logger()
        if self.logger is None:
            self.logger = structured.structuredlog.StructuredLogger("ManifestLoader")

    def load(self):
        """Return a dict mapping loaded manifest objects to their path data."""
        rv = {}
        for url_base, paths in self.test_paths.iteritems():
            manifest_file = self.load_manifest(url_base=url_base,
                                               **paths)
            path_data = {"url_base": url_base}
            path_data.update(paths)
            rv[manifest_file] = path_data
        return rv

    def create_manifest(self, manifest_path, tests_path, url_base="/"):
        self.update_manifest(manifest_path, tests_path, url_base, recreate=True)

    def update_manifest(self, manifest_path, tests_path, url_base="/",
                        recreate=False):
        """Regenerate the manifest at manifest_path from the tests on disk.

        :param recreate: ignore any existing manifest file and start fresh"""
        self.logger.info("Updating test manifest %s" % manifest_path)

        json_data = None
        if not recreate:
            try:
                with open(manifest_path) as f:
                    json_data = json.load(f)
            except IOError:
                #If the existing file doesn't exist just create one from scratch
                pass
            except ValueError:
                # A corrupt (invalid-JSON) manifest was previously a hard
                # crash; treat it like a missing file and recreate it.
                pass

        if not json_data:
            manifest_file = manifest.Manifest(None, url_base)
        else:
            try:
                manifest_file = manifest.Manifest.from_json(tests_path, json_data)
            except manifest.ManifestVersionMismatch:
                manifest_file = manifest.Manifest(None, url_base)

        manifest_update.update(tests_path, url_base, manifest_file)

        manifest.write(manifest_file, manifest_path)

    def load_manifest(self, tests_path, metadata_path, url_base="/"):
        """Load the manifest under metadata_path, updating it first if needed."""
        manifest_path = os.path.join(metadata_path, "MANIFEST.json")
        if (not os.path.exists(manifest_path) or
            self.force_manifest_update):
            self.update_manifest(manifest_path, tests_path, url_base)
        manifest_file = manifest.load(tests_path, manifest_path)
        if manifest_file.url_base != url_base:
            self.logger.info("Updating url_base in manifest from %s to %s" % (manifest_file.url_base,
                                                                              url_base))
            manifest_file.url_base = url_base
            manifest.write(manifest_file, manifest_path)

        return manifest_file

def iterfilter(filters, iter):
    """Apply each filter in turn to the iterable and yield the surviving items.

    NOTE: the second parameter shadows the builtin ``iter`` inside this
    function; kept for interface compatibility."""
    for f in filters:
        iter = f(iter)
    for item in iter:
        yield item

class TestLoader(object):
    """Load tests from manifests, applying filters, metadata and chunking.

    :param test_manifests: dict of manifest -> path data (from ManifestLoader.load)
    :param test_types: list of test types to load
    :param run_info: properties of the current test run
    :param manifest_filters: filters applied to (path, tests) manifest items
    :param meta_filters: filters applied to constructed test objects
    :param chunk_type: one of "none", "hash", "dir_hash", "equal_time"
    :param total_chunks: number of chunks the run is split into
    :param chunk_number: 1-based chunk to load
    :param include_https: whether to enable tests served over https"""
    def __init__(self,
                 test_manifests,
                 test_types,
                 run_info,
                 manifest_filters=None,
                 meta_filters=None,
                 chunk_type="none",
                 total_chunks=1,
                 chunk_number=1,
                 include_https=True):

        self.test_types = test_types
        self.run_info = run_info

        self.manifest_filters = manifest_filters if manifest_filters is not None else []
        self.meta_filters = meta_filters if meta_filters is not None else []

        self.manifests = test_manifests
        self.tests = None
        self.disabled_tests = None
        self.include_https = include_https

        self.chunk_type = chunk_type
        self.total_chunks = total_chunks
        self.chunk_number = chunk_number

        self.chunker = {"none": Unchunked,
                        "hash": HashChunker,
                        "dir_hash": DirectoryHashChunker,
                        "equal_time": EqualTimeChunker}[chunk_type](total_chunks,
                                                                    chunk_number)

        self._test_ids = None

        # Cache of per-directory __dir__.ini manifests keyed by path
        self.directory_manifests = {}

        self._load_tests()

    @property
    def test_ids(self):
        """Lazily computed list of ids for all loaded tests (disabled first)."""
        if self._test_ids is None:
            self._test_ids = []
            for test_dict in [self.disabled_tests, self.tests]:
                for test_type in self.test_types:
                    self._test_ids += [item.id for item in test_dict[test_type]]
        return self._test_ids

    def get_test(self, manifest_test, inherit_metadata, test_metadata):
        if test_metadata is not None:
            inherit_metadata.append(test_metadata)
            test_metadata = test_metadata.get_test(manifest_test.id)

        return wpttest.from_manifest(manifest_test, inherit_metadata, test_metadata)

    def load_dir_metadata(self, test_manifest, metadata_path, test_path):
        """Return the list of __dir__.ini manifests that apply to test_path,
        from the directory root downwards."""
        rv = []
        path_parts = os.path.dirname(test_path).split(os.path.sep)
        for i in xrange(1, len(path_parts) + 1):
            path = os.path.join(os.path.sep.join(path_parts[:i]), "__dir__.ini")
            if path not in self.directory_manifests:
                self.directory_manifests[path] = manifestexpected.get_dir_manifest(
                    metadata_path, path, self.run_info)
            # Renamed from "manifest" to avoid shadowing the module-level
            # "manifest" global used elsewhere in this file.
            dir_manifest = self.directory_manifests[path]
            if dir_manifest is not None:
                rv.append(dir_manifest)
        return rv

    def load_metadata(self, test_manifest, metadata_path, test_path):
        inherit_metadata = self.load_dir_metadata(test_manifest, metadata_path, test_path)
        test_metadata = manifestexpected.get_manifest(
            metadata_path, test_path, test_manifest.url_base, self.run_info)
        return inherit_metadata, test_metadata

    def iter_tests(self):
        """Yield (test_path, test_type, test) for every test after filtering and chunking."""
        manifest_items = []

        for manifest in sorted(self.manifests.keys(), key=lambda x:x.url_base):
            manifest_iter = iterfilter(self.manifest_filters,
                                       manifest.itertypes(*self.test_types))
            manifest_items.extend(manifest_iter)

        if self.chunker is not None:
            manifest_items = self.chunker(manifest_items)

        for test_path, tests in manifest_items:
            manifest_file = iter(tests).next().manifest
            metadata_path = self.manifests[manifest_file]["metadata_path"]
            inherit_metadata, test_metadata = self.load_metadata(manifest_file, metadata_path, test_path)

            for test in iterfilter(self.meta_filters,
                                   self.iter_wpttest(inherit_metadata, test_metadata, tests)):
                yield test_path, test.test_type, test

    def iter_wpttest(self, inherit_metadata, test_metadata, tests):
        for manifest_test in tests:
            yield self.get_test(manifest_test, inherit_metadata, test_metadata)

    def _load_tests(self):
        """Read in the tests from the manifest file and add them to a queue"""
        tests = {"enabled":defaultdict(list),
                 "disabled":defaultdict(list)}

        for test_path, test_type, test in self.iter_tests():
            enabled = not test.disabled()
            if not self.include_https and test.environment["protocol"] == "https":
                enabled = False
            key = "enabled" if enabled else "disabled"
            tests[key][test_type].append(test)

        self.tests = tests["enabled"]
        self.disabled_tests = tests["disabled"]

    def groups(self, test_types, chunk_type="none", total_chunks=1, chunk_number=1):
        """Return the set of top-level URL path components covering the loaded tests.

        NOTE: the chunk_* parameters are currently unused; kept for interface
        compatibility."""
        groups = set()

        for test_type in test_types:
            for test in self.tests[test_type]:
                group = test.url.split("/")[1]
                groups.add(group)

        return groups


class TestSource(object):
    """Abstract source of tests for a test runner."""
    __metaclass__ = ABCMeta

    @abstractmethod
    def queue_tests(self, test_queue):
        pass

    @abstractmethod
    def requeue_test(self, test):
        pass

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        pass


class SingleTestSource(TestSource):
    """Test source that hands out tests one at a time from a single queue."""
    def __init__(self, test_queue):
        self.test_queue = test_queue

    @classmethod
    def queue_tests(cls, test_queue, test_type, tests):
        for test in tests[test_type]:
            test_queue.put(test)

    def get_queue(self):
        if self.test_queue.empty():
            return None
        return self.test_queue

    def requeue_test(self, test):
        self.test_queue.put(test)

class PathGroupedSource(TestSource):
    """Test source that hands out tests in groups sharing a URL path prefix.

    Tests are queued as lists (groups); a group is expanded into a private
    queue when it is handed out."""
    def __init__(self, test_queue):
        self.test_queue = test_queue
        self.current_queue = None

    @classmethod
    def queue_tests(cls, test_queue, test_type, tests, depth=None):
        if depth is True:
            # depth=True means "unlimited", expressed as a no-op slice bound
            depth = None

        prev_path = None
        group = None

        for test in tests[test_type]:
            path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
            if path != prev_path:
                group = []
                test_queue.put(group)
                prev_path = path

            group.append(test)

    def get_queue(self):
        if not self.current_queue or self.current_queue.empty():
            try:
                data = self.test_queue.get(block=True, timeout=1)
                self.current_queue = Queue()
                for item in data:
                    self.current_queue.put(item)
            except Empty:
                return None

        return self.current_queue

    def requeue_test(self, test):
        self.current_queue.put(test)

    def __exit__(self, *args, **kwargs):
        if self.current_queue:
            self.current_queue.close()