swh:1:snp:eb70f1f85391e4b077c211bec36af0061c4bf937
Raw File
Tip revision: c21d0e38204aaca3999c00f2346b0d5c69a2ddff authored by Antoine R. Dumont (@ardumont) on 07 July 2020, 09:09:25 UTC
Move sharable fixtures out of conftest into a dedicated pytest plugin
Tip revision: c21d0e3
test_cli.py
# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import copy
import logging
import re
import tempfile
import yaml

from typing import Any, Dict
from unittest.mock import patch

import pytest

from click.testing import CliRunner
from confluent_kafka import Producer

from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.storage import get_storage
from swh.storage.cli import storage as cli


# Module-level logger, named after this test module.
logger = logging.getLogger(__name__)


# Baseline CLI configuration: an in-memory storage backend.
# ``invoke`` deep-copies this mapping before adding per-call settings, so the
# constant itself is never mutated by a test.
CLI_CONFIG = {
    "storage": {
        "cls": "memory",
    },
}


@pytest.fixture
def storage():
    """Yield a storage instance that the CLI is forced to use.

    A "pipeline" storage wrapping a single "memory" step is built, and
    ``swh.storage.cli.get_storage`` is patched for the duration of the test so
    that every call made through that name returns this same instance. The
    test can then inspect what the CLI wrote.
    """
    pipeline_storage = get_storage(cls="pipeline", steps=[{"cls": "memory"}])
    with patch("swh.storage.cli.get_storage", return_value=pipeline_storage):
        yield pipeline_storage


@pytest.fixture
def monkeypatch_retry_sleep(monkeypatch):
    """Replace the ``sleep`` hook of the journal replay retry objects by a no-op.

    The ``retry`` attribute of both ``copy_object`` and ``obj_in_objstorage``
    gets its ``sleep`` attribute swapped for a function that returns
    immediately; ``monkeypatch`` restores the originals after the test.
    """
    from swh.journal.replay import copy_object, obj_in_objstorage

    def _no_sleep(x):
        return None

    for retried_function in (copy_object, obj_in_objstorage):
        monkeypatch.setattr(retried_function.retry, "sleep", _no_sleep)


def invoke(*args, env=None, journal_config=None):
    """Run the storage CLI against a temporary YAML configuration file.

    Args:
        *args: command-line arguments passed to the CLI after the ``-C`` option.
        env: environment mapping forwarded to ``CliRunner.invoke``.
        journal_config: optional journal client settings; when truthy, a copy
            with ``cls`` forced to ``"kafka"`` is stored under the
            ``journal_client`` key of the configuration.

    Returns:
        The result object returned by ``CliRunner.invoke``.
    """
    cli_config = copy.deepcopy(CLI_CONFIG)
    if journal_config:
        journal_client = journal_config.copy()
        journal_client["cls"] = "kafka"
        cli_config["journal_client"] = journal_client

    with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_file:
        yaml.dump(cli_config, config_file)
        # The CLI re-opens the file by name, so the dumped YAML must have left
        # Python's write buffer by now; rewinding the text file flushes it.
        config_file.seek(0)
        cli_args = ["-C" + config_file.name, *args]
        return CliRunner().invoke(
            cli, cli_args, obj={"log_level": logging.DEBUG}, env=env
        )


def test_replay(
    storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
):
    """Check that the ``replay`` command loads a journal snapshot into storage.

    One snapshot message is produced on the ``<prefix>.snapshot`` topic, the
    CLI is run with ``--stop-after-objects 1``, and the snapshot is then read
    back from the ``storage`` fixture that the CLI was patched to use.
    """
    kafka_prefix += ".swh.journal.objects"

    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test-producer",
            "acks": "all",
        }
    )

    snapshot = {
        "id": b"foo",
        "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20}},
    }  # type: Dict[str, Any]
    producer.produce(
        topic=kafka_prefix + ".snapshot",
        key=key_to_kafka(snapshot["id"]),
        value=value_to_kafka(snapshot),
    )
    # Flush before starting the CLI so the message is not left pending in the
    # producer when the consumer runs.
    producer.flush()

    logger.debug("Flushed producer")

    result = invoke(
        "replay",
        "--stop-after-objects",
        "1",
        journal_config={
            "brokers": [kafka_server],
            "group_id": kafka_consumer_group,
            "prefix": kafka_prefix,
        },
    )

    # The dot is escaped: an unescaped "." would let fullmatch accept any
    # character in that position (e.g. "Done!\n").
    expected = r"Done\.\n"
    assert result.exit_code == 0, result.output
    assert re.fullmatch(expected, result.output), result.output

    assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None}
back to top