swh:1:snp:eb70f1f85391e4b077c211bec36af0061c4bf937
Raw File
Tip revision: 4d52fc1d076f26ba8aa6c4da7e4e30008a368df0 authored by Antoine R. Dumont (@ardumont) on 30 July 2020, 12:32:20 UTC
storage*: Adapt origin_list(...) -> PagedResult[Origin]
Tip revision: 4d52fc1
test_origin.py
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import pytest

from unittest.mock import patch

from swh.model.model import Origin, OriginVisit, OriginVisitStatus

from swh.storage.algos.origin import (
    iter_origins,
    origin_get_latest_visit_status,
    iter_origin_visits,
    iter_origin_visit_statuses,
)
from swh.storage.interface import ListOrder
from swh.storage.utils import now

from swh.storage.tests.test_storage import round_to_milliseconds


def assert_list_eq(left, right, msg=None):
    """Assert that two iterables hold equal items in the same order.

    Both arguments are materialized as lists first, so generators and other
    one-shot iterables can be compared against plain sequences.

    Args:
        left: first iterable to compare
        right: second iterable to compare
        msg: optional message attached to the AssertionError on mismatch

    Raises:
        AssertionError: when the materialized lists differ

    """
    left_items = list(left)
    right_items = list(right)
    assert left_items == right_items, msg


@pytest.fixture
def swh_storage_backend_config():
    """Yield the storage configuration for this module: the "memory" class.

    NOTE(review): presumably picked up by the ``swh_storage`` fixture to build
    the storage under test -- confirm against the shared pytest plugin.

    """
    backend_config = {"cls": "memory"}
    yield backend_config


def test_iter_origins(swh_storage):
    """iter_origins should yield the expected origins for any range/batch size

    The test expects ``origin_from=i + 1`` to start at ``origins[i]`` and
    ``origin_to=j + 1`` to stop before ``origins[j]``, whatever the batch size.

    """
    stored = [Origin(url=url) for url in ("bar", "qux", "quuz")]
    assert swh_storage.origin_add(stored) == {"origin:add": 3}

    # Every expectation is checked with the default batch size, then with
    # batches smaller than the number of stored origins.
    batch_variants = ({}, {"batch_size": 1}, {"batch_size": 2})

    # no bounds: everything is listed
    for batch_kwargs in batch_variants:
        assert_list_eq(iter_origins(swh_storage, **batch_kwargs), stored)

    # the ranges deliberately run past the 3 stored origins
    for start in range(1, 5):
        # lower bound only
        for batch_kwargs in batch_variants:
            assert_list_eq(
                iter_origins(swh_storage, origin_from=start + 1, **batch_kwargs),
                stored[start:],
                start,
            )

        # lower and upper bounds
        for stop in range(start, 5):
            for batch_kwargs in batch_variants:
                assert_list_eq(
                    iter_origins(
                        swh_storage,
                        origin_from=start + 1,
                        origin_to=stop + 1,
                        **batch_kwargs,
                    ),
                    stored[start:stop],
                    (start, stop),
                )


@patch("swh.storage.in_memory.InMemoryStorage.origin_get_range")
def test_iter_origins_batch_size(mock_origin_get_range, swh_storage):
    """iter_origins should forward its batch size as origin_count

    Without an explicit ``batch_size``, the test expects 10000 to be requested.

    """
    mock_origin_get_range.return_value = []

    # (extra keyword arguments for iter_origins, expected origin_count)
    scenarios = (
        ({}, 10000),
        ({"batch_size": 42}, 42),
    )
    for extra_kwargs, expected_count in scenarios:
        # consume the iterator so the storage actually gets queried
        list(iter_origins(swh_storage, **extra_kwargs))
        mock_origin_get_range.assert_called_with(
            origin_from=1, origin_count=expected_count
        )


def test_origin_get_latest_visit_status_none(swh_storage, sample_data):
    """Lookups that cannot match anything should return None

    """
    # nothing has been added yet, so this origin is unknown
    assert origin_get_latest_visit_status(swh_storage, "unknown-origin") is None

    origin = sample_data.origin
    origin_visit = sample_data.origin_visit
    assert origin_visit.origin == origin.url

    swh_storage.origin_add([origin])
    # result discarded; the subscript fails if no visit is returned
    swh_storage.origin_visit_add([origin_visit])[0]
    assert origin_visit.type != "unknown"

    # the origin now has a visit, but none of these filters is expected to
    # match it: unknown visit type, snapshot required, unknown status
    unmatched_filters = (
        {"type": "unknown"},
        {"require_snapshot": True},
        {"allowed_statuses": ["unknown"]},
    )
    for lookup_filter in unmatched_filters:
        actual_origin_visit = origin_get_latest_visit_status(
            swh_storage, origin.url, **lookup_filter
        )
        assert actual_origin_visit is None


def init_storage_with_origin_visits(swh_storage, sample_data):
    """Populate storage with 2 origins, 1 visit per origin, 2 statuses per visit

    Only the last status of the second origin's visit ("full") references a
    snapshot; the three other statuses have none.

    Returns:
        dict listing the added objects under the keys "origin",
        "origin_visit" and "origin_visit_status"

    """
    snapshot = sample_data.snapshots[2]
    origin1, origin2 = sample_data.origins[:2]
    swh_storage.origin_add([origin1, origin2])

    visits_to_add = [
        OriginVisit(origin=origin.url, date=visit_date, type=visit_type)
        for origin, visit_date, visit_type in (
            (origin1, sample_data.date_visit1, sample_data.type_visit1),
            (origin2, sample_data.date_visit2, sample_data.type_visit2),
        )
    ]
    ov1, ov2 = swh_storage.origin_visit_add(visits_to_add)

    swh_storage.snapshot_add([snapshot])

    date_now = round_to_milliseconds(now())
    # the status dates below rely on this chronological order
    assert sample_data.date_visit1 < sample_data.date_visit2
    assert sample_data.date_visit2 < date_now

    def build_status(origin, visit, date, status, snapshot=None, **extra):
        # shorthand binding a status to its origin url and visit id
        return OriginVisitStatus(
            origin=origin.url,
            visit=visit.visit,
            date=date,
            status=status,
            snapshot=snapshot,
            **extra,
        )

    visit_statuses = [
        # origin visit status 1 and 2 for origin visit 1
        build_status(origin1, ov1, sample_data.date_visit1, "partial"),
        build_status(origin1, ov1, sample_data.date_visit2, "ongoing"),
        # origin visit status 1 and 2 for origin visit 2
        build_status(origin2, ov2, sample_data.date_visit2, "ongoing"),
        build_status(
            origin2,
            ov2,
            date_now,
            "full",
            snapshot=snapshot.id,
            metadata={"something": "wicked"},
        ),
    ]

    swh_storage.origin_visit_status_add(visit_statuses)
    return {
        "origin": [origin1, origin2],
        "origin_visit": [ov1, ov2],
        "origin_visit_status": list(visit_statuses),
    }


def test_origin_get_latest_visit_status_filter_type(swh_storage, sample_data):
    """Filtering the latest visit status by visit type should be consistent

    """
    objects = init_storage_with_origin_visits(swh_storage, sample_data)
    origin1, origin2 = objects["origin"]
    ov1, ov2 = objects["origin_visit"]
    _, ovs12, _, ovs22 = objects["origin_visit_status"]

    # each origin was only visited with its own type, so asking for the other
    # origin's visit type yields nothing
    mismatches = (
        (origin1, sample_data.type_visit2),
        (origin2, sample_data.type_visit1),
    )
    for origin, other_type in mismatches:
        found = origin_get_latest_visit_status(
            swh_storage, origin.url, type=other_type
        )
        assert found is None

    # with the matching type, the most recent status of the visit is returned
    # (for origin1 neither status has a snapshot)
    matches = (
        (origin1, sample_data.type_visit1, ov1, ovs12),
        (origin2, sample_data.type_visit2, ov2, ovs22),
    )
    for origin, visit_type, expected_visit, expected_status in matches:
        actual_visit, actual_status = origin_get_latest_visit_status(
            swh_storage, origin.url, type=visit_type
        )
        assert isinstance(actual_visit, OriginVisit)
        assert isinstance(actual_status, OriginVisitStatus)
        assert actual_visit.origin == expected_visit.origin
        assert actual_visit.visit == expected_visit.visit
        assert actual_visit.type == visit_type
        assert actual_status == expected_status


def test_origin_get_latest_visit_status_filter_status(swh_storage, sample_data):
    """Filtering origin visit per allowed statuses should yield consistent results

    """
    objects = init_storage_with_origin_visits(swh_storage, sample_data)
    origin1, origin2 = objects["origin"]
    ov1, ov2 = objects["origin_visit"]
    ovs11, ovs12, _, ovs22 = objects["origin_visit_status"]

    # no failed status for that visit
    assert (
        origin_get_latest_visit_status(
            swh_storage, origin2.url, allowed_statuses=["failed"]
        )
        is None
    )

    # (origin, allowed statuses, expected visit, expected type, expected status)
    cases = (
        # only 1 partial for that visit
        (origin1, ["partial"], ov1, sample_data.type_visit1, ovs11),
        # both status exist, take the latest one
        (origin1, ["partial", "ongoing"], ov1, sample_data.type_visit1, ovs12),
        # the full status is the latest one of the second origin's visit
        (origin2, ["full"], ov2, sample_data.type_visit2, ovs22),
    )
    for origin, allowed, expected_visit, expected_type, expected_status in cases:
        actual_visit, actual_status = origin_get_latest_visit_status(
            swh_storage, origin.url, allowed_statuses=allowed
        )
        # check the types first so a wrong return shape fails explicitly
        assert isinstance(actual_visit, OriginVisit)
        assert isinstance(actual_status, OriginVisitStatus)
        assert actual_visit.origin == expected_visit.origin
        assert actual_visit.visit == expected_visit.visit
        assert actual_visit.type == expected_type
        assert actual_status == expected_status


def test_origin_get_latest_visit_status_filter_snapshot(swh_storage, sample_data):
    """Requiring a snapshot should only elect visit statuses referencing one

    """
    objects = init_storage_with_origin_visits(swh_storage, sample_data)
    origin1, origin2 = objects["origin"]
    _, ov2 = objects["origin_visit"]
    _, _, _, ovs22 = objects["origin_visit_status"]

    # there is no visit with snapshot yet for that origin
    assert (
        origin_get_latest_visit_status(swh_storage, origin1.url, require_snapshot=True)
        is None
    )

    # the "full" visit status is the only one with a snapshot, so it is elected
    actual_ov2, actual_ovs22 = origin_get_latest_visit_status(
        swh_storage, origin2.url, require_snapshot=True
    )
    assert actual_ov2.origin == ov2.origin
    assert actual_ov2.visit == ov2.visit
    assert actual_ov2.type == ov2.type
    assert actual_ovs22 == ovs22

    date_now = now()

    # Add another visit
    swh_storage.origin_visit_add(
        [OriginVisit(origin=origin2.url, date=date_now, type=sample_data.type_visit2)]
    )

    # Requiring the latest visit with a snapshot, we still find the previous visit.
    # Bind the result to fresh "actual" names: the expected ov2/ovs22 fixtures
    # must stay untouched so the assertions compare against them.
    actual_ov2, actual_ovs22 = origin_get_latest_visit_status(
        swh_storage, origin2.url, require_snapshot=True
    )
    assert actual_ov2.origin == ov2.origin
    assert actual_ov2.visit == ov2.visit
    assert actual_ov2.type == ov2.type
    assert actual_ovs22 == ovs22


def test_iter_origin_visits(swh_storage, sample_data):
    """Iterating over an origin's visits yields all of them, in the asked order"""
    origin1, origin2 = sample_data.origins[:2]
    swh_storage.origin_add([origin1, origin2])

    first_visit_date = now() - datetime.timedelta(weeks=20)

    # 20 git visits of origin1, one per day starting 20 weeks ago
    visits = swh_storage.origin_visit_add(
        [
            OriginVisit(
                origin=origin1.url,
                date=first_visit_date + datetime.timedelta(days=day_offset),
                type="git",
            )
            for day_offset in range(20)
        ]
    )

    # (extra keyword arguments, expected visits); no limit in both cases
    expectations = (
        # default order, expected ascending
        ({}, visits),
        # order desc
        ({"order": ListOrder.DESC}, list(reversed(visits))),
    )
    for extra_kwargs, expected_visits in expectations:
        actual_visits = list(
            iter_origin_visits(swh_storage, origin1.url, **extra_kwargs)
        )
        assert actual_visits == expected_visits

    # origin2 was added but never visited: no result
    assert list(iter_origin_visits(swh_storage, origin2.url)) == []


def test_iter_origin_visit_status(swh_storage, sample_data):
    """Iter over the statuses of an origin visit returns all of them, in order"""
    origin1, origin2 = sample_data.origins[:2]
    swh_storage.origin_add([origin1])

    ov1 = swh_storage.origin_visit_add([sample_data.origin_visit])[0]
    assert ov1.origin == origin1.url

    date_past = now() - datetime.timedelta(weeks=20)

    # NOTE(review): presumably mirrors the "created" status recorded when the
    # visit itself is added, so that it is listed first -- confirm.
    ovs1 = OriginVisitStatus(
        origin=origin1.url,
        visit=ov1.visit,
        date=ov1.date,
        status="created",
        snapshot=None,
    )
    # 20 more statuses for the same visit, one per day starting 20 weeks ago.
    # NOTE(review): the expected ascending order assumes ov1.date predates
    # date_past -- confirm against sample_data.
    new_visit_statuses = [ovs1] + [
        OriginVisitStatus(
            origin=origin1.url,
            visit=ov1.visit,
            date=date_past + datetime.timedelta(days=i),
            status="created",
            snapshot=None,
        )
        for i in range(20)
    ]

    # Visit statuses go through origin_visit_status_add (origin_visit_add only
    # takes OriginVisit objects); its return value is not used, so the
    # expectations are built from the list that was inserted.
    swh_storage.origin_visit_status_add(new_visit_statuses)
    reversed_visit_statuses = list(reversed(new_visit_statuses))

    # order asc
    actual_visit_statuses = list(
        iter_origin_visit_statuses(swh_storage, ov1.origin, ov1.visit)
    )
    assert actual_visit_statuses == new_visit_statuses

    # order desc
    actual_visit_statuses = list(
        iter_origin_visit_statuses(
            swh_storage, ov1.origin, ov1.visit, order=ListOrder.DESC
        )
    )
    assert actual_visit_statuses == reversed_visit_statuses

    # no result: origin2 was never added to the storage
    actual_visit_statuses = list(
        iter_origin_visit_statuses(swh_storage, origin2.url, ov1.visit)
    )
    assert actual_visit_statuses == []
back to top