# Copyright (C) 2018-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Memory Elastic Search backend

"""

import datetime  # noqa: imported so eval() in bulk() can resolve datetime reprs
import hashlib
import logging

from ast import literal_eval
from typing import Optional

import psycopg2  # noqa: imported for (de)serialization purposes only


logger = logging.getLogger(__name__)


class BasicSerializer:
    """For memory elastic search implementation (not for production)

    """

    def __init__(self, *args, **kwargs):
        pass

    def dumps(self, *args, **kwargs):
        return str(*args)


class BasicTransport:
    """For memory elastic search implementation, (not for production)

    """

    def __init__(self, *args, **kwargs):
        self.serializer = BasicSerializer()


class MemoryElasticsearch:
    """Memory Elasticsearch instance (for test purposes)

    Partial implementation oriented towards index storage (and not search)

    For now, its sole client is the scheduler for task archival purposes.

    """

    def __init__(self, *args, **kwargs):
        self.index = {}
        self.mapping = {}
        self.settings = {}
        self.indices = self  # HACK: mimic the client's es.indices sub-API
        self.main_mapping_key: Optional[str] = None
        self.main_settings_key: Optional[str] = None
        self.transport = BasicTransport()

    def create(self, index, **kwargs):
        logger.debug(f"create index {index}")
        logger.debug(f"indices: {self.index}")
        logger.debug(f"mapping: {self.mapping}")
        logger.debug(f"settings: {self.settings}")
        self.index[index] = {
            "status": "opened",
            "data": {},
            "mapping": self.get_mapping(self.main_mapping_key),
            "settings": self.get_settings(self.main_settings_key),
        }
        logger.debug(f"index {index} created")

    def close(self, index, **kwargs):
        """Close index"""
        idx = self.index.get(index)
        if idx:
            idx["status"] = "closed"

    def open(self, index, **kwargs):
        """Open index"""
        idx = self.index.get(index)
        if idx:
            idx["status"] = "opened"

    def bulk(self, body, **kwargs):
        """Bulk insert document in index"""
        assert isinstance(body, str)
        all_data = body.split("\n")
        if all_data[-1] == "":
            all_data = all_data[:-1]  # drop the empty line if any
        ids = []
        # data is sent as pairs of lines: (index action, document to index)
        for i in range(0, len(all_data), 2):
            # the first line of the pair describes the index to target,
            # not a document to index
            index_data = literal_eval(all_data[i])
            idx_name = index_data["index"]["_index"]
            # the second line is the document to index
            data = all_data[i + 1]
            _id = hashlib.sha1(data.encode("utf-8")).hexdigest()
            # eval, not json.loads: the document may embed datetime reprs
            parsed_data = eval(data)
            self.index[idx_name]["data"][_id] = parsed_data
            ids.append(_id)

        # everything is indexed fine
        return {"items": [{"index": {"status": 200, "_id": _id,}} for _id in ids]}

    def mget(self, *args, body, index, **kwargs):
        """Bulk indexed documents retrieval"""
        idx = self.index[index]
        docs = []
        idx_docs = idx["data"]
        for _id in body["ids"]:
            doc = idx_docs.get(_id)
            if doc:
                d = {
                    "found": True,
                    "_source": doc,
                }
                docs.append(d)
        return {"docs": docs}

    def stats(self, index, **kwargs):
        idx = self.index[index]  # will raise if it does not exist
        if not idx or idx["status"] == "closed":
            raise ValueError("Closed index")  # simulate issue if index closed

    def exists(self, index, **kwargs):
        return self.index.get(index) is not None

    def put_mapping(self, index, body, **kwargs):
        self.mapping[index] = body
        self.main_mapping_key = index

    def get_mapping(self, index, **kwargs):
        return self.mapping.get(index) or self.index.get(index, {}).get("mapping", {})

    def put_settings(self, index, body, **kwargs):
        self.settings[index] = body
        self.main_settings_key = index

    def get_settings(self, index, **kwargs):
        return self.settings.get(index) or self.index.get(index, {}).get("settings", {})