https://github.com/mozilla/gecko-dev
Raw File
Tip revision: 171d4ac7b5c529f27694705c7f65eeffbd380725 authored by ffxbld on 27 October 2015, 00:02:27 UTC
Added FIREFOX_42_0_RELEASE FIREFOX_42_0_BUILD1 tag(s) for changeset b1a68883552e. DONTBUILD CLOSED TREE a=release
Tip revision: 171d4ac
taskpool.py
import fcntl, os, select, time
from subprocess import Popen, PIPE

# Run a series of subprocesses. Try to keep up to a certain number going in
# parallel at any given time. Enforce time limits.
#
# This is implemented using non-blocking I/O, and so is Unix-specific.
#
# We assume that, if a task closes its standard error, then it's safe to
# wait for it to terminate. So an ill-behaved task that closes its standard
# output and then hangs will hang us, as well. However, as it takes special
# effort to close one's standard output, this seems unlikely to be a
# problem in practice.
class TaskPool(object):

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            self.pipe = None
            self.start_time = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    def __init__(self, tasks, cwd='.', job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = self.get_next_pending()

    # Set self.next_pending to the next task that has not yet been executed.
    def get_next_pending(self):
        try:
            return self.pending.next()
        except StopIteration:
            return None

    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, 'r') as devnull:
            while True:
                while len(running) < self.job_limit and self.next_pending:
                    t = self.next_pending
                    p = Popen(t.cmd(), bufsize=16384,
                              stdin=devnull, stdout=PIPE, stderr=PIPE,
                              cwd=self.cwd)

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    t.start(p, time.time() + self.timeout)
                    running.add(t)
                    self.next_pending = self.get_next_pending()

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = ([t.pipe.stdout for t in running]
                                     + [t.pipe.stderr for t in running])
                (readable,w,x) = select.select(stdouts_and_stderrs, [], [], secs_to_next_deadline)
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if output != "":
                            try:
                                t.onStdout(output)
                            except TerminateTask:
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if output != "":
                            try:
                                t.onStderr(output)
                            except TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                for t in terminate:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline.
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None

def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Python 2.6+
    try:
        import multiprocessing
        return multiprocessing.cpu_count()
    except (ImportError,NotImplementedError):
        pass

    # POSIX
    try:
        res = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if res > 0:
            return res
    except (AttributeError,ValueError):
        pass

    # Windows
    try:
        res = int(os.environ['NUMBER_OF_PROCESSORS'])
        if res > 0:
            return res
    except (KeyError, ValueError):
        pass

    return 1

if __name__ == '__main__':
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        sorted=[]
        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n
            def start(self, pipe, deadline):
                super(SortableTask, self).start(pipe, deadline)
            def cmd(self):
                return ['sh', '-c', 'echo out; sleep %d; echo err>&2' % (self.n,)]
            def onStdout(self, text):
                print '%d stdout: %r' % (self.n, text)
            def onStderr(self, text):
                print '%d stderr: %r' % (self.n, text)
            def onFinished(self, returncode):
                print '%d (rc=%d)' % (self.n, returncode)
                sorted.append(self.n)
            def onTimeout(self):
                print '%d timed out' % (self.n,)

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return sorted

    print repr(sleep_sort([1,1,2,3,5,8,13,21,34], 15))
back to top