Raw File
writer.py
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from typing import Any, Dict, Iterable

from attr import evolve

from swh.model.model import (
    Origin,
    OriginVisit,
    OriginVisitStatus,
    Snapshot,
    Directory,
    Revision,
    Release,
    Content,
    SkippedContent,
    RawExtrinsicMetadata,
    MetadataFetcher,
    MetadataAuthority,
)

try:
    from swh.journal.writer import get_journal_writer
except ImportError:
    get_journal_writer = None  # type: ignore
    # mypy limitation, see https://github.com/python/mypy/issues/1153


class JournalWriter:
    """Journal writer storage collaborator. It's in charge of adding objects to
    the journal.

    """

    def __init__(self, journal_writer):
        if journal_writer:
            if get_journal_writer is None:
                raise EnvironmentError(
                    "You need the swh.journal package to use the "
                    "journal_writer feature"
                )
            self.journal = get_journal_writer(**journal_writer)
        else:
            self.journal = None

    def write_additions(self, obj_type, values) -> None:
        if self.journal:
            self.journal.write_additions(obj_type, values)

    def content_add(self, contents: Iterable[Content]) -> None:
        """Add contents to the journal. Drop the data field if provided.

        """
        contents = [evolve(item, data=None) for item in contents]
        self.write_additions("content", contents)

    def content_update(self, contents: Iterable[Dict[str, Any]]) -> None:
        if self.journal:
            raise NotImplementedError("content_update is not supported by the journal.")

    def content_add_metadata(self, contents: Iterable[Content]) -> None:
        self.content_add(contents)

    def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None:
        self.write_additions("skipped_content", contents)

    def directory_add(self, directories: Iterable[Directory]) -> None:
        self.write_additions("directory", directories)

    def revision_add(self, revisions: Iterable[Revision]) -> None:
        self.write_additions("revision", revisions)

    def release_add(self, releases: Iterable[Release]) -> None:
        self.write_additions("release", releases)

    def snapshot_add(self, snapshots: Iterable[Snapshot]) -> None:
        self.write_additions("snapshot", snapshots)

    def origin_visit_add(self, visits: Iterable[OriginVisit]) -> None:
        self.write_additions("origin_visit", visits)

    def origin_visit_status_add(
        self, visit_statuses: Iterable[OriginVisitStatus]
    ) -> None:
        self.write_additions("origin_visit_status", visit_statuses)

    def origin_add(self, origins: Iterable[Origin]) -> None:
        self.write_additions("origin", origins)

    def raw_extrinsic_metadata_add(
        self, metadata: Iterable[RawExtrinsicMetadata]
    ) -> None:
        self.write_additions("raw_extrinsic_metadata", metadata)

    def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None:
        self.write_additions("metadata_fetcher", fetchers)

    def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None:
        self.write_additions("metadata_authority", authorities)
back to top