swh:1:snp:eb70f1f85391e4b077c211bec36af0061c4bf937
Raw File
Tip revision: a119a7a2d8534f426e9f7dd23a86a3dbaa2596b9 authored by Antoine R. Dumont (@ardumont) on 03 February 2020, 15:01:20 UTC
d/changelog: Update dependencies
Tip revision: a119a7a
retry.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
import psycopg2
import traceback

from datetime import datetime
from typing import Dict, Iterable, List, Optional, Union

from requests.exceptions import ConnectionError
from tenacity import (
    retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type
)

from swh.storage import get_storage, HashCollision


logger = logging.getLogger(__name__)


# Exception classes for which a failed call is attempted again:
# - psycopg2.IntegrityError, HashCollision: raised when two parallel
#   insertions insert the same data
# - ConnectionError: raised when the server is restarting
RETRY_EXCEPTIONS = [psycopg2.IntegrityError, HashCollision, ConnectionError]


def should_retry_adding(error) -> bool:
    """Retry if the failed attempt raised one of the RETRY_EXCEPTIONS types.

    This is used as the `retry` predicate of tenacity's `retry` decorator.
    Despite its name, `error` is not the raised exception: tenacity passes
    either its retry state (exposing the last attempt as `.outcome`) or the
    attempt object itself, depending on the tenacity version. The exception
    is extracted from that attempt before being logged.

    Args:
        error: the retry state or attempt object provided by tenacity

    Returns:
        True (after logging a warning) when the attempt failed with an
        exception that is an instance of one of RETRY_EXCEPTIONS, False
        otherwise.

    """
    # Let tenacity decide whether the attempt failed with a matching type;
    # its predicate accepts both calling conventions.
    if not retry_if_exception_type(tuple(RETRY_EXCEPTIONS))(error):
        return False

    # Unwrap the actual exception: a retry state carries the attempt in
    # `.outcome`, otherwise `error` already is the attempt.
    attempt = getattr(error, 'outcome', error)
    exception = attempt.exception()

    error_name = exception.__module__ + '.' + exception.__class__.__name__
    # No exception is being handled when this predicate runs, so the
    # traceback must be formatted from the exception object itself rather
    # than through traceback.format_exc().
    formatted_traceback = ''.join(traceback.format_exception(
        type(exception), exception, exception.__traceback__))
    logger.warning('Retry adding a batch', exc_info=False, extra={
        'swh_type': 'storage_retry',
        'swh_exception_type': error_name,
        'swh_exception': formatted_traceback,
    })
    return True


# Decorator applied to the storage write methods: whether to try again is
# decided by should_retry_adding, the wait between attempts is a random
# exponential one (multiplier=1, max=10), and retrying stops after attempt 3.
swh_retry = retry(
    retry=should_retry_adding,
    wait=wait_random_exponential(multiplier=1, max=10),
    stop=stop_after_attempt(3),
)


class RetryingProxyStorage:
    """Storage implementation which retries adding objects when it specifically
       fails (hash collision, integrity error).

    The methods explicitly defined below (other than `__init__` and
    `__getattr__`) forward to the same-named method of the wrapped storage
    and are decorated with `swh_retry`. Any other attribute is resolved on
    the wrapped storage through `__getattr__`, without the retry decorator.

    """
    def __init__(self, storage):
        # `storage` is expanded as keyword arguments to get_storage()
        self.storage = get_storage(**storage)

    def __getattr__(self, key):
        """Delegate attributes not found on the proxy to the wrapped storage.

        Raises:
            AttributeError: when `storage` itself is not set on the instance

        """
        # __getattr__ only runs once normal lookup has failed. If 'storage'
        # is the missing attribute (instance created without __init__, e.g.
        # through __new__ while being copied or unpickled), delegating would
        # look up self.storage again and recurse until RecursionError.
        if key == 'storage':
            raise AttributeError(key)
        return getattr(self.storage, key)

    @swh_retry
    def content_add(self, content: List[Dict]) -> Dict:
        """Forward a new list built from `content` to the wrapped storage."""
        contents = list(content)
        return self.storage.content_add(contents)

    @swh_retry
    def content_add_metadata(self, content: List[Dict]) -> Dict:
        """Forward a new list built from `content` to the wrapped storage."""
        contents = list(content)
        return self.storage.content_add_metadata(contents)

    @swh_retry
    def origin_add_one(self, origin: Dict) -> str:
        """Forward `origin` to the wrapped storage's `origin_add_one`."""
        return self.storage.origin_add_one(origin)

    @swh_retry
    def origin_visit_add(self, origin: Dict,
                         date: Union[datetime, str], type: str) -> Dict:
        """Forward the arguments, positionally, to the wrapped storage's
        `origin_visit_add`.

        """
        return self.storage.origin_visit_add(origin, date, type)

    @swh_retry
    def origin_visit_update(
            self, origin: str, visit_id: int, status: Optional[str] = None,
            metadata: Optional[Dict] = None,
            snapshot: Optional[Dict] = None) -> Dict:
        """Forward to the wrapped storage's `origin_visit_update`; `status`,
        `metadata` and `snapshot` are passed as keyword arguments.

        """
        return self.storage.origin_visit_update(
            origin, visit_id, status=status,
            metadata=metadata, snapshot=snapshot)

    @swh_retry
    def tool_add(self, tools: List[Dict]) -> List[Dict]:
        """Forward a new list built from `tools` to the wrapped storage."""
        tools = list(tools)
        return self.storage.tool_add(tools)

    @swh_retry
    def metadata_provider_add(
            self, provider_name: str, provider_type: str, provider_url: str,
            metadata: Dict) -> Union[str, int]:
        """Forward the arguments, positionally, to the wrapped storage's
        `metadata_provider_add`.

        """
        return self.storage.metadata_provider_add(
            provider_name, provider_type, provider_url, metadata)

    @swh_retry
    def origin_metadata_add(
            self, origin_url: str, ts: Union[str, datetime],
            provider_id: int, tool_id: int, metadata: Dict) -> None:
        """Forward the arguments, positionally, to the wrapped storage's
        `origin_metadata_add`.

        """
        return self.storage.origin_metadata_add(
            origin_url, ts, provider_id, tool_id, metadata)

    @swh_retry
    def directory_add(self, directories: List[Dict]) -> Dict:
        """Forward a new list built from `directories` to the wrapped
        storage.

        """
        directories = list(directories)
        return self.storage.directory_add(directories)

    @swh_retry
    def revision_add(self, revisions: List[Dict]) -> Dict:
        """Forward a new list built from `revisions` to the wrapped storage."""
        revisions = list(revisions)
        return self.storage.revision_add(revisions)

    @swh_retry
    def release_add(self, releases: List[Dict]) -> Dict:
        """Forward a new list built from `releases` to the wrapped storage."""
        releases = list(releases)
        return self.storage.release_add(releases)

    @swh_retry
    def snapshot_add(self, snapshot: List[Dict]) -> Dict:
        """Forward a new list built from `snapshot` to the wrapped storage."""
        snapshots = list(snapshot)
        return self.storage.snapshot_add(snapshots)

    @swh_retry
    def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
        """Specific case for buffer proxy storage failing to flush data

        Returns:
            the result of the wrapped storage's `flush` when it has one, an
            empty dict otherwise.

        """
        if hasattr(self.storage, 'flush'):
            return self.storage.flush(object_types)
        return {}
back to top