https://forge.softwareheritage.org/source/swh-scheduler.git
Tip revision: 6725f573559b6db976f515e4b6378fcc748772f3 authored by Jérémy Bobbio (Lunar) on 11 December 2023, 16:32:10 UTC
Add a small hint when tasks are called with missing keywords
Add a small hint when tasks are called with missing keywords
Tip revision: 6725f57
test_cli_journal.py
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Dict, List
from click.testing import CliRunner, Result
from confluent_kafka import Producer
import pytest
import yaml
from swh.journal.serializers import value_to_kafka
from swh.scheduler import get_scheduler
from swh.scheduler.cli import cli
from swh.scheduler.tests.test_journal_client import VISIT_STATUSES_1
@pytest.fixture
def swh_scheduler_cfg(postgresql_scheduler, kafka_server):
    """Return the configuration mapping used by the journal client tests.

    The mapping holds two sections:

    - ``"scheduler"``: the ``"postgresql"`` class together with the DSN read
      from the ``postgresql_scheduler`` fixture;
    - ``"journal"``: ``kafka_server`` as the single broker, plus a fixed
      consumer group id.
    """
    scheduler_section = {
        "cls": "postgresql",
        "db": postgresql_scheduler.dsn,
    }
    journal_section = {
        "brokers": [kafka_server],
        "group_id": "test-consume-visit-status",
    }
    return {
        "scheduler": scheduler_section,
        "journal": journal_section,
    }
def _write_configuration_path(config: Dict, tmp_path: str) -> str:
    """Serialize ``config`` as YAML into ``scheduler.yml`` under ``tmp_path``.

    Args:
        config: configuration mapping to dump.
        tmp_path: directory receiving the file; it is converted with
            ``str()`` before being joined with the file name.

    Returns:
        The path of the written configuration file.
    """
    destination = os.path.join(str(tmp_path), "scheduler.yml")
    with open(destination, "w") as stream:
        # Dump inside the ``with`` so the file is opened before serialization.
        serialized = yaml.dump(config)
        stream.write(serialized)
    return destination
@pytest.fixture
def swh_scheduler_cfg_path(swh_scheduler_cfg, tmp_path):
    """Persist ``swh_scheduler_cfg`` under ``tmp_path`` and return the file path."""
    written_path = _write_configuration_path(swh_scheduler_cfg, tmp_path)
    return written_path
def invoke(args: List[str], config_path: str) -> Result:
    """Run the scheduler ``cli`` through a click ``CliRunner``.

    Args:
        args: command-line arguments appended after the configuration option.
        config_path: path of the configuration file, passed to the CLI as a
            single ``-C<path>`` argument.

    Returns:
        The click ``Result`` of the invocation.
    """
    cli_runner = CliRunner()
    config_option = "-C" + config_path
    full_command = [config_option] + args
    return cli_runner.invoke(cli, full_command)
def test_cli_journal_client_origin_visit_status_misconfiguration_no_scheduler(
    swh_scheduler_cfg, tmp_path
):
    """Check that ``journal-client`` fails when the scheduler ``cls`` is ``"foo"``.

    The command is expected to exit with a non-zero code and to print a
    message containing "must be instantiated".
    """
    # Work on a shallow copy so the fixture's mapping is left untouched.
    broken_config = dict(swh_scheduler_cfg)
    broken_config["scheduler"] = {"cls": "foo"}
    broken_config_path = _write_configuration_path(broken_config, tmp_path)

    cli_args = ["journal-client", "--stop-after-objects", "1"]
    outcome = invoke(cli_args, broken_config_path)

    assert "must be instantiated" in outcome.output
    assert outcome.exit_code != 0
def test_cli_journal_client_origin_visit_status_misconfiguration_missing_journal_conf(
    swh_scheduler_cfg, tmp_path
):
    """Check that ``journal-client`` fails when the "journal" section is absent.

    The command is expected to exit with a non-zero code and to print a
    message containing "Missing 'journal'".
    """
    # Rebuild the mapping without its "journal" entry; the fixture's own
    # mapping is not modified.
    journal_less_config = {
        section: settings
        for section, settings in swh_scheduler_cfg.items()
        if section != "journal"
    }
    journal_less_path = _write_configuration_path(journal_less_config, tmp_path)

    cli_args = ["journal-client", "--stop-after-objects", "1"]
    outcome = invoke(cli_args, journal_less_path)

    assert "Missing 'journal'" in outcome.output
    assert outcome.exit_code != 0
def test_cli_journal_client_origin_visit_status(
    swh_scheduler_cfg,
    swh_scheduler_cfg_path,
):
    """Produce one visit status to Kafka and check the journal client consumes it.

    After ``journal-client --stop-after-objects 1`` has run, the scheduler
    must hold exactly one visit-stats entry for the (origin, type) pair of
    the produced visit status.
    """
    broker = swh_scheduler_cfg["journal"]["brokers"][0]
    scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"])

    producer_settings = {
        "bootstrap.servers": broker,
        "client.id": "test visit-stats producer",
        "acks": "all",
    }
    kafka_producer = Producer(producer_settings)

    status = VISIT_STATUSES_1[0]
    kafka_producer.produce(
        topic="swh.journal.objects.origin_visit_status",
        key=b"bogus-origin",
        value=value_to_kafka(status),
    )
    # Flush before invoking the client.
    kafka_producer.flush()

    cli_args = ["journal-client", "--stop-after-objects", "1"]
    outcome = invoke(cli_args, swh_scheduler_cfg_path)

    # Check the command outcome; the output is attached to ease debugging.
    assert outcome.exit_code == 0, outcome.output
    assert outcome.output == "Processed 1 message(s).\nDone.\n"

    # Check what the scheduler recorded for the produced visit status.
    origin_and_type = (status["origin"], status["type"])
    recorded_stats = scheduler.origin_visit_stats_get([origin_and_type])
    assert recorded_stats
    assert len(recorded_stats) == 1