diff options
Diffstat (limited to 'js/src/tests/lib/tasks_win.py')
-rw-r--r-- | js/src/tests/lib/tasks_win.py | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/js/src/tests/lib/tasks_win.py b/js/src/tests/lib/tasks_win.py new file mode 100644 index 000000000..7a4540c60 --- /dev/null +++ b/js/src/tests/lib/tasks_win.py @@ -0,0 +1,135 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. */ +from __future__ import print_function, unicode_literals, division + +import subprocess +import sys +from datetime import datetime, timedelta +from progressbar import ProgressBar +from results import NullTestOutput, TestOutput, escape_cmdline +from threading import Thread +from Queue import Queue, Empty + + +class EndMarker: + pass + + +class TaskFinishedMarker: + pass + + +def _do_work(qTasks, qResults, qWatch, prefix, run_skipped, timeout, show_cmd): + while True: + test = qTasks.get(block=True, timeout=sys.maxint) + if test is EndMarker: + qWatch.put(EndMarker) + qResults.put(EndMarker) + return + + if not test.enable and not run_skipped: + qResults.put(NullTestOutput(test)) + continue + + # Spawn the test task. + cmd = test.get_command(prefix) + if show_cmd: + print(escape_cmdline(cmd)) + tStart = datetime.now() + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + # Push the task to the watchdog -- it will kill the task + # if it goes over the timeout while we keep its stdout + # buffer clear on the "main" worker thread. + qWatch.put(proc) + out, err = proc.communicate() + qWatch.put(TaskFinishedMarker) + + # Create a result record and forward to result processing. + dt = datetime.now() - tStart + result = TestOutput(test, cmd, out, err, proc.returncode, dt.total_seconds(), + dt > timedelta(seconds=timeout)) + qResults.put(result) + + +def _do_watch(qWatch, timeout): + while True: + proc = qWatch.get(True) + if proc == EndMarker: + return + try: + fin = qWatch.get(block=True, timeout=timeout) + assert fin is TaskFinishedMarker, "invalid finish marker" + except Empty: + # Timed out, force-kill the test. + try: + proc.terminate() + except WindowsError as ex: + # If the process finishes after we time out but before we + # terminate, the terminate call will fail. We can safely + # ignore this. + if ex.winerror != 5: + raise + fin = qWatch.get(block=True, timeout=sys.maxint) + assert fin is TaskFinishedMarker, "invalid finish marker" + + +def run_all_tests(tests, prefix, pb, options): + """ + Uses scatter-gather to a thread-pool to manage children. + """ + qTasks, qResults = Queue(), Queue() + + workers = [] + watchdogs = [] + for _ in range(options.worker_count): + qWatch = Queue() + watcher = Thread(target=_do_watch, args=(qWatch, options.timeout)) + watcher.setDaemon(True) + watcher.start() + watchdogs.append(watcher) + worker = Thread(target=_do_work, args=(qTasks, qResults, qWatch, + prefix, options.run_skipped, + options.timeout, options.show_cmd)) + worker.setDaemon(True) + worker.start() + workers.append(worker) + + # Insert all jobs into the queue, followed by the queue-end + # marker, one per worker. This will not block on growing the + # queue, only on waiting for more items in the generator. The + # workers are already started, however, so this will process as + # fast as we can produce tests from the filesystem. + def _do_push(num_workers, qTasks): + for test in tests: + qTasks.put(test) + for _ in range(num_workers): + qTasks.put(EndMarker) + pusher = Thread(target=_do_push, args=(len(workers), qTasks)) + pusher.setDaemon(True) + pusher.start() + + # Read from the results. + ended = 0 + delay = ProgressBar.update_granularity().total_seconds() + while ended < len(workers): + try: + result = qResults.get(block=True, timeout=delay) + if result is EndMarker: + ended += 1 + else: + yield result + except Empty: + pb.poke() + + # Cleanup and exit. + pusher.join() + for worker in workers: + worker.join() + for watcher in watchdogs: + watcher.join() + assert qTasks.empty(), "Send queue not drained" + assert qResults.empty(), "Result queue not drained" |