# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from datetime import timedelta
import glob
import os

from celery.contrib.testing import worker
from celery.contrib.testing.app import TestApp, setup_default_app
import pkg_resources
import pytest

from swh.core.utils import numfile_sortkey as sortkey

import swh.scheduler
from swh.scheduler import get_scheduler


SQL_DIR = os.path.join(os.path.dirname(swh.scheduler.__file__), "sql")
DUMP_FILES = os.path.join(SQL_DIR, "*.sql")

# Celery tasks used for testing purposes; the task implementations themselves
# live in swh/scheduler/tests/tasks.py
TASK_NAMES = ["ping", "multiping", "add", "error", "echo"]


@pytest.fixture
def swh_scheduler_config(request, postgresql):
    """Return a scheduler configuration dict pointing at the postgresql test
    database, after loading the swh-scheduler SQL schema into it."""
    scheduler_config = {
        "db": postgresql.dsn,
    }

    all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)

    cursor = postgresql.cursor()
    for fname in all_dump_files:
        with open(fname) as fobj:
            cursor.execute(fobj.read())
    postgresql.commit()

    return scheduler_config


@pytest.fixture
def swh_scheduler(swh_scheduler_config):
    """Instantiate a local, db-backed scheduler and pre-register the testing
    task types listed in TASK_NAMES."""
    scheduler = get_scheduler("local", swh_scheduler_config)
    for taskname in TASK_NAMES:
        scheduler.create_task_type(
            {
                "type": "swh-test-{}".format(taskname),
                "description": "The {} testing task".format(taskname),
                "backend_name": "swh.scheduler.tests.tasks.{}".format(taskname),
                "default_interval": timedelta(days=1),
                "min_interval": timedelta(hours=6),
                "max_interval": timedelta(days=12),
            }
        )

    return scheduler
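
# Usage sketch (illustrative, not part of the plugin): a test depending on the
# swh_scheduler fixture gets a scheduler with the task types above already
# registered. The test name and assertion below are assumptions, not code taken
# from the real test suite.
#
#     def test_task_type_registered(swh_scheduler):
#         task_type = swh_scheduler.get_task_type("swh-test-ping")
#         assert task_type["backend_name"] == "swh.scheduler.tests.tasks.ping"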


# This alias makes it easy to instantiate a db-backed scheduler,
# e.g. for the RPC client/server test suite.
swh_db_scheduler = swh_scheduler


@pytest.fixture(scope="session")
def swh_scheduler_celery_app():
    """Set up a Celery app as swh.scheduler and swh worker tests would expect it"""
    test_app = TestApp(
        set_as_current=True,
        enable_logging=True,
        task_cls="swh.scheduler.task:SWHTask",
        config={
            "accept_content": ["application/x-msgpack", "application/json"],
            "task_serializer": "msgpack",
            "result_serializer": "json",
        },
    )
    with setup_default_app(test_app, use_trap=False):
        from swh.scheduler.celery_backend import config

        config.app = test_app
        test_app.set_default()
        test_app.set_current()
        yield test_app
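
# Usage sketch (illustrative, not part of the plugin): tests receiving this
# fixture get a fully configured Celery test app; a hypothetical check of the
# serializer settings configured above could read:
#
#     def test_serializer_config(swh_scheduler_celery_app):
#         assert swh_scheduler_celery_app.conf.task_serializer == "msgpack"
#         assert swh_scheduler_celery_app.conf.result_serializer == "json"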


@pytest.fixture(scope="session")
def swh_scheduler_celery_includes():
    """List of task modules that should be loaded by the swh_scheduler_celery_worker on
    startup."""
    task_modules = ["swh.scheduler.tests.tasks"]

    for entrypoint in pkg_resources.iter_entry_points("swh.workers"):
        task_modules.extend(entrypoint.load()().get("task_modules", []))
    return task_modules
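
# Sketch of how the entry points above are expected to be declared (an
# assumption about a hypothetical downstream package, not real code): a package
# exposing celery tasks registers a "swh.workers" entry point whose target is a
# callable returning a dict with a "task_modules" key, e.g. in its setup.py:
#
#     entry_points="""
#         [swh.workers]
#         loader.example=swh.loader.example:register
#     """
#
# with swh.loader.example.register() returning something like
# {"task_modules": ["swh.loader.example.tasks"]}.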


@pytest.fixture(scope="session")
def swh_scheduler_celery_worker(
    swh_scheduler_celery_app, swh_scheduler_celery_includes,
):
    """Spawn a worker"""
    for module in swh_scheduler_celery_includes:
        swh_scheduler_celery_app.loader.import_task_module(module)
    with worker.start_worker(swh_scheduler_celery_app, pool="solo") as w:
        yield w
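
# End-to-end usage sketch (illustrative, not part of the plugin): combining the
# app and worker fixtures, a test can run one of the registered testing tasks.
# The expected "OK" return value is an assumption about the ping task.
#
#     def test_ping(swh_scheduler_celery_app, swh_scheduler_celery_worker):
#         res = swh_scheduler_celery_app.send_task("swh.scheduler.tests.tasks.ping")
#         assert res.get(timeout=10) == "OK"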