Revision 182ee497328a55d99609e5230146268a108508e3 authored by Antoine R. Dumont (@ardumont) on 24 June 2020, 16:04:29 UTC, committed by Antoine R. Dumont (@ardumont) on 26 June 2020, 11:22:40 UTC
This allows clients to search from the most recent to the oldest visit when calling the endpoint with the "order" parameter set to "desc" (visit id descending). The existing sorting order — visit id ascending — is kept and made explicit as "asc". Related to T2310
1 parent f75cd41
replay.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import logging
from typing import Any, Callable, Dict, Iterable, List
try:
from systemd.daemon import notify
except ImportError:
notify = None
from swh.core.statsd import statsd
from swh.storage.fixer import fix_objects
from swh.model.model import (
BaseContent,
BaseModel,
Content,
Directory,
Origin,
OriginVisit,
OriginVisitStatus,
Revision,
SkippedContent,
Snapshot,
Release,
)
from swh.storage.exc import HashCollision
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# statsd metric names: a counter of replayed objects and a timer of per-batch
# insertion duration; both are tagged with the object type at emission time.
GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"

# Maps a journal object-type name (topic) to the factory that converts a raw
# dict read from the journal into the corresponding model object.
object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
    "origin": Origin.from_dict,
    "origin_visit": OriginVisit.from_dict,
    "origin_visit_status": OriginVisitStatus.from_dict,
    "snapshot": Snapshot.from_dict,
    "revision": Revision.from_dict,
    "release": Release.from_dict,
    "directory": Directory.from_dict,
    "content": Content.from_dict,
    "skipped_content": SkippedContent.from_dict,
}
def process_replay_objects(all_objects, *, storage):
    """Replay a batch of journal objects into the given storage.

    Args:
        all_objects: mapping of object-type name to the list of raw object
            dicts read from the journal for that type
        storage: Storage instance (or proxy) the objects are written to

    Each type's batch is inserted under a statsd timer, and a per-type
    counter is incremented with the number of objects replayed.
    """
    for obj_type, obj_list in all_objects.items():
        count = len(obj_list)
        logger.debug("Inserting %s %s objects", count, obj_type)
        metric_tags = {"object_type": obj_type}
        with statsd.timed(GRAPH_DURATION_METRIC, tags=metric_tags):
            _insert_objects(obj_type, obj_list, storage)
        statsd.increment(GRAPH_OPERATIONS_METRIC, count, tags=metric_tags)
    if notify:
        # Ping systemd's watchdog to signal the replayer is still alive
        # (no-op when python-systemd is not installed).
        notify("WATCHDOG=1")
def collision_aware_content_add(
    content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]
) -> None:
    """Add contents to storage, tolerating hash collisions.

    On a ``HashCollision``, the collision is recorded, the colliding
    contents are dropped from the batch, and the insertion is retried with
    the remaining contents until it succeeds. All recorded collisions are
    logged as errors once the batch is fully inserted.

    Args:
        content_add_fn: Storage content callable
        contents: List of contents or skipped contents to add to storage
    """
    if not contents:
        # Nothing to insert; avoid a pointless storage round-trip.
        return
    collision_reports: List[Dict[str, Any]] = []
    pending = contents
    done = False
    while not done:
        try:
            content_add_fn(pending)
            done = True  # whole (remaining) batch accepted
        except HashCollision as exc:
            collision_reports.append(
                {
                    "algo": exc.algo,
                    "hash": exc.hash_id,  # hex hash id
                    "objects": exc.colliding_contents,  # hex hashes
                }
            )
            # Retry without the contents involved in the collision.
            excluded = exc.colliding_content_hashes()
            pending = [c for c in pending if c.hashes() not in excluded]
    for report in collision_reports:
        logger.error("Collision detected: %(collision)s", {"collision": report})
def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
    """Insert objects of type object_type in the storage.

    Args:
        object_type: journal object-type name (e.g. "content", "revision")
        objects: list of raw object dicts read from the journal
        storage: Storage instance (or proxy) to write the objects to

    Unknown object types are not inserted; a warning is logged instead.
    """
    objects = fix_objects(object_type, objects)
    if object_type == "content":
        # for bw compat, skipped content should now be delivered in the
        # skipped_content topic, but may still arrive on the content topic:
        # split the batch and route each part to the right endpoint.
        contents: List[BaseContent] = []
        skipped_contents: List[BaseContent] = []
        for content in objects:
            c = BaseContent.from_dict(content)
            if isinstance(c, SkippedContent):
                skipped_contents.append(c)
            else:
                contents.append(c)
        collision_aware_content_add(storage.skipped_content_add, skipped_contents)
        collision_aware_content_add(storage.content_add_metadata, contents)
    # BUGFIX: this branch was a separate `if`, so a "content" batch fell
    # through the second chain into the final `else` and logged a spurious
    # "this should not happen" warning after being correctly inserted.
    elif object_type == "skipped_content":
        skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
        collision_aware_content_add(storage.skipped_content_add, skipped_contents)
    elif object_type in ("origin_visit", "origin_visit_status"):
        origins: List[Origin] = []
        converter_fn = object_converter_fn[object_type]
        model_objs = []
        for obj in objects:
            # Make sure the referenced origin exists before adding the visit.
            origins.append(Origin(url=obj["origin"]))
            model_objs.append(converter_fn(obj))
        storage.origin_add(origins)
        method = getattr(storage, f"{object_type}_add")
        method(model_objs)
    elif object_type in ("directory", "revision", "release", "snapshot", "origin",):
        method = getattr(storage, object_type + "_add")
        method(object_converter_fn[object_type](o) for o in objects)
    else:
        logger.warning("Received a series of %s, this should not happen", object_type)
Computing file changes ...