https://forge.softwareheritage.org/source/swh-scheduler.git
Tip revision: b6bc2f2ae26b6d0bb7768db8f2ffac2002867ca3 authored by Nicolas Dandrimont on 18 December 2018, 16:08:45 UTC
Explicitly register class-based tasks inheriting from our own class
Explicitly register class-based tasks inheriting from our own class
Tip revision: b6bc2f2
swh-worker-control
#!/usr/bin/env python3
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from fnmatch import fnmatch
from operator import itemgetter
import os
import sys
import click
def list_remote_workers(inspect):
    """Ping the remote celery workers and describe the ones that answered.

    Worker names are expected to look like ``celery@<type>.<host>``; names
    that do not follow this pattern are reported on stderr and skipped.

    Args:
        inspect: a celery inspect object; only its ``ping()`` method is used.

    Returns:
        dict mapping each supported worker name to a dict with the ``name``,
        ``host`` and ``type`` keys (empty when nobody replied).
    """
    prefix = 'celery@'
    ping_replies = inspect.ping()
    if not ping_replies:
        return {}

    ret = {}
    for worker_name in sorted(ping_replies):
        if not worker_name.startswith(prefix):
            print('Unsupported worker: %s' % worker_name, file=sys.stderr)
            continue
        # Split "<type>.<host>". A name without any dot used to abort the
        # whole command with a ValueError; report and skip it instead.
        worker_type, sep, host = worker_name[len(prefix):].partition('.')
        if not sep:
            print('Unsupported worker: %s' % worker_name, file=sys.stderr)
            continue
        ret[worker_name] = {
            'name': worker_name,
            'host': host,
            'type': worker_type,
        }
    return ret
def make_filters(filter_host, filter_type):
    """Turn host/type filter patterns into worker predicate functions.

    Every pattern is an fnmatch glob matched against the matching field of a
    worker dict. A leading "-" negates the pattern, so the predicate rejects
    the workers it matches. Host predicates precede type predicates in the
    returned list.
    """
    def make_check(field, pattern, negate):
        def check(worker):
            # fnmatch returns a bool; comparing it to `negate` flips it
            # exactly when the pattern was an exclusion.
            return fnmatch(worker[field], pattern) != negate
        return check

    checks = []
    for field, patterns in (('host', filter_host), ('type', filter_type)):
        for pattern in patterns:
            negate = pattern.startswith('-')
            if negate:
                pattern = pattern[1:]
            checks.append(make_check(field, pattern, negate))
    return checks
def filter_workers(workers, filters):
    """Keep only the workers accepted by every predicate in ``filters``.

    An empty ``filters`` sequence keeps every worker.
    """
    selected = {}
    for name, worker in workers.items():
        if all(check(worker) for check in filters):
            selected[name] = worker
    return selected
def get_clock_offsets(workers, inspect):
    """Add a clock_offset entry for each worker.

    Each replying worker reports its monotonic clock. Subtracting it from the
    local UTC wall clock gives an offset that, added to a monotonic timestamp
    from that worker, yields an approximate wall-clock time.

    Args:
        workers: dict of worker descriptions keyed by worker name; entries
            are updated in place with a ``clock_offset`` datetime.
        inspect: celery inspect object used to send the ``monotonic``
            remote-control request.

    Replies from workers absent from ``workers`` are ignored. A worker that
    does not report a clock gets an error on stderr and an offset equal to
    the current time.
    """
    err_msg = 'Could not get monotonic clock for {worker}'
    t = datetime.datetime.now(tz=datetime.timezone.utc)
    # Like inspect.ping(), the reply may be empty/None when no worker
    # answers; calling .items() on None would crash.
    replies = inspect._request('monotonic') or {}
    for worker, clock in replies.items():
        if worker not in workers:
            # Not one of the selected workers: nothing to annotate.
            continue
        monotonic = clock.get('monotonic')
        if monotonic is None:
            monotonic = 0
            click.echo(err_msg.format(worker=worker), err=True)
        dt = datetime.timedelta(seconds=monotonic)
        workers[worker]['clock_offset'] = t - dt
def worker_to_wallclock(worker, monotonic):
    """Translate a worker's monotonic timestamp into wall-clock time.

    Uses the ``clock_offset`` datetime stored in ``worker`` by
    get_clock_offsets; ``monotonic`` is a number of seconds.
    """
    offset = worker['clock_offset']
    return offset + datetime.timedelta(seconds=monotonic)
@click.group()
@click.option('--instance-config', metavar='CONFIG', default=None,
              help='Use this worker instance configuration')
@click.option('--host', metavar='HOSTNAME_FILTER', multiple=True,
              help='Filter by hostname')
@click.option('--type', metavar='WORKER_TYPE_FILTER', multiple=True,
              help='Filter by worker type')
@click.option('--timeout', metavar='TIMEOUT', type=float, default=1.0,
              help='Timeout for remote control communication')
@click.option('--debug/--no-debug', default=False, help='Turn on debugging')
@click.pass_context
def cli(ctx, debug, timeout, instance_config, host, type):
    """Manage the Software Heritage workers

    Filters support globs; a filter starting with a "-" excludes the
    corresponding values.
    """
    # Subcommands read their state from ctx.obj; make sure it is a dict even
    # when the group is invoked without obj={} (e.g. via an entry point).
    ctx.ensure_object(dict)

    if instance_config:
        # Set before importing the celery app below, presumably because the
        # app configuration reads it at import time -- confirm in
        # swh.scheduler.celery_backend.config.
        os.environ['SWH_WORKER_INSTANCE'] = instance_config

    from swh.scheduler.celery_backend.config import app

    full_inspect = app.control.inspect(timeout=timeout)
    workers = filter_workers(
        list_remote_workers(full_inspect),
        make_filters(host, type)
    )
    if not workers:
        # An empty destination list is treated by kombu's pidbox as "every
        # worker", so continuing would make the subcommands act on all
        # workers (e.g. remove_queues cancelling queues everywhere).
        click.echo('No matching workers', err=True)
        sys.exit(2)

    ctx.obj['workers'] = workers
    destination = list(workers)
    inspect = app.control.inspect(destination=destination,
                                  timeout=timeout)
    ctx.obj['inspect'] = inspect

    get_clock_offsets(workers, inspect)

    ctx.obj['control'] = app.control
    ctx.obj['destination'] = destination
    ctx.obj['timeout'] = timeout
    ctx.obj['debug'] = debug
@cli.command()
@click.pass_context
def list_workers(ctx):
    """List the currently running workers"""
    # One "<type> alive on <host>" line per selected worker, sorted by
    # worker name; exit status 2 signals that no worker was selected.
    workers = ctx.obj['workers']
    if workers:
        for name in sorted(workers):
            click.echo("{type} alive on {host}".format(**workers[name]))
    else:
        sys.exit(2)
@cli.command()
@click.pass_context
def list_tasks(ctx):
    """List the tasks currently running on workers"""
    # One line per task:
    #   "<worker> <task name>[<task id> started=<ISO date> pid=<pid>] args kwargs"
    # The date is %Y-%m-%d (year-month-day); the previous %Y-%m-%m printed
    # the month twice and never showed the day.
    task_template = ('{worker} {name}'
                     '[{id} '
                     'started={started:%Y-%m-%dT%H:%M:%S} '
                     'pid={worker_pid}] {args} {kwargs}')
    inspect = ctx.obj['inspect']
    workers = ctx.obj['workers']
    active = inspect.active()
    if not active:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    has_tasks = False
    for worker_name, tasks in sorted(active.items()):
        worker = workers[worker_name]
        if not tasks:
            click.echo("No active tasks on {name}".format(**worker), err=True)
        # Oldest task first; time_start is on the worker's monotonic clock,
        # so convert it to wall-clock time for display.
        for task in sorted(tasks, key=itemgetter('time_start')):
            task['started'] = worker_to_wallclock(worker, task['time_start'])
            click.echo(task_template.format(worker=worker_name, **task))
            has_tasks = True

    # Exit status 2 signals that no task is running on the selected workers.
    if not has_tasks:
        sys.exit(2)
@cli.command()
@click.pass_context
def list_queues(ctx):
    """List all the queues currently enabled on the workers"""
    # Prints "<worker> <queue> <queue> ..." per worker (sorted by name);
    # workers without queues are reported on stderr. Exit status 2 when no
    # worker answers or none has any queue.
    replies = ctx.obj['inspect'].active_queues()
    if not replies:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    found_any = False
    for worker_name in sorted(replies):
        queue_names = sorted(q['name'] for q in replies[worker_name])
        if not queue_names:
            click.echo('No queues for {worker}'.format(worker=worker_name),
                       err=True)
            continue
        click.echo('{worker} {queues}'.format(worker=worker_name,
                                              queues=' '.join(queue_names)))
        found_any = True

    if not found_any:
        sys.exit(2)
@cli.command()
@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
@click.argument('queues', nargs=-1)
@click.pass_context
def remove_queues(ctx, noop, queues):
    """Cancel the queue for the given workers"""
    # QUEUES are fnmatch globs matched against each worker's active queue
    # names; with no argument at all, every queue matches.
    msg_template = 'Canceling queue {queue} on worker {worker}{noop}'
    inspect = ctx.obj['inspect']
    control = ctx.obj['control']
    timeout = ctx.obj['timeout']

    active = inspect.active_queues()
    patterns = queues or ['*']
    if not active:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    suffix = ' (noop)' if noop else ''
    for worker, worker_queues in sorted(active.items()):
        for queue in sorted(worker_queues, key=itemgetter('name')):
            queue_name = queue['name']
            if not any(fnmatch(queue_name, p) for p in patterns):
                continue
            click.echo(msg_template.format(queue=queue_name, worker=worker,
                                           noop=suffix),
                       err=True)
            if noop:
                # Dry run: report only.
                continue
            control.cancel_consumer(queue_name,
                                    destination=[worker],
                                    timeout=timeout)
@cli.command()
@click.option('--noop', is_flag=True, default=False, help='Do not proceed')
@click.argument('queues', nargs=-1)
@click.pass_context
def add_queues(ctx, noop, queues):
    """Start the queue for the given workers"""
    # Every queue named on the command line is added to every selected
    # worker, one remote-control call per (worker, queue) pair.
    msg_template = 'Starting queue {queue} on worker {worker}{noop}'
    control = ctx.obj['control']
    timeout = ctx.obj['timeout']
    workers = ctx.obj['workers']

    if not workers:
        click.echo('No reply from workers', err=True)
        sys.exit(2)

    suffix = ' (noop)' if noop else ''
    for worker in sorted(workers):
        for queue in queues:
            click.echo(msg_template.format(queue=queue, worker=worker,
                                           noop=suffix),
                       err=True)
            if noop:
                # Dry run: report only.
                continue
            # The raw remote-control reply goes to stdout.
            print(control.add_consumer(queue,
                                       destination=[worker],
                                       timeout=timeout))
if __name__ == '__main__':
    # The cli group and its subcommands keep shared state in ctx.obj
    # (workers, inspect, control, ...), so start with an empty dict.
    cli(obj={})