Diffstat (limited to 'js/src/gdb/taskpool.py')
-rw-r--r--   js/src/gdb/taskpool.py   219
1 file changed, 219 insertions(+), 0 deletions(-)
diff --git a/js/src/gdb/taskpool.py b/js/src/gdb/taskpool.py
new file mode 100644
index 000000000..8bfcb3a34
--- /dev/null
+++ b/js/src/gdb/taskpool.py
@@ -0,0 +1,219 @@
+import fcntl, os, select, time
+from subprocess import Popen, PIPE
+
+# Run a series of subprocesses. Try to keep up to a certain number going in
+# parallel at any given time. Enforce time limits.
+#
+# This is implemented using non-blocking I/O, and so is Unix-specific.
+#
+# We assume that, if a task closes its standard error, then it's safe to
+# wait for it to terminate. So an ill-behaved task that closes its standard
+# error and then hangs will hang us, as well. However, as it takes special
+# effort to close one's own standard error, this seems unlikely to be a
+# problem in practice.
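+#
+# A minimal usage sketch (EchoTask is hypothetical, not part of this module;
+# see the SortableTask test under __main__ for a runnable example):
+#
+#   class EchoTask(TaskPool.Task):
+#       def cmd(self):            return ['echo', 'hello']
+#       def onStdout(self, text): print text,
+#       def onStderr(self, text): pass
+#       def onFinished(self, rc): pass
+#       def onTimeout(self):      pass
+#
+#   TaskPool([EchoTask() for i in range(4)], job_limit=2).run_all()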
+class TaskPool(object):
+
+ # A task we should run in a subprocess. Users should subclass this and
+ # fill in the methods as given.
+ class Task(object):
+ def __init__(self):
+ self.pipe = None
+            self.deadline = None
+
+ # Record that this task is running, with |pipe| as its Popen object,
+ # and should time out at |deadline|.
+ def start(self, pipe, deadline):
+ self.pipe = pipe
+ self.deadline = deadline
+
+        # Return the command to run the task, as a sequence of argument
+        # strings to be passed to Popen (which we invoke without
+        # shell=True). The command will be given /dev/null as its standard
+        # input, and pipes as its standard output and error.
+ def cmd(self):
+ raise NotImplementedError
+
+ # TaskPool calls this method to report that the process wrote
+ # |string| to its standard output.
+ def onStdout(self, string):
+ raise NotImplementedError
+
+ # TaskPool calls this method to report that the process wrote
+ # |string| to its standard error.
+ def onStderr(self, string):
+ raise NotImplementedError
+
+ # TaskPool calls this method to report that the process terminated,
+ # yielding |returncode|.
+ def onFinished(self, returncode):
+ raise NotImplementedError
+
+ # TaskPool calls this method to report that the process timed out and
+ # was killed.
+ def onTimeout(self):
+ raise NotImplementedError
+
+ # If a task output handler (onStdout, onStderr) throws this, we terminate
+ # the task.
+ class TerminateTask(Exception):
+ pass
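+
+    # For instance, an output handler might give up on a task as soon as it
+    # sees a failure marker (a hypothetical sketch, not part of this module):
+    #
+    #   def onStdout(self, text):
+    #       if 'FAIL' in text:
+    #           raise TaskPool.TerminateTask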
+
+ def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
+ self.pending = iter(tasks)
+ self.cwd = cwd
+ self.job_limit = job_limit
+ self.timeout = timeout
+ self.next_pending = self.get_next_pending()
+
+    # Return the next task that has not yet been started, or None if all
+    # tasks have been started.
+    def get_next_pending(self):
+        try:
+            return next(self.pending)
+ except StopIteration:
+ return None
+
+ def run_all(self):
+ # The currently running tasks: a set of Task instances.
+ running = set()
+ with open(os.devnull, 'r') as devnull:
+ while True:
+ while len(running) < self.job_limit and self.next_pending:
+ t = self.next_pending
+ p = Popen(t.cmd(), bufsize=16384,
+ stdin=devnull, stdout=PIPE, stderr=PIPE,
+ cwd=self.cwd)
+
+ # Put the stdout and stderr pipes in non-blocking mode. See
+ # the post-'select' code below for details.
+ flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
+ fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+ flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
+ fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+ t.start(p, time.time() + self.timeout)
+ running.add(t)
+ self.next_pending = self.get_next_pending()
+
+ # If we have no tasks running, and the above wasn't able to
+ # start any new ones, then we must be done!
+ if not running:
+ break
+
+ # How many seconds do we have until the earliest deadline?
+ now = time.time()
+ secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)
+
+ # Wait for output or a timeout.
+ stdouts_and_stderrs = ([t.pipe.stdout for t in running]
+ + [t.pipe.stderr for t in running])
+                readable, _, _ = select.select(stdouts_and_stderrs, [], [],
+                                               secs_to_next_deadline)
+ finished = set()
+ terminate = set()
+ for t in running:
+ # Since we've placed the pipes in non-blocking mode, these
+ # 'read's will simply return as many bytes as are available,
+ # rather than blocking until they have accumulated the full
+ # amount requested (or reached EOF). The 'read's should
+ # never throw, since 'select' has told us there was
+ # something available.
+ if t.pipe.stdout in readable:
+ output = t.pipe.stdout.read(16384)
+ if output != "":
+ try:
+ t.onStdout(output)
+ except TerminateTask:
+ terminate.add(t)
+ if t.pipe.stderr in readable:
+ output = t.pipe.stderr.read(16384)
+ if output != "":
+ try:
+ t.onStderr(output)
+ except TerminateTask:
+ terminate.add(t)
+ else:
+ # We assume that, once a task has closed its stderr,
+ # it will soon terminate. If a task closes its
+ # stderr and then hangs, we'll hang too, here.
+ t.pipe.wait()
+ t.onFinished(t.pipe.returncode)
+ finished.add(t)
+ # Remove the finished tasks from the running set. (Do this here
+ # to avoid mutating the set while iterating over it.)
+ running -= finished
+
+ # Terminate any tasks whose handlers have asked us to do so.
+ for t in terminate:
+ t.pipe.terminate()
+ t.pipe.wait()
+ running.remove(t)
+
+                # Terminate any tasks which have missed their deadline.
+                # Refresh the clock first: 'select' may have slept until
+                # after deadlines computed against the earlier 'now'.
+                now = time.time()
+                finished = set()
+                for t in running:
+                    if now >= t.deadline:
+ t.pipe.terminate()
+ t.pipe.wait()
+ t.onTimeout()
+ finished.add(t)
+ # Remove the finished tasks from the running set. (Do this here
+ # to avoid mutating the set while iterating over it.)
+ running -= finished
+ return None
+
+def get_cpu_count():
+ """
+ Guess at a reasonable parallelism count to set as the default for the
+ current machine and run.
+ """
+ # Python 2.6+
+ try:
+ import multiprocessing
+ return multiprocessing.cpu_count()
+    except (ImportError, NotImplementedError):
+ pass
+
+ # POSIX
+ try:
+ res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
+ if res > 0:
+ return res
+    except (AttributeError, ValueError):
+ pass
+
+ # Windows
+ try:
+ res = int(os.environ['NUMBER_OF_PROCESSORS'])
+ if res > 0:
+ return res
+ except (KeyError, ValueError):
+ pass
+
+ return 1
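+
+# For example, one reasonable default is to size the pool to the machine
+# (a sketch; 'tasks' stands for any sequence of Task subclass instances):
+#
+#   pool = TaskPool(tasks, job_limit=get_cpu_count())
+#   pool.run_all()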
+
+if __name__ == '__main__':
+ # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
+ def sleep_sort(ns, timeout):
+        sorted = []
+ class SortableTask(TaskPool.Task):
+ def __init__(self, n):
+ super(SortableTask, self).__init__()
+ self.n = n
+ def cmd(self):
+ return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]
+ def onStdout(self, text):
+ print '%d stdout: %r' % (self.n, text)
+ def onStderr(self, text):
+ print '%d stderr: %r' % (self.n, text)
+ def onFinished(self, returncode):
+ print '%d (rc=%d)' % (self.n, returncode)
+ sorted.append(self.n)
+ def onTimeout(self):
+ print '%d timed out' % (self.n,)
+
+        p = TaskPool([SortableTask(n) for n in ns],
+                     job_limit=len(ns), timeout=timeout)
+ p.run_all()
+ return sorted
+
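+    # With a 15-second timeout, the 21- and 34-second tasks should be
+    # killed before finishing, so this should print [1, 1, 2, 3, 5, 8, 13].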
+ print repr(sleep_sort([1,1,2,3,5,8,13,21,34], 15))