https://forge.softwareheritage.org/source/swh-scheduler.git
Raw File
Tip revision: b6bc2f2ae26b6d0bb7768db8f2ffac2002867ca3 authored by Nicolas Dandrimont on 18 December 2018, 16:08:45 UTC
Explicitly register class-based tasks inheriting from our own class
Tip revision: b6bc2f2
swh-worker-control
#!/usr/bin/env python3

# Copyright (C) 2017  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
from fnmatch import fnmatch
from operator import itemgetter
import os
import sys

import click


def list_remote_workers(inspect):
    """Discover the live workers by pinging them.

    Worker names are expected to look like ``celery@<type>.<host>``; any
    other name is reported on stderr and skipped.

    Args:
        inspect: a celery ``Inspect``-like object whose ``ping()`` returns a
          mapping keyed by worker name, or a falsy value when nobody answered.

    Returns:
        dict mapping each supported worker name to a dict with the ``name``,
        ``host`` and ``type`` keys.
    """
    ping_replies = inspect.ping()
    if not ping_replies:
        return {}

    prefix = 'celery@'
    ret = {}
    for worker_name in sorted(ping_replies):
        if not worker_name.startswith(prefix):
            print('Unsupported worker: %s' % worker_name, file=sys.stderr)
            continue
        # Split on the first dot only: the host part is itself dotted.
        worker_type, sep, host = worker_name[len(prefix):].partition('.')
        if not sep:
            # No dot means no <type>.<host> split is possible; skip the name
            # rather than aborting the whole command with a ValueError.
            print('Unsupported worker: %s' % worker_name, file=sys.stderr)
            continue
        ret[worker_name] = {
            'name': worker_name,
            'host': host,
            'type': worker_type,
        }

    return ret


def make_filters(filter_host, filter_type):
    """Turn the host/type filter patterns into a list of predicates.

    Every pattern is an fnmatch glob matched against the worker's ``host``
    (for ``filter_host``) or ``type`` (for ``filter_type``) field; a leading
    ``-`` negates it. Host predicates come first, then type predicates; each
    predicate takes a worker dict and returns a bool.
    """
    def make_check(field, pattern):
        negate = pattern.startswith('-')
        if negate:
            pattern = pattern[1:]

        def check(worker):
            # XOR with ``negate`` flips the match for exclusion patterns
            return fnmatch(worker[field], pattern) != negate
        return check

    return [make_check(field, pattern)
            for field, patterns in (('host', filter_host),
                                    ('type', filter_type))
            for pattern in patterns]


def filter_workers(workers, filters):
    """Return the subset of ``workers`` that every predicate accepts.

    ``workers`` maps names to worker dicts; ``filters`` is an iterable of
    callables taking a worker dict. Insertion order is preserved.
    """
    def accepted(worker):
        for check in filters:
            if not check(worker):
                return False
        return True

    selected = {}
    for name, worker in workers.items():
        if accepted(worker):
            selected[name] = worker
    return selected


def get_clock_offsets(workers, inspect):
    """Add a ``clock_offset`` entry to each worker that answers.

    The offset is the UTC wall-clock time at which the worker's monotonic
    clock would have read zero, so ``clock_offset + timedelta(monotonic)``
    converts a monotonic timestamp from that worker into wall-clock time.

    Args:
        workers: dict mapping worker names to worker dicts; updated in place.
          Workers that do not answer get no ``clock_offset`` entry.
        inspect: a celery ``Inspect``-like object; its ``_request`` method
          is used to send the ``monotonic`` remote control command.
    """
    err_msg = 'Could not get monotonic clock for {worker}'

    t = datetime.datetime.now(tz=datetime.timezone.utc)
    # NOTE(review): ``_request`` is private celery API; it yields a falsy
    # value (None) when no worker replied, hence the ``or {}``.
    replies = inspect._request('monotonic') or {}
    for worker, clock in replies.items():
        if worker not in workers:
            # A reply from a worker we did not select: nothing to annotate.
            continue
        monotonic = clock.get('monotonic')
        if monotonic is None:
            # Fall back to a zero monotonic clock so the worker still gets
            # an (approximate) offset, and tell the user.
            monotonic = 0
            click.echo(err_msg.format(worker=worker), err=True)
        dt = datetime.timedelta(seconds=monotonic)
        workers[worker]['clock_offset'] = t - dt


def worker_to_wallclock(worker, monotonic):
    """Translate a worker's monotonic timestamp (seconds) to wall-clock time.

    Uses the ``clock_offset`` stored on the worker dict by
    ``get_clock_offsets``.
    """
    offset = worker['clock_offset']
    return offset + datetime.timedelta(seconds=monotonic)


@click.group()
@click.option('--instance-config', metavar='CONFIG', default=None,
              help='Use this worker instance configuration')
@click.option('--host', metavar='HOSTNAME_FILTER', multiple=True,
              help='Filter by hostname')
@click.option('--type', metavar='WORKER_TYPE_FILTER', multiple=True,
              help='Filter by worker type')
@click.option('--timeout', metavar='TIMEOUT', type=float, default=1.0,
              help='Timeout for remote control communication')
@click.option('--debug/--no-debug', default=False, help='Turn on debugging')
@click.pass_context
def cli(ctx, debug, timeout, instance_config, host, type):
    """Manage the Software Heritage workers

    Filters support globs; a filter starting with a "-" excludes the
    corresponding values.

    """
    # (The docstring above is the click help text.) ``type`` shadows the
    # builtin but is the parameter name click derives from ``--type``.

    # ctx.obj is None when the group is invoked without an explicit ``obj``
    # (e.g. ``cli()`` from a console-script entry point); make it a dict.
    ctx.ensure_object(dict)

    if instance_config:
        os.environ['SWH_WORKER_INSTANCE'] = instance_config

    # Imported only after SWH_WORKER_INSTANCE may have been set above.
    # NOTE(review): presumably the celery app reads it at import time.
    from swh.scheduler.celery_backend.config import app
    full_inspect = app.control.inspect(timeout=timeout)

    workers = filter_workers(
        list_remote_workers(full_inspect),
        make_filters(host, type)
    )
    if not workers:
        # An empty ``destination`` list is treated by kombu's pidbox as
        # "every worker", so carrying on would make the subcommands
        # (including remove-queues) act on all workers instead of none.
        click.echo('No matching workers found', err=True)
        sys.exit(2)
    ctx.obj['workers'] = workers

    # From here on, only talk to the selected workers.
    destination = list(workers)
    inspect = app.control.inspect(destination=destination,
                                  timeout=timeout)
    ctx.obj['inspect'] = inspect

    get_clock_offsets(workers, inspect)

    ctx.obj['control'] = app.control
    ctx.obj['destination'] = destination
    ctx.obj['timeout'] = timeout
    ctx.obj['debug'] = debug


@cli.command()
@click.pass_context
def list_workers(ctx):
    """List the currently running workers"""
    # Prints "<type> alive on <host>" per selected worker, ordered by worker
    # name; exits with status 2 when no worker is selected.
    workers = ctx.obj['workers']
    if workers:
        for name in sorted(workers):
            click.echo("{type} alive on {host}".format(**workers[name]))
    else:
        sys.exit(2)


@cli.command()
@click.pass_context
def list_tasks(ctx):
    """List the tasks currently running on workers"""
    # One output line per active task. ``started`` is the task start time
    # converted from the worker's monotonic clock to wall-clock time; the
    # date uses %d (day of month) -- %m is the month.
    task_template = ('{worker} {name}'
                     '[{id} '
                     'started={started:%Y-%m-%dT%H:%M:%S} '
                     'pid={worker_pid}] {args} {kwargs}')
    inspect = ctx.obj['inspect']
    workers = ctx.obj['workers']
    active = inspect.active()

    if not active:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    has_tasks = False
    for worker_name, tasks in sorted(active.items()):
        worker = workers[worker_name]
        if not tasks:
            click.echo("No active tasks on {name}".format(**worker), err=True)
        # Oldest task first
        for task in sorted(tasks, key=itemgetter('time_start')):
            task['started'] = worker_to_wallclock(worker, task['time_start'])
            click.echo(task_template.format(worker=worker_name, **task))
            has_tasks = True

    # Exit status 2 signals that no task is running on any selected worker
    if not has_tasks:
        sys.exit(2)


@cli.command()
@click.pass_context
def list_queues(ctx):
    """List all the queues currently enabled on the workers"""
    # Prints "<worker> <queue> <queue>..." per worker (both sorted by name);
    # workers without queues are reported on stderr. Exits with status 2
    # when nobody replies or no worker has any queue.
    inspect = ctx.obj['inspect']
    replies = inspect.active_queues()

    if not replies:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    found_any = False
    for worker_name in sorted(replies):
        names = sorted(q['name'] for q in replies[worker_name])
        if not names:
            click.echo('No queues for {worker}'.format(worker=worker_name),
                       err=True)
            continue
        found_any = True
        click.echo('{worker} {queues}'.format(worker=worker_name,
                                              queues=' '.join(names)))

    if not found_any:
        sys.exit(2)


@cli.command()
@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
@click.argument('queues', nargs=-1)
@click.pass_context
def remove_queues(ctx, noop, queues):
    """Cancel the queue for the given workers"""
    # QUEUES are fnmatch globs matched against each worker's active queue
    # names; with no argument every queue matches ('*'). With --noop only
    # the messages are printed.
    msg_template = 'Canceling queue {queue} on worker {worker}{noop}'

    inspect = ctx.obj['inspect']
    control = ctx.obj['control']
    timeout = ctx.obj['timeout']
    replies = inspect.active_queues()
    patterns = queues or ['*']

    if not replies:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    suffix = ' (noop)' if noop else ''
    for worker in sorted(replies):
        ordered = sorted(replies[worker], key=itemgetter('name'))
        matching = [q['name'] for q in ordered
                    if any(fnmatch(q['name'], p) for p in patterns)]
        for queue_name in matching:
            click.echo(msg_template.format(queue=queue_name, worker=worker,
                                           noop=suffix),
                       err=True)
            if noop:
                continue
            control.cancel_consumer(queue_name,
                                    destination=[worker],
                                    timeout=timeout)


@cli.command()
@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
@click.argument('queues', nargs=-1)
@click.pass_context
def add_queues(ctx, noop, queues):
    """Start the queue for the given workers"""
    # Every (worker, queue) pair is announced on stderr; with --noop nothing
    # is actually sent to the workers.
    msg_template = 'Starting queue {queue} on worker {worker}{noop}'

    control = ctx.obj['control']
    timeout = ctx.obj['timeout']
    workers = ctx.obj['workers']

    if not queues:
        # Unlike remove-queues there is no sensible default here; report the
        # missing argument instead of silently doing nothing.
        click.echo('No queues given', err=True)
        sys.exit(2)

    if not workers:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    for worker in sorted(workers):
        for queue in queues:
            msg = msg_template.format(queue=queue, worker=worker,
                                      noop=' (noop)' if noop else '')
            click.echo(msg, err=True)
            if not noop:
                ret = control.add_consumer(queue,
                                           destination=[worker],
                                           timeout=timeout)
                # NOTE(review): celery's broadcast only collects replies
                # when called with reply=True and returns None otherwise;
                # don't print a bare "None" for every worker.
                if ret is not None:
                    click.echo(ret)


if __name__ == '__main__':
    # The empty dict becomes ctx.obj, which the group callback fills in for
    # its subcommands.
    cli(obj=dict())
back to top