swh:1:snp:eb70f1f85391e4b077c211bec36af0061c4bf937
Raw File
Tip revision: 85189d3f7d069e4eccc3275eb9c094c60452b778 authored by Jenkins for Software Heritage on 04 September 2020, 13:52:12 UTC
New upstream version 0.14.1
Tip revision: 85189d3
retry.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
import traceback

from typing import Dict, Iterable, List, Optional

from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

from swh.model.model import (
    Content,
    SkippedContent,
    Directory,
    Revision,
    Release,
    Snapshot,
    OriginVisit,
    MetadataAuthority,
    MetadataFetcher,
    RawExtrinsicMetadata,
)

from swh.storage import get_storage
from swh.storage.exc import StorageArgumentException
from swh.storage.interface import StorageInterface


logger = logging.getLogger(__name__)


def should_retry_adding(retry_state) -> bool:
    """Retry if the error/exception is (probably) not about a caller error

    """
    try:
        attempt = retry_state.outcome
    except AttributeError:
        # tenacity < 5.0
        attempt = retry_state

    if attempt.failed:
        error = attempt.exception()
        if isinstance(error, StorageArgumentException):
            # Exception is due to an invalid argument
            return False
        else:
            # Other exception
            module = getattr(error, "__module__", None)
            if module:
                error_name = error.__module__ + "." + error.__class__.__name__
            else:
                error_name = error.__class__.__name__
            logger.warning(
                "Retry adding a batch",
                exc_info=False,
                extra={
                    "swh_type": "storage_retry",
                    "swh_exception_type": error_name,
                    "swh_exception": traceback.format_exc(),
                },
            )
            return True
    else:
        # No exception
        return False


swh_retry = retry(
    retry=should_retry_adding,
    wait=wait_random_exponential(multiplier=1, max=10),
    stop=stop_after_attempt(3),
)


class RetryingProxyStorage:
    """Storage implementation which retries adding objects when it specifically
       fails (hash collision, integrity error).

    """

    def __init__(self, storage):
        self.storage: StorageInterface = get_storage(**storage)

    def __getattr__(self, key):
        if key == "storage":
            raise AttributeError(key)
        return getattr(self.storage, key)

    @swh_retry
    def content_add(self, content: List[Content]) -> Dict:
        return self.storage.content_add(content)

    @swh_retry
    def content_add_metadata(self, content: List[Content]) -> Dict:
        return self.storage.content_add_metadata(content)

    @swh_retry
    def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
        return self.storage.skipped_content_add(content)

    @swh_retry
    def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]:
        return self.storage.origin_visit_add(visits)

    @swh_retry
    def metadata_fetcher_add(self, fetchers: List[MetadataFetcher],) -> None:
        return self.storage.metadata_fetcher_add(fetchers)

    @swh_retry
    def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
        return self.storage.metadata_authority_add(authorities)

    @swh_retry
    def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None:
        return self.storage.raw_extrinsic_metadata_add(metadata)

    @swh_retry
    def directory_add(self, directories: List[Directory]) -> Dict:
        return self.storage.directory_add(directories)

    @swh_retry
    def revision_add(self, revisions: List[Revision]) -> Dict:
        return self.storage.revision_add(revisions)

    @swh_retry
    def release_add(self, releases: List[Release]) -> Dict:
        return self.storage.release_add(releases)

    @swh_retry
    def snapshot_add(self, snapshots: List[Snapshot]) -> Dict:
        return self.storage.snapshot_add(snapshots)

    def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
        return self.storage.clear_buffers(object_types)

    def flush(self, object_types: Optional[List[str]] = None) -> Dict:
        """Specific case for buffer proxy storage failing to flush data

        """
        return self.storage.flush(object_types)
back to top