import hashlib import json import os import urlparse from abc import ABCMeta, abstractmethod from Queue import Empty from collections import defaultdict, OrderedDict, deque from multiprocessing import Queue import manifestinclude import manifestexpected import wpttest from mozlog import structured manifest = None manifest_update = None def do_delayed_imports(): # This relies on an already loaded module having set the sys.path correctly :( global manifest, manifest_update from manifest import manifest from manifest import update as manifest_update class TestChunker(object): def __init__(self, total_chunks, chunk_number): self.total_chunks = total_chunks self.chunk_number = chunk_number assert self.chunk_number <= self.total_chunks self.logger = structured.get_default_logger() def __call__(self, manifest): raise NotImplementedError class Unchunked(TestChunker): def __init__(self, *args, **kwargs): TestChunker.__init__(self, *args, **kwargs) assert self.total_chunks == 1 def __call__(self, manifest): for item in manifest: yield item class HashChunker(TestChunker): def __call__(self, manifest): chunk_index = self.chunk_number - 1 for test_path, tests in manifest: h = int(hashlib.md5(test_path).hexdigest(), 16) if h % self.total_chunks == chunk_index: yield test_path, tests class DirectoryHashChunker(TestChunker): """Like HashChunker except the directory is hashed. This ensures that all tests in the same directory end up in the same chunk. """ def __call__(self, manifest): chunk_index = self.chunk_number - 1 for test_path, tests in manifest: h = int(hashlib.md5(os.path.dirname(test_path)).hexdigest(), 16) if h % self.total_chunks == chunk_index: yield test_path, tests class EqualTimeChunker(TestChunker): def _group_by_directory(self, manifest_items): """Split the list of manifest items into a ordered dict that groups tests in so that anything in the same subdirectory beyond a depth of 3 is in the same group. So all tests in a/b/c, a/b/c/d and a/b/c/e will be grouped together and separate to tests in a/b/f Returns: tuple (ordered dict of {test_dir: PathData}, total estimated runtime) """ class PathData(object): def __init__(self, path): self.path = path self.time = 0 self.tests = [] by_dir = OrderedDict() total_time = 0 for i, (test_path, tests) in enumerate(manifest_items): test_dir = tuple(os.path.split(test_path)[0].split(os.path.sep)[:3]) if not test_dir in by_dir: by_dir[test_dir] = PathData(test_dir) data = by_dir[test_dir] time = sum(wpttest.DEFAULT_TIMEOUT if test.timeout != "long" else wpttest.LONG_TIMEOUT for test in tests) data.time += time total_time += time data.tests.append((test_path, tests)) return by_dir, total_time def _maybe_remove(self, chunks, i, direction): """Trial removing a chunk from one chunk to an adjacent one. :param chunks: - the list of all chunks :param i: - the chunk index in the list of chunks to try removing from :param direction: either "next" if we are going to move from the end to the subsequent chunk, or "prev" if we are going to move from the start into the previous chunk. :returns bool: Did a chunk get moved?""" source_chunk = chunks[i] if direction == "next": target_chunk = chunks[i+1] path_index = -1 move_func = lambda: target_chunk.appendleft(source_chunk.pop()) elif direction == "prev": target_chunk = chunks[i-1] path_index = 0 move_func = lambda: target_chunk.append(source_chunk.popleft()) else: raise ValueError("Unexpected move direction %s" % direction) return self._maybe_move(source_chunk, target_chunk, path_index, move_func) def _maybe_add(self, chunks, i, direction): """Trial adding a chunk from one chunk to an adjacent one. :param chunks: - the list of all chunks :param i: - the chunk index in the list of chunks to try adding to :param direction: either "next" if we are going to remove from the the subsequent chunk, or "prev" if we are going to remove from the the previous chunk. :returns bool: Did a chunk get moved?""" target_chunk = chunks[i] if direction == "next": source_chunk = chunks[i+1] path_index = 0 move_func = lambda: target_chunk.append(source_chunk.popleft()) elif direction == "prev": source_chunk = chunks[i-1] path_index = -1 move_func = lambda: target_chunk.appendleft(source_chunk.pop()) else: raise ValueError("Unexpected move direction %s" % direction) return self._maybe_move(source_chunk, target_chunk, path_index, move_func) def _maybe_move(self, source_chunk, target_chunk, path_index, move_func): """Move from one chunk to another, assess the change in badness, and keep the move iff it decreases the badness score. :param source_chunk: chunk to move from :param target_chunk: chunk to move to :param path_index: 0 if we are moving from the start or -1 if we are moving from the end :param move_func: Function that actually moves between chunks""" if len(source_chunk.paths) <= 1: return False move_time = source_chunk.paths[path_index].time new_source_badness = self._badness(source_chunk.time - move_time) new_target_badness = self._badness(target_chunk.time + move_time) delta_badness = ((new_source_badness + new_target_badness) - (source_chunk.badness + target_chunk.badness)) if delta_badness < 0: move_func() return True return False def _badness(self, time): """Metric of badness for a specific chunk :param time: the time for a specific chunk""" return (time - self.expected_time)**2 def _get_chunk(self, manifest_items): by_dir, total_time = self._group_by_directory(manifest_items) if len(by_dir) < self.total_chunks: raise ValueError("Tried to split into %i chunks, but only %i subdirectories included" % ( self.total_chunks, len(by_dir))) self.expected_time = float(total_time) / self.total_chunks chunks = self._create_initial_chunks(by_dir) while True: # Move a test from one chunk to the next until doing so no longer # reduces the badness got_improvement = self._update_chunks(chunks) if not got_improvement: break self.logger.debug(self.expected_time) for i, chunk in chunks.iteritems(): self.logger.debug("%i: %i, %i" % (i + 1, chunk.time, chunk.badness)) assert self._all_tests(by_dir) == self._chunked_tests(chunks) return self._get_tests(chunks) @staticmethod def _all_tests(by_dir): """Return a set of all tests in the manifest from a grouping by directory""" return set(x[0] for item in by_dir.itervalues() for x in item.tests) @staticmethod def _chunked_tests(chunks): """Return a set of all tests in the manifest from the chunk list""" return set(x[0] for chunk in chunks.itervalues() for path in chunk.paths for x in path.tests) def _create_initial_chunks(self, by_dir): """Create an initial unbalanced list of chunks. :param by_dir: All tests in the manifest grouped by subdirectory :returns list: A list of Chunk objects""" class Chunk(object): def __init__(self, paths, index): """List of PathData objects that together form a single chunk of tests""" self.paths = deque(paths) self.time = sum(item.time for item in paths) self.index = index def appendleft(self, path): """Add a PathData object to the start of the chunk""" self.paths.appendleft(path) self.time += path.time def append(self, path): """Add a PathData object to the end of the chunk""" self.paths.append(path) self.time += path.time def pop(self): """Remove PathData object from the end of the chunk""" assert len(self.paths) > 1 self.time -= self.paths[-1].time return self.paths.pop() def popleft(self): """Remove PathData object from the start of the chunk""" assert len(self.paths) > 1 self.time -= self.paths[0].time return self.paths.popleft() @property def badness(self_): """Badness metric for this chunk""" return self._badness(self_.time) initial_size = len(by_dir) / self.total_chunks chunk_boundaries = [initial_size * i for i in xrange(self.total_chunks)] + [len(by_dir)] chunks = OrderedDict() for i, lower in enumerate(chunk_boundaries[:-1]): upper = chunk_boundaries[i + 1] paths = by_dir.values()[lower:upper] chunks[i] = Chunk(paths, i) assert self._all_tests(by_dir) == self._chunked_tests(chunks) return chunks def _update_chunks(self, chunks): """Run a single iteration of the chunk update algorithm. :param chunks: - List of chunks """ #TODO: consider replacing this with a heap sorted_chunks = sorted(chunks.values(), key=lambda x:-x.badness) got_improvement = False for chunk in sorted_chunks: if chunk.time < self.expected_time: f = self._maybe_add else: f = self._maybe_remove if chunk.index == 0: order = ["next"] elif chunk.index == self.total_chunks - 1: order = ["prev"] else: if chunk.time < self.expected_time: # First try to add a test from the neighboring chunk with the # greatest total time if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time: order = ["next", "prev"] else: order = ["prev", "next"] else: # First try to remove a test and add to the neighboring chunk with the # lowest total time if chunks[chunk.index + 1].time > chunks[chunk.index - 1].time: order = ["prev", "next"] else: order = ["next", "prev"] for direction in order: if f(chunks, chunk.index, direction): got_improvement = True break if got_improvement: break return got_improvement def _get_tests(self, chunks): """Return the list of tests corresponding to the chunk number we are running. :param chunks: List of chunks""" tests = [] for path in chunks[self.chunk_number - 1].paths: tests.extend(path.tests) return tests def __call__(self, manifest_iter): manifest = list(manifest_iter) tests = self._get_chunk(manifest) for item in tests: yield item class TestFilter(object): def __init__(self, test_manifests, include=None, exclude=None, manifest_path=None): if manifest_path is not None and include is None: self.manifest = manifestinclude.get_manifest(manifest_path) else: self.manifest = manifestinclude.IncludeManifest.create() if include: self.manifest.set("skip", "true") for item in include: self.manifest.add_include(test_manifests, item) if exclude: for item in exclude: self.manifest.add_exclude(test_manifests, item) def __call__(self, manifest_iter): for test_path, tests in manifest_iter: include_tests = set() for test in tests: if self.manifest.include(test): include_tests.add(test) if include_tests: yield test_path, include_tests class TagFilter(object): def __init__(self, tags): self.tags = set(tags) def __call__(self, test_iter): for test in test_iter: if test.tags & self.tags: yield test class ManifestLoader(object): def __init__(self, test_paths, force_manifest_update=False): do_delayed_imports() self.test_paths = test_paths self.force_manifest_update = force_manifest_update self.logger = structured.get_default_logger() if self.logger is None: self.logger = structured.structuredlog.StructuredLogger("ManifestLoader") def load(self): rv = {} for url_base, paths in self.test_paths.iteritems(): manifest_file = self.load_manifest(url_base=url_base, **paths) path_data = {"url_base": url_base} path_data.update(paths) rv[manifest_file] = path_data return rv def create_manifest(self, manifest_path, tests_path, url_base="/"): self.update_manifest(manifest_path, tests_path, url_base, recreate=True) def update_manifest(self, manifest_path, tests_path, url_base="/", recreate=False): self.logger.info("Updating test manifest %s" % manifest_path) json_data = None if not recreate: try: with open(manifest_path) as f: json_data = json.load(f) except IOError: #If the existing file doesn't exist just create one from scratch pass if not json_data: manifest_file = manifest.Manifest(None, url_base) else: try: manifest_file = manifest.Manifest.from_json(tests_path, json_data) except manifest.ManifestVersionMismatch: manifest_file = manifest.Manifest(None, url_base) manifest_update.update(tests_path, url_base, manifest_file) manifest.write(manifest_file, manifest_path) def load_manifest(self, tests_path, metadata_path, url_base="/"): manifest_path = os.path.join(metadata_path, "MANIFEST.json") if (not os.path.exists(manifest_path) or self.force_manifest_update): self.update_manifest(manifest_path, tests_path, url_base) manifest_file = manifest.load(tests_path, manifest_path) if manifest_file.url_base != url_base: self.logger.info("Updating url_base in manifest from %s to %s" % (manifest_file.url_base, url_base)) manifest_file.url_base = url_base manifest.write(manifest_file, manifest_path) return manifest_file def iterfilter(filters, iter): for f in filters: iter = f(iter) for item in iter: yield item class TestLoader(object): def __init__(self, test_manifests, test_types, run_info, manifest_filters=None, meta_filters=None, chunk_type="none", total_chunks=1, chunk_number=1, include_https=True): self.test_types = test_types self.run_info = run_info self.manifest_filters = manifest_filters if manifest_filters is not None else [] self.meta_filters = meta_filters if meta_filters is not None else [] self.manifests = test_manifests self.tests = None self.disabled_tests = None self.include_https = include_https self.chunk_type = chunk_type self.total_chunks = total_chunks self.chunk_number = chunk_number self.chunker = {"none": Unchunked, "hash": HashChunker, "dir_hash": DirectoryHashChunker, "equal_time": EqualTimeChunker}[chunk_type](total_chunks, chunk_number) self._test_ids = None self.directory_manifests = {} self._load_tests() @property def test_ids(self): if self._test_ids is None: self._test_ids = [] for test_dict in [self.disabled_tests, self.tests]: for test_type in self.test_types: self._test_ids += [item.id for item in test_dict[test_type]] return self._test_ids def get_test(self, manifest_test, inherit_metadata, test_metadata): if test_metadata is not None: inherit_metadata.append(test_metadata) test_metadata = test_metadata.get_test(manifest_test.id) return wpttest.from_manifest(manifest_test, inherit_metadata, test_metadata) def load_dir_metadata(self, test_manifest, metadata_path, test_path): rv = [] path_parts = os.path.dirname(test_path).split(os.path.sep) for i in xrange(1,len(path_parts) + 1): path = os.path.join(os.path.sep.join(path_parts[:i]), "__dir__.ini") if path not in self.directory_manifests: self.directory_manifests[path] = manifestexpected.get_dir_manifest( metadata_path, path, self.run_info) manifest = self.directory_manifests[path] if manifest is not None: rv.append(manifest) return rv def load_metadata(self, test_manifest, metadata_path, test_path): inherit_metadata = self.load_dir_metadata(test_manifest, metadata_path, test_path) test_metadata = manifestexpected.get_manifest( metadata_path, test_path, test_manifest.url_base, self.run_info) return inherit_metadata, test_metadata def iter_tests(self): manifest_items = [] for manifest in sorted(self.manifests.keys(), key=lambda x:x.url_base): manifest_iter = iterfilter(self.manifest_filters, manifest.itertypes(*self.test_types)) manifest_items.extend(manifest_iter) if self.chunker is not None: manifest_items = self.chunker(manifest_items) for test_path, tests in manifest_items: manifest_file = iter(tests).next().manifest metadata_path = self.manifests[manifest_file]["metadata_path"] inherit_metadata, test_metadata = self.load_metadata(manifest_file, metadata_path, test_path) for test in iterfilter(self.meta_filters, self.iter_wpttest(inherit_metadata, test_metadata, tests)): yield test_path, test.test_type, test def iter_wpttest(self, inherit_metadata, test_metadata, tests): for manifest_test in tests: yield self.get_test(manifest_test, inherit_metadata, test_metadata) def _load_tests(self): """Read in the tests from the manifest file and add them to a queue""" tests = {"enabled":defaultdict(list), "disabled":defaultdict(list)} for test_path, test_type, test in self.iter_tests(): enabled = not test.disabled() if not self.include_https and test.environment["protocol"] == "https": enabled = False key = "enabled" if enabled else "disabled" tests[key][test_type].append(test) self.tests = tests["enabled"] self.disabled_tests = tests["disabled"] def groups(self, test_types, chunk_type="none", total_chunks=1, chunk_number=1): groups = set() for test_type in test_types: for test in self.tests[test_type]: group = test.url.split("/")[1] groups.add(group) return groups class TestSource(object): __metaclass__ = ABCMeta @abstractmethod def queue_tests(self, test_queue): pass @abstractmethod def requeue_test(self, test): pass def __enter__(self): return self def __exit__(self, *args, **kwargs): pass class SingleTestSource(TestSource): def __init__(self, test_queue): self.test_queue = test_queue @classmethod def queue_tests(cls, test_queue, test_type, tests): for test in tests[test_type]: test_queue.put(test) def get_queue(self): if self.test_queue.empty(): return None return self.test_queue def requeue_test(self, test): self.test_queue.put(test) class PathGroupedSource(TestSource): def __init__(self, test_queue): self.test_queue = test_queue self.current_queue = None @classmethod def queue_tests(cls, test_queue, test_type, tests, depth=None): if depth is True: depth = None prev_path = None group = None for test in tests[test_type]: path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth] if path != prev_path: group = [] test_queue.put(group) prev_path = path group.append(test) def get_queue(self): if not self.current_queue or self.current_queue.empty(): try: data = self.test_queue.get(block=True, timeout=1) self.current_queue = Queue() for item in data: self.current_queue.put(item) except Empty: return None return self.current_queue def requeue_test(self, test): self.current_queue.put(test) def __exit__(self, *args, **kwargs): if self.current_queue: self.current_queue.close()