https://github.com/mozilla/gecko-dev
Raw File
Tip revision: 85438d95f5da0b235d9cd6521be81e75909a8f37 authored by ffxbld on 03 September 2024, 17:18:17 UTC
No Bug, mozilla-esr115 repo-update HSTS HPKP remote-settings tld-suffixes - a=diannaS
Tip revision: 85438d9
taskpool.py
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

# flake8: noqa: F821

import fcntl
import os
import select
import time
from subprocess import PIPE, Popen


class TaskPool(object):
    # Run a series of subprocesses. Try to keep up to a certain number going in
    # parallel at any given time. Enforce time limits.
    #
    # This is implemented using non-blocking I/O, and so is Unix-specific.
    #
    # We assume that, if a task closes its standard error, then it's safe to
    # wait for it to terminate. So an ill-behaved task that closes its standard
    # output and then hangs will hang us, as well. However, as it takes special
    # effort to close one's standard output, this seems unlikely to be a
    # problem in practice.

    # A task we should run in a subprocess. Users should subclass this and
    # fill in the methods as given.
    class Task(object):
        def __init__(self):
            # Popen object for the running subprocess; set by |start|.
            self.pipe = None
            self.start_time = None
            # Absolute time (per time.time()) at which the task should be
            # killed; set by |start|.
            self.deadline = None

        # Record that this task is running, with |pipe| as its Popen object,
        # and should time out at |deadline|.
        def start(self, pipe, deadline):
            self.pipe = pipe
            self.deadline = deadline

        # Return a shell command (a string or sequence of arguments) to be
        # passed to Popen to run the task. The command will be given
        # /dev/null as its standard input, and pipes as its standard output
        # and error.
        def cmd(self):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard output.
        def onStdout(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process wrote
        # |string| to its standard error.
        def onStderr(self, string):
            raise NotImplementedError

        # TaskPool calls this method to report that the process terminated,
        # yielding |returncode|.
        def onFinished(self, returncode):
            raise NotImplementedError

        # TaskPool calls this method to report that the process timed out and
        # was killed.
        def onTimeout(self):
            raise NotImplementedError

    # If a task output handler (onStdout, onStderr) throws this, we terminate
    # the task.
    class TerminateTask(Exception):
        pass

    # |tasks| is an iterable of Task instances to run; |cwd| is the working
    # directory for the subprocesses; at most |job_limit| run concurrently;
    # each is killed |timeout| seconds after it starts.
    def __init__(self, tasks, cwd=".", job_limit=4, timeout=150):
        self.pending = iter(tasks)
        self.cwd = cwd
        self.job_limit = job_limit
        self.timeout = timeout
        self.next_pending = next(self.pending, None)

    # Run every task to completion (or timeout), keeping up to |job_limit|
    # subprocesses alive at once. Returns None.
    def run_all(self):
        # The currently running tasks: a set of Task instances.
        running = set()
        with open(os.devnull, "r") as devnull:
            while True:
                while len(running) < self.job_limit and self.next_pending:
                    task = self.next_pending
                    p = Popen(
                        task.cmd(),
                        bufsize=16384,
                        stdin=devnull,
                        stdout=PIPE,
                        stderr=PIPE,
                        cwd=self.cwd,
                    )

                    # Put the stdout and stderr pipes in non-blocking mode. See
                    # the post-'select' code below for details.
                    flags = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
                    fcntl.fcntl(p.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
                    flags = fcntl.fcntl(p.stderr, fcntl.F_GETFL)
                    fcntl.fcntl(p.stderr, fcntl.F_SETFL, flags | os.O_NONBLOCK)

                    task.start(p, time.time() + self.timeout)
                    running.add(task)
                    self.next_pending = next(self.pending, None)

                # If we have no tasks running, and the above wasn't able to
                # start any new ones, then we must be done!
                if not running:
                    break

                # How many seconds do we have until the earliest deadline?
                now = time.time()
                secs_to_next_deadline = max(min([t.deadline for t in running]) - now, 0)

                # Wait for output or a timeout.
                stdouts_and_stderrs = [t.pipe.stdout for t in running] + [
                    t.pipe.stderr for t in running
                ]
                (readable, w, x) = select.select(
                    stdouts_and_stderrs, [], [], secs_to_next_deadline
                )
                # Refresh |now| to account for the time spent blocked in
                # 'select'; otherwise a task whose deadline expired while we
                # were waiting would survive one extra pass through the loop.
                now = time.time()
                finished = set()
                terminate = set()
                for t in running:
                    # Since we've placed the pipes in non-blocking mode, these
                    # 'read's will simply return as many bytes as are available,
                    # rather than blocking until they have accumulated the full
                    # amount requested (or reached EOF). The 'read's should
                    # never throw, since 'select' has told us there was
                    # something available.
                    if t.pipe.stdout in readable:
                        output = t.pipe.stdout.read(16384)
                        if len(output):
                            try:
                                t.onStdout(output.decode("utf-8"))
                            except TaskPool.TerminateTask:
                                # Note: 'TerminateTask' must be qualified with
                                # the class name here; the bare name is not in
                                # scope inside a method and would be a
                                # NameError (the file-level F821 suppression
                                # existed to hide exactly this).
                                terminate.add(t)
                    if t.pipe.stderr in readable:
                        output = t.pipe.stderr.read(16384)
                        if len(output):
                            try:
                                t.onStderr(output.decode("utf-8"))
                            except TaskPool.TerminateTask:
                                terminate.add(t)
                        else:
                            # We assume that, once a task has closed its stderr,
                            # it will soon terminate. If a task closes its
                            # stderr and then hangs, we'll hang too, here.
                            t.pipe.wait()
                            t.onFinished(t.pipe.returncode)
                            finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished

                # Terminate any tasks whose handlers have asked us to do so.
                for t in terminate:
                    t.pipe.terminate()
                    t.pipe.wait()
                    running.remove(t)

                # Terminate any tasks which have missed their deadline.
                finished = set()
                for t in running:
                    if now >= t.deadline:
                        t.pipe.terminate()
                        t.pipe.wait()
                        t.onTimeout()
                        finished.add(t)
                # Remove the finished tasks from the running set. (Do this here
                # to avoid mutating the set while iterating over it.)
                running -= finished
        return None


def get_cpu_count():
    """
    Guess at a reasonable parallelism count to set as the default for the
    current machine and run.
    """
    # Preferred: multiprocessing knows how to count processors on every
    # major platform (available since Python 2.6).
    try:
        import multiprocessing
    except ImportError:
        pass
    else:
        try:
            return multiprocessing.cpu_count()
        except NotImplementedError:
            pass

    # Fall back to the POSIX sysconf interface.
    try:
        count = int(os.sysconf("SC_NPROCESSORS_ONLN"))
    except (AttributeError, ValueError):
        pass
    else:
        if count > 0:
            return count

    # Fall back to the environment variable that Windows provides.
    try:
        count = int(os.environ["NUMBER_OF_PROCESSORS"])
    except (KeyError, ValueError):
        pass
    else:
        if count > 0:
            return count

    # Give up and assume a single processor.
    return 1


if __name__ == "__main__":
    # Test TaskPool by using it to implement the unique 'sleep sort' algorithm.
    def sleep_sort(ns, timeout):
        sorted = []

        class SortableTask(TaskPool.Task):
            def __init__(self, n):
                super(SortableTask, self).__init__()
                self.n = n

            def start(self, pipe, deadline):
                super(SortableTask, self).start(pipe, deadline)

            def cmd(self):
                return ["sh", "-c", "echo out; sleep %d; echo err>&2" % (self.n,)]

            def onStdout(self, text):
                print("%d stdout: %r" % (self.n, text))

            def onStderr(self, text):
                print("%d stderr: %r" % (self.n, text))

            def onFinished(self, returncode):
                print("%d (rc=%d)" % (self.n, returncode))
                sorted.append(self.n)

            def onTimeout(self):
                print("%d timed out" % (self.n,))

        p = TaskPool([SortableTask(_) for _ in ns], job_limit=len(ns), timeout=timeout)
        p.run_all()
        return sorted

    print(repr(sleep_sort([1, 1, 2, 3, 5, 8, 13, 21, 34], 15)))
back to top