# Copyright (C) 2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information


"""Elastic Search backend

"""
from copy import deepcopy

from swh.core import utils
from elasticsearch import Elasticsearch
from elasticsearch import helpers


DEFAULT_CONFIG = {
    'elastic_search': {
        'storage_nodes': {'host': 'localhost', 'port': 9200},
        'index_name_prefix': 'swh-tasks',
        'client_options': {
            'sniff_on_start': False,
            'sniff_on_connection_fail': True,
            'http_compress': False,
        },
    },
}
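# Note: the constructor merges a caller-provided configuration into a copy
# of DEFAULT_CONFIG with a plain dict.update(), so an 'elastic_search'
# override replaces this whole sub-dict and must provide every key above.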


class SWHElasticSearchClient:
    def __init__(self, **config):
        self.config = deepcopy(DEFAULT_CONFIG)
        self.config.update(config)
        es_conf = self.config['elastic_search']
        options = es_conf.get('client_options', {})
        self.storage = Elasticsearch(
            # nodes to use by default
            es_conf['storage_nodes'],
            # auto detect cluster's status
            sniff_on_start=options['sniff_on_start'],
            sniff_on_connection_fail=options['sniff_on_connection_fail'],
            sniffer_timeout=60,
            # compression or not
            http_compress=options['http_compress'])
        self.index_name_prefix = es_conf['index_name_prefix']
        # document's index type (cf. ../../data/elastic-template.json)
        self.doc_type = 'task'

    def compute_index_name(self, year, month):
        """Given a year, month, compute the index's name.

        """
        return '%s-%s-%s' % (
            self.index_name_prefix, year, '%02d' % month)

    def index(self, data):
        """Index given data to elasticsearch.

        The field 'ended' in data is used to compute the index to
        index data to.

        """
        date = data['ended']
        index_name = self.compute_index_name(date.year, date.month)
        return self.storage.index(index=index_name,
                                  doc_type=self.doc_type,
                                  body=data)

    def mget(self, index_name, doc_ids, chunk_size=500,
             source=True, log=None):
        """Retrieve document's full content according to their ids as per
           source's setup.

           The `source` permits to retrieve only what's of interest to
           us, e.g:
           - source=True ; gives back the original indexed data
           - source=False ; returns without the original _source field
           - source=['task_id'] ; returns only task_id in the _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_ids (generator): Generator of ids to retrieve
            chunk_size (int): Number of documents chunk to send for retrieval
            source (bool/[str]): Source of information to return

        Yields:
            document indexed as per source's setup

        """
        if isinstance(source, list):
            source = {'_source': ','.join(source)}
        else:
            source = {'_source': str(source).lower()}

        for ids in utils.grouper(doc_ids, n=chunk_size):
            res = self.storage.mget(body={'ids': list(ids)},
                                    index=index_name,
                                    doc_type=self.doc_type,
                                    params=source)
            if not res:
                if log:
                    log.error('Error during retrieval of data, skipping!')
                continue

            for doc in res['docs']:
                found = doc.get('found')
                if not found:
                    msg = 'Doc id %s not found, not indexed yet' % doc['_id']
                    if log:
                        log.warning(msg)
                    continue
                yield doc['_source']

    def _streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                        log=None):
        """Bulk index data and returns the successful indexed data's
           identifier.

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Generator of documents to index
            chunk_size (int): Number of documents chunk to send for indexation

        Yields:
            document id indexed

        """
        actions = ({'_index': index_name,
                    '_op_type': 'index',
                    '_type': self.doc_type,
                    '_source': data} for data in doc_stream)
        for ok, result in helpers.streaming_bulk(client=self.storage,
                                                 actions=actions,
                                                 chunk_size=chunk_size,
                                                 raise_on_error=False,
                                                 raise_on_exception=False):
            if not ok:
                if log:
                    log.error('Indexation error: %s. Skipping.' % result)
                continue
            yield result['index']['_id']

    def streaming_bulk(self, index_name, doc_stream, chunk_size=500,
                       source=True, log=None):
        """Bulk index data and returns the successful indexed data as per
           source's setup.

           the `source` permits to retrieve only what's of interest to
           us, e.g:

           - source=True ; gives back the original indexed data
           - source=False ; returns without the original _source field
           - source=['task_id'] ; returns only task_id in the _source field

        Args:
            index_name (str): Name of the concerned index.
            doc_stream (generator): Document generator to index
            chunk_size (int): Number of documents chunk to send
            source (bool, [str]): the information to return

        """

        indexed_ids = self._streaming_bulk(
            index_name, doc_stream, chunk_size=chunk_size, log=log)
        yield from self.mget(index_name, indexed_ids, chunk_size=chunk_size,
                             source=source, log=log)
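

if __name__ == '__main__':
    # Minimal usage sketch, not part of the backend itself: it assumes an
    # Elasticsearch node reachable through the DEFAULT_CONFIG settings
    # above, and the task document below is made up for the example.
    import datetime
    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger(__name__)

    client = SWHElasticSearchClient()
    ended = datetime.datetime(2019, 2, 15, 14, 15)
    index_name = client.compute_index_name(ended.year, ended.month)

    # Index one (fabricated) task document, then read it back, keeping
    # only the 'task_id' field of its stored source.
    res = client.index({'task_id': 42, 'status': 'eventful', 'ended': ended})
    for doc in client.mget(index_name, [res['_id']],
                           source=['task_id'], log=log):
        print(doc)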