https://forge.softwareheritage.org/source/swh-scheduler.git
Raw File
Tip revision: 51cf8d2220ad84150892f08496bea08f7f35b6b6 authored by Nicolas Dandrimont on 13 December 2019, 10:58:10 UTC
Add pytest-mock build-dependency.
Tip revision: 51cf8d2
test_scheduler.py
# Copyright (C) 2017-2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import copy
import datetime
import random
import uuid
from collections import defaultdict

import psycopg2
from arrow import utcnow
import pytest


# Task type definitions registered by the tests. Each key is the suffix of
# the task type's 'type' field after the last '-' (test_create_tasks looks
# entries up with ``orig_task['type'].split('-')[-1]``).
TASK_TYPES = dict(
    git=dict(
        type='update-git',
        description='Update a git repository',
        backend_name='swh.loader.git.tasks.UpdateGitRepository',
        default_interval=datetime.timedelta(days=64),
        min_interval=datetime.timedelta(hours=12),
        max_interval=datetime.timedelta(days=64),
        backoff_factor=2,
        max_queue_length=None,
        num_retries=7,
        retry_delay=datetime.timedelta(hours=2),
    ),
    hg=dict(
        type='update-hg',
        description='Update a mercurial repository',
        backend_name='swh.loader.mercurial.tasks.UpdateHgRepository',
        default_interval=datetime.timedelta(days=64),
        min_interval=datetime.timedelta(hours=12),
        max_interval=datetime.timedelta(days=64),
        backoff_factor=2,
        max_queue_length=None,
        num_retries=7,
        retry_delay=datetime.timedelta(hours=2),
    ),
)

# Task templates, one per task type in TASK_TYPES. 'next_run' is filled in
# per task by the test helpers; the 'hg' template is a oneshot task, while
# the 'git' template leaves the policy unset.
TEMPLATES = dict(
    git=dict(
        type='update-git',
        arguments=dict(args=[], kwargs={}),
        next_run=None,
    ),
    hg=dict(
        type='update-hg',
        arguments=dict(args=[], kwargs={}),
        next_run=None,
        policy='oneshot',
    ),
)


def subdict(d, keys=None, excl=()):
    """Return a new dict with a subset of the items of ``d``.

    :param d: the mapping to extract items from.
    :param keys: the keys to keep, in order. Defaults to every key of ``d``.
        Each listed key must be present in ``d``; a missing key raises
        ``KeyError``.
    :param excl: keys to leave out of the result. A bare string is treated
        as a single key, not as a sequence of characters.
    :return: a plain ``dict`` mapping each kept key to ``d[key]``.
    """
    if keys is None:
        keys = list(d)
    if isinstance(excl, str):
        # ``excl=('id')`` is just the string 'id'. Without this,
        # ``k not in excl`` would be a substring test and would also drop
        # keys such as 'i' or 'd'.
        excl = (excl,)
    return {k: d[k] for k in keys if k not in excl}


@pytest.mark.db
class TestScheduler:
    """Database-backed tests for the scheduler backend.

    Every test receives a ``swh_scheduler`` fixture. It is not defined in
    this class; presumably a conftest provides it. Tests drive it through
    its public methods: task types, task creation, peeking and grabbing
    ready tasks, task runs, and archival.
    """

    def test_get_priority_ratios(self, swh_scheduler):
        """The backend exposes the expected priority ratios."""
        assert swh_scheduler.get_priority_ratios() == {
            'high': 0.5,
            'normal': 0.3,
            'low': 0.2,
            }

    def test_add_task_type(self, swh_scheduler):
        """Task types round-trip, and duplicates raise IntegrityError."""
        tt = TASK_TYPES['git']
        swh_scheduler.create_task_type(tt)
        assert tt == swh_scheduler.get_task_type(tt['type'])
        with pytest.raises(psycopg2.IntegrityError,
                           match=r'\(type\)=\(%s\)' % tt['type']):
            swh_scheduler.create_task_type(tt)

        tt2 = TASK_TYPES['hg']
        swh_scheduler.create_task_type(tt2)
        assert tt == swh_scheduler.get_task_type(tt['type'])
        assert tt2 == swh_scheduler.get_task_type(tt2['type'])

    def test_get_task_types(self, swh_scheduler):
        """get_task_types lists every created task type."""
        tt, tt2 = TASK_TYPES['git'], TASK_TYPES['hg']
        swh_scheduler.create_task_type(tt)
        swh_scheduler.create_task_type(tt2)
        actual_task_types = swh_scheduler.get_task_types()
        assert tt in actual_task_types
        assert tt2 in actual_task_types

    def test_create_tasks(self, swh_scheduler):
        """create_tasks is idempotent and stores every task as submitted."""
        priority_ratio = self._priority_ratio(swh_scheduler)
        self._create_task_types(swh_scheduler)
        num_tasks_priority = 100
        tasks_1 = self._tasks_from_template(
            TEMPLATES['git'], utcnow(), 100)
        tasks_2 = self._tasks_from_template(
            TEMPLATES['hg'], utcnow(), 100,
            num_tasks_priority, priorities=priority_ratio)
        tasks = tasks_1 + tasks_2

        # tasks are returned only once with their ids
        ret1 = swh_scheduler.create_tasks(tasks + tasks_1 + tasks_2)
        set_ret1 = {t['id'] for t in ret1}

        # creating the same set results in the same ids
        ret = swh_scheduler.create_tasks(tasks)
        set_ret = {t['id'] for t in ret}

        # Idempotence results
        assert set_ret == set_ret1
        assert len(ret) == len(ret1)

        ids = set()
        actual_priorities = defaultdict(int)

        for task, orig_task in zip(ret, tasks):
            task = copy.deepcopy(task)
            task_type = TASK_TYPES[orig_task['type'].split('-')[-1]]
            assert task['id'] not in ids
            assert task['status'] == 'next_run_not_scheduled'
            assert task['current_interval'] == task_type['default_interval']
            assert task['policy'] == orig_task.get('policy', 'recurring')
            priority = task.get('priority')
            if priority:
                actual_priorities[priority] += 1

            assert task['retries_left'] == (task_type['num_retries'] or 0)
            ids.add(task['id'])
            # strip the fields filled in by the backend, then compare the
            # rest with what was submitted
            del task['id']
            del task['status']
            del task['current_interval']
            del task['retries_left']
            if 'policy' not in orig_task:
                del task['policy']
            if 'priority' not in orig_task:
                del task['priority']
            # compare every task, including the prioritized ones (this
            # assertion used to be nested under the branch above)
            assert task == orig_task

        assert dict(actual_priorities) == {
            priority: int(ratio * num_tasks_priority)
            for priority, ratio in priority_ratio.items()
        }

    def test_peek_ready_tasks_no_priority(self, swh_scheduler):
        """peek_ready_tasks orders by next_run and honors its limits."""
        self._create_task_types(swh_scheduler)
        t = utcnow()
        task_type = TEMPLATES['git']['type']
        tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
        random.shuffle(tasks)
        swh_scheduler.create_tasks(tasks)

        ready_tasks = swh_scheduler.peek_ready_tasks(task_type)
        assert len(ready_tasks) == len(tasks)
        for current, following in zip(ready_tasks, ready_tasks[1:]):
            assert current['next_run'] <= following['next_run']

        # Only get the first few ready tasks
        limit = random.randrange(5, 5 + len(tasks)//2)
        ready_tasks_limited = swh_scheduler.peek_ready_tasks(
            task_type, num_tasks=limit)

        assert len(ready_tasks_limited) == limit
        assert ready_tasks_limited == ready_tasks[:limit]

        # Limit by timestamp
        max_ts = tasks[limit-1]['next_run']
        ready_tasks_timestamped = swh_scheduler.peek_ready_tasks(
            task_type, timestamp=max_ts)

        for ready_task in ready_tasks_timestamped:
            assert ready_task['next_run'] <= max_ts

        # Make sure we get proper behavior for the first ready tasks
        assert ready_tasks[:len(ready_tasks_timestamped)] \
            == ready_tasks_timestamped

        # Limit by both
        ready_tasks_both = swh_scheduler.peek_ready_tasks(
            task_type, timestamp=max_ts, num_tasks=limit//3)
        assert len(ready_tasks_both) <= limit//3
        for ready_task in ready_tasks_both:
            assert ready_task['next_run'] <= max_ts
            assert ready_task in ready_tasks[:limit//3]

    def _priority_ratio(self, swh_scheduler):
        """Return the backend's priority ratios (priority name -> ratio)."""
        return swh_scheduler.get_priority_ratios()

    def test_peek_ready_tasks_mixed_priorities(self, swh_scheduler):
        """peek_ready_tasks splits results between prioritized and plain tasks."""
        import math

        priority_ratio = self._priority_ratio(swh_scheduler)
        self._create_task_types(swh_scheduler)
        t = utcnow()
        task_type = TEMPLATES['git']['type']
        num_tasks_priority = 100
        num_tasks_no_priority = 100
        # Create tasks with and without priorities
        tasks = self._tasks_from_template(
            TEMPLATES['git'], t,
            num=num_tasks_no_priority,
            num_priority=num_tasks_priority,
            priorities=priority_ratio)

        random.shuffle(tasks)
        swh_scheduler.create_tasks(tasks)

        # take all available tasks
        ready_tasks = swh_scheduler.peek_ready_tasks(
            task_type)

        assert len(ready_tasks) == len(tasks)
        assert num_tasks_priority + num_tasks_no_priority \
            == len(ready_tasks)

        count_tasks_per_priority = defaultdict(int)
        for task in ready_tasks:
            priority = task.get('priority')
            if priority:
                count_tasks_per_priority[priority] += 1

        assert dict(count_tasks_per_priority) == {
            priority: int(ratio * num_tasks_priority)
            for priority, ratio in priority_ratio.items()
        }

        # Only get some ready tasks
        num_tasks = random.randrange(5, 5 + num_tasks_no_priority//2)
        num_tasks_priority_limit = random.randrange(5, num_tasks_priority//2)
        ready_tasks_limited = swh_scheduler.peek_ready_tasks(
            task_type, num_tasks=num_tasks,
            num_tasks_priority=num_tasks_priority_limit)

        count_tasks_per_priority = defaultdict(int)
        for task in ready_tasks_limited:
            priority = task.get('priority')
            count_tasks_per_priority[priority] += 1

        for priority, ratio in priority_ratio.items():
            expected_count = math.ceil(ratio * num_tasks_priority_limit)
            actual_prio = count_tasks_per_priority[priority]
            # one extra task per priority is tolerated (presumably rounding
            # in how the backend splits the priority budget)
            assert (actual_prio == expected_count or
                    actual_prio == expected_count + 1)

        assert count_tasks_per_priority[None] == num_tasks

    def test_grab_ready_tasks(self, swh_scheduler):
        """grab_ready_tasks returns the peeked tasks, marked as scheduled."""
        priority_ratio = self._priority_ratio(swh_scheduler)
        self._create_task_types(swh_scheduler)
        t = utcnow()
        task_type = TEMPLATES['git']['type']
        num_tasks_priority = 100
        num_tasks_no_priority = 100
        # Create tasks with and without priorities
        tasks = self._tasks_from_template(
            TEMPLATES['git'], t,
            num=num_tasks_no_priority,
            num_priority=num_tasks_priority,
            priorities=priority_ratio)
        random.shuffle(tasks)
        swh_scheduler.create_tasks(tasks)

        first_ready_tasks = swh_scheduler.peek_ready_tasks(
            task_type, num_tasks=10, num_tasks_priority=10)
        grabbed_tasks = swh_scheduler.grab_ready_tasks(
            task_type, num_tasks=10, num_tasks_priority=10)

        # zip() alone would pass silently on empty or truncated results
        assert first_ready_tasks
        assert len(grabbed_tasks) == len(first_ready_tasks)

        # pair tasks by id so the check does not depend on row order
        peeked_by_id = sorted(first_ready_tasks, key=lambda x: x['id'])
        grabbed_by_id = sorted(grabbed_tasks, key=lambda x: x['id'])
        for peeked, grabbed in zip(peeked_by_id, grabbed_by_id):
            assert peeked['status'] == 'next_run_not_scheduled'
            del peeked['status']
            assert grabbed['status'] == 'next_run_scheduled'
            del grabbed['status']
            assert peeked == grabbed
            assert peeked['priority'] == grabbed['priority']

    def test_get_tasks(self, swh_scheduler):
        """get_tasks returns the requested tasks, for random batches."""
        self._create_task_types(swh_scheduler)
        t = utcnow()
        tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
        tasks = swh_scheduler.create_tasks(tasks)
        random.shuffle(tasks)
        # consume every task, in random-sized batches of at least one task
        while tasks:
            length = random.randint(1, len(tasks))
            cur_tasks = sorted(tasks[:length], key=lambda x: x['id'])
            del tasks[:length]

            ret = swh_scheduler.get_tasks([task['id'] for task in cur_tasks])
            # result is not guaranteed to be sorted
            ret.sort(key=lambda x: x['id'])
            assert ret == cur_tasks

    def test_search_tasks(self, swh_scheduler):
        """search_tasks without filters returns every created task."""
        def make_real_dicts(rows):
            """RealDictRow is not a real dict."""
            return [dict(d.items()) for d in rows]
        self._create_task_types(swh_scheduler)
        t = utcnow()
        tasks = self._tasks_from_template(TEMPLATES['git'], t, 100)
        tasks = swh_scheduler.create_tasks(tasks)
        assert make_real_dicts(swh_scheduler.search_tasks()) \
            == make_real_dicts(tasks)

    def test_filter_task_to_archive(self, swh_scheduler):
        """Filtering only lists disabled recurring or completed oneshot tasks

        """
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(TEMPLATES['hg'], _time, 12)
        total_tasks = len(recurring) + len(oneshots)

        # simulate scheduling tasks
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)

        # simulate the tasks being done
        for task in backend_tasks:
            swh_scheduler.end_task_run(
                task['backend_id'], status='eventful')

        # Randomly update task's status per policy
        status_per_policy = {'recurring': 0, 'oneshot': 0}
        status_choice = {
            # policy: [tuple (1-for-filtering, 'associated-status')]
            'recurring': [(1, 'disabled'),
                          (0, 'completed'),
                          (0, 'next_run_not_scheduled')],
            'oneshot': [(0, 'next_run_not_scheduled'),
                        (1, 'disabled'),
                        (1, 'completed')]
        }

        tasks_to_update = defaultdict(list)
        _task_ids = defaultdict(list)
        # randomize 'disabling' recurring task or 'complete' oneshot task
        for task in pending_tasks:
            policy = task['policy']
            _task_ids[policy].append(task['id'])
            status = random.choice(status_choice[policy])
            if status[0] != 1:
                continue
            # elected for filtering
            status_per_policy[policy] += status[0]
            tasks_to_update[policy].append(task['id'])

        swh_scheduler.disable_tasks(tasks_to_update['recurring'])
        # hack: change the status to something else than completed/disabled
        swh_scheduler.set_status_tasks(
            _task_ids['oneshot'], status='next_run_not_scheduled')
        # complete the tasks to update
        swh_scheduler.set_status_tasks(
            tasks_to_update['oneshot'], status='completed')

        total_tasks_filtered = (status_per_policy['recurring'] +
                                status_per_policy['oneshot'])

        # retrieve tasks to archive
        after = _time.shift(days=-1).format('YYYY-MM-DD')
        before = utcnow().shift(days=1).format('YYYY-MM-DD')
        tasks_to_archive = list(swh_scheduler.filter_task_to_archive(
            after_ts=after, before_ts=before, limit=total_tasks))

        assert len(tasks_to_archive) == total_tasks_filtered

        actual_filtered_per_status = {'recurring': 0, 'oneshot': 0}
        for task in tasks_to_archive:
            actual_filtered_per_status[task['task_policy']] += 1

        assert actual_filtered_per_status == status_per_policy

    def test_delete_archived_tasks(self, swh_scheduler):
        """delete_archived_tasks removes the listed tasks and their runs."""
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        total_tasks = len(recurring) + len(oneshots)
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)

        _tasks = []
        percent = random.randint(0, 100)  # random election removal boundary
        for task in backend_tasks:
            t = swh_scheduler.end_task_run(
                task['backend_id'], status='eventful')
            c = random.randint(0, 100)
            if c <= percent:
                _tasks.append({'task_id': t['task'], 'task_run_id': t['id']})

        swh_scheduler.delete_archived_tasks(_tasks)

        all_tasks = [task['id'] for task in swh_scheduler.search_tasks()]
        tasks_count = len(all_tasks)
        tasks_run_count = len(swh_scheduler.get_task_runs(all_tasks))

        assert tasks_count == total_tasks - len(_tasks)
        assert tasks_run_count == total_tasks - len(_tasks)

    def test_get_task_runs_no_task(self, swh_scheduler):
        '''No task exists in the scheduler's db, get_task_runs() should always
        return an empty list.

        '''
        assert not swh_scheduler.get_task_runs(task_ids=())
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3),
                                               limit=10)

    def test_get_task_runs_no_task_executed(self, swh_scheduler):
        '''No task has been executed yet, get_task_runs() should always return
        an empty list.

        '''
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        swh_scheduler.create_tasks(recurring + oneshots)

        assert not swh_scheduler.get_task_runs(task_ids=())
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3))
        assert not swh_scheduler.get_task_runs(task_ids=(1, 2, 3), limit=10)

    def test_get_task_runs_with_scheduled(self, swh_scheduler):
        '''Some tasks have been scheduled but not executed yet, get_task_runs()
        should not return an empty list. limit should behave as expected.

        '''
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        total_tasks = len(recurring) + len(oneshots)
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)

        assert not swh_scheduler.get_task_runs(
            task_ids=[total_tasks + 1])

        btask = backend_tasks[0]
        runs = swh_scheduler.get_task_runs(
            task_ids=[btask['task']])
        assert len(runs) == 1
        run = runs[0]

        assert subdict(run, excl=('id',)) == {
            'task': btask['task'],
            'backend_id': btask['backend_id'],
            'scheduled': btask['scheduled'],
            'started': None,
            'ended': None,
            'metadata': None,
            'status': 'scheduled',
        }

        runs = swh_scheduler.get_task_runs(
            task_ids=[bt['task'] for bt in backend_tasks], limit=2)
        assert len(runs) == 2

        runs = swh_scheduler.get_task_runs(
            task_ids=[bt['task'] for bt in backend_tasks])
        assert len(runs) == total_tasks

        keys = ('task', 'backend_id', 'scheduled')
        assert sorted([subdict(x, keys) for x in runs],
                      key=lambda x: x['task']) == backend_tasks

    def test_get_task_runs_with_executed(self, swh_scheduler):
        '''Some tasks have been executed, get_task_runs() should
        not return an empty list. limit should behave as expected.

        '''
        self._create_task_types(swh_scheduler)
        _time = utcnow()
        recurring = self._tasks_from_template(
            TEMPLATES['git'], _time, 12)
        oneshots = self._tasks_from_template(
            TEMPLATES['hg'], _time, 12)
        pending_tasks = swh_scheduler.create_tasks(recurring + oneshots)
        backend_tasks = [{
            'task': task['id'],
            'backend_id': str(uuid.uuid4()),
            'scheduled': utcnow(),
        } for task in pending_tasks]
        swh_scheduler.mass_schedule_task_runs(backend_tasks)

        btask = backend_tasks[0]
        ts = utcnow()
        swh_scheduler.start_task_run(btask['backend_id'],
                                     metadata={'something': 'stupid'},
                                     timestamp=ts)
        runs = swh_scheduler.get_task_runs(task_ids=[btask['task']])
        assert len(runs) == 1
        # ('id',) is a one-element tuple; ('id') would be the bare string
        assert subdict(runs[0], excl=('id',)) == {
            'task': btask['task'],
            'backend_id': btask['backend_id'],
            'scheduled': btask['scheduled'],
            'started': ts,
            'ended': None,
            'metadata': {'something': 'stupid'},
            'status': 'started',
            }

        ts2 = utcnow()
        swh_scheduler.end_task_run(btask['backend_id'],
                                   metadata={'other': 'stuff'},
                                   timestamp=ts2,
                                   status='eventful')
        runs = swh_scheduler.get_task_runs(task_ids=[btask['task']])
        assert len(runs) == 1
        assert subdict(runs[0], excl=('id',)) == {
            'task': btask['task'],
            'backend_id': btask['backend_id'],
            'scheduled': btask['scheduled'],
            'started': ts,
            'ended': ts2,
            'metadata': {'something': 'stupid', 'other': 'stuff'},
            'status': 'eventful',
            }

    @staticmethod
    def _task_from_template(template, next_run, priority, *args, **kwargs):
        """Build one task dict from a deep copy of ``template``.

        :param next_run: value stored as the task's 'next_run'.
        :param priority: stored as 'priority' only when truthy.
        :param args: if given, replaces ``arguments['args']`` (as a list).
        :param kwargs: if given, replaces ``arguments['kwargs']``.
        """
        ret = copy.deepcopy(template)
        ret['next_run'] = next_run
        if priority:
            ret['priority'] = priority
        if args:
            ret['arguments']['args'] = list(args)
        if kwargs:
            ret['arguments']['kwargs'] = kwargs
        return ret

    def _pop_priority(self, priorities):
        """Consume one unit of the first priority with budget left.

        :param priorities: mapping of priority name -> remaining budget. It
            is decremented in place. None or empty means "no priority".
        :return: the consumed priority name, or None once every budget is
            exhausted.
        """
        if not priorities:
            return None
        for priority, remains in priorities.items():
            if remains > 0:
                priorities[priority] = remains - 1
                return priority
        return None

    def _tasks_from_template(self, template, max_timestamp, num,
                             num_priority=0, priorities=None):
        """Build ``num + num_priority`` tasks from ``template``.

        Task ``i`` gets ``next_run = max_timestamp - i microseconds``, so
        all next_run values are distinct. It also gets one positional
        argument and one keyword argument derived from ``i``.

        :param num_priority: how many tasks should carry a priority.
        :param priorities: mapping of priority name -> ratio. It is only
            used when ``num_priority`` is non-zero, and it is never
            modified. The first tasks built consume the per-priority budgets
            ``ratio * num_priority`` in the mapping's iteration order.
        """
        if num_priority and priorities:
            # turn ratios into absolute budgets in a fresh dict, which
            # _pop_priority may decrement freely
            budgets = {
                priority: ratio * num_priority
                for priority, ratio in priorities.items()
            }
        else:
            # no priority budget: never assign priorities, and never let
            # _pop_priority consume (and mutate) the caller's ratio mapping
            budgets = None

        tasks = []
        for i in range(num + num_priority):
            priority = self._pop_priority(budgets)
            tasks.append(self._task_from_template(
                template,
                max_timestamp - datetime.timedelta(microseconds=i),
                priority,
                'argument-%03d' % i,
                **{'kwarg%03d' % i: 'bogus-kwarg'}
            ))
        return tasks

    def _create_task_types(self, scheduler):
        """Register every task type of TASK_TYPES with ``scheduler``."""
        for tt in TASK_TYPES.values():
            scheduler.create_task_type(tt)
back to top