import collections
import logging
import math
import pickle
import statistics
import sys
import zlib
from enum import Enum, IntEnum

import certifi
import tabulate
from esrally import time, exceptions, config, version, paths
from esrally.utils import console, io, versions

logger = logging.getLogger("rally.metrics")


class EsClient:
    """
    Provides a stripped-down client interface that is easier to exchange for testing
    """

    def __init__(self, client):
        self._client = client

    def put_template(self, name, template):
        return self.guarded(self._client.indices.put_template, name, template)

    def template_exists(self, name):
        return self.guarded(self._client.indices.exists_template, name)

    def delete_template(self, name):
        self.guarded(self._client.indices.delete_template, name)

    def create_index(self, index):
        # ignore 400 caused by IndexAlreadyExistsException when creating an index
        return self.guarded(self._client.indices.create, index=index, ignore=400)

    def exists(self, index):
        return self.guarded(self._client.indices.exists, index=index)

    def refresh(self, index):
        return self.guarded(self._client.indices.refresh, index=index)

    def bulk_index(self, index, doc_type, items):
        import elasticsearch.helpers
        self.guarded(elasticsearch.helpers.bulk, self._client, items, index=index, doc_type=doc_type)

    def index(self, index, doc_type, item):
        self.guarded(self._client.index, index=index, doc_type=doc_type, body=item)

    def search(self, index, doc_type, body):
        return self.guarded(self._client.search, index=index, doc_type=doc_type, body=body)

    def guarded(self, target, *args, **kwargs):
        import elasticsearch
        max_execution_count = 3
        execution_count = 0

        while execution_count < max_execution_count:
            execution_count += 1
            try:
                return target(*args, **kwargs)
            except elasticsearch.exceptions.AuthenticationException:
                # we know that it is just one host (see EsClientFactory)
                node = self._client.transport.hosts[0]
                msg = "The configured user could not authenticate against your Elasticsearch metrics store running on host [%s] at " \
                      "port [%s] (wrong password?). Please fix the configuration in [%s]." % \
                      (node["host"], node["port"], config.ConfigFile().location)
                logger.exception(msg)
                raise exceptions.SystemSetupError(msg)
            except elasticsearch.exceptions.AuthorizationException:
                node = self._client.transport.hosts[0]
                msg = "The configured user does not have enough privileges to run the operation [%s] against your Elasticsearch metrics " \
                      "store running on host [%s] at port [%s]. Please adjust your x-pack configuration or specify a user with enough " \
                      "privileges in the configuration in [%s]." % \
                      (target.__name__, node["host"], node["port"], config.ConfigFile().location)
                logger.exception(msg)
                raise exceptions.SystemSetupError(msg)
            except elasticsearch.exceptions.ConnectionTimeout:
                if execution_count < max_execution_count:
                    logger.info("Received a connection timeout from the metrics store in attempt [%d/%d]." %
                                (execution_count, max_execution_count))
                    time.sleep(1)
                else:
                    operation = target.__name__
                    logger.exception("Got a connection timeout while running [%s] (retried %d times)." % (operation, max_execution_count))
                    node = self._client.transport.hosts[0]
                    msg = "A connection timeout occurred while running the operation [%s] against your Elasticsearch metrics store on " \
                          "host [%s] at port [%s]." % (operation, node["host"], node["port"])
                    raise exceptions.RallyError(msg)
            except elasticsearch.exceptions.ConnectionError:
                node = self._client.transport.hosts[0]
                msg = "Could not connect to your Elasticsearch metrics store. Please check that it is running on host [%s] at port [%s]" \
                      " or fix the configuration in [%s]." % (node["host"], node["port"], config.ConfigFile().location)
                logger.exception(msg)
                raise exceptions.SystemSetupError(msg)
            except elasticsearch.TransportError as e:
                # gateway timeout - let's wait a bit and retry
                if e.status_code == 504 and execution_count < max_execution_count:
                    logger.info("Received a gateway timeout from the metrics store in attempt [%d/%d]." %
                                (execution_count, max_execution_count))
                    time.sleep(1)
                else:
                    node = self._client.transport.hosts[0]
                    msg = "A transport error occurred while running the operation [%s] against your Elasticsearch metrics store on " \
                          "host [%s] at port [%s]." % (target.__name__, node["host"], node["port"])
                    logger.exception(msg)
                    raise exceptions.RallyError(msg)

            except elasticsearch.exceptions.ElasticsearchException:
                node = self._client.transport.hosts[0]
                msg = "An unknown error occurred while running the operation [%s] against your Elasticsearch metrics store on host [%s] " \
                      "at port [%s]." % (target.__name__, node["host"], node["port"])
                logger.exception(msg)
                # this does not necessarily mean it's a system setup problem...
                raise exceptions.RallyError(msg)
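    # A minimal usage sketch of the retry wrapper (hypothetical raw client instance):
    #
    #   es = EsClient(raw_es_client)
    #   es.guarded(raw_es_client.indices.refresh, index="rally-metrics-2018-04")
    #
    # guarded() retries connection and gateway timeouts for up to three attempts (sleeping
    # one second in between) and converts well-known errors into Rally exceptions.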


class EsClientFactory:
    """
    Abstracts how the Elasticsearch client is created. Intended for testing.
    """

    def __init__(self, cfg):
        self._config = cfg
        host = self._config.opts("reporting", "datastore.host")
        port = self._config.opts("reporting", "datastore.port")
        # poor man's boolean conversion
        secure = self._config.opts("reporting", "datastore.secure") == "True"
        user = self._config.opts("reporting", "datastore.user")
        password = self._config.opts("reporting", "datastore.password")
        verify = self._config.opts("reporting", "datastore.ssl.verification_mode", default_value="full", mandatory=False) != "none"
        ca_path = self._config.opts("reporting", "datastore.ssl.certificate_authorities", default_value=None, mandatory=False)

        from esrally import client

        # Instead of duplicating code, we're just adapting the metrics store specific properties to match the regular client options.
        client_options = {
            "use_ssl": secure,
            "verify_certs": verify,
            "timeout": 120
        }
        if ca_path:
            client_options["ca_certs"] = ca_path
        if user and password:
            client_options["basic_auth_user"] = user
            client_options["basic_auth_password"] = password

        logger.info("Creating connection to metrics store at %s:%s", host, port)
        factory = client.EsClientFactory(hosts=[{"host": host, "port": port}], client_options=client_options)
        self._client = factory.create()

    def create(self):
        return EsClient(self._client)


class IndexTemplateProvider:
    """
    Abstracts how the Rally index template is retrieved. Intended for testing.
    """

    def __init__(self, cfg):
        self.script_dir = cfg.opts("node", "rally.root")

    def metrics_template(self):
        return self._read("metrics-template")

    def races_template(self):
        return self._read("races-template")

    def results_template(self):
        return self._read("results-template")

    def _read(self, template_name):
        with open("%s/resources/%s.json" % (self.script_dir, template_name), encoding="utf-8") as f:
            return f.read()


class MetaInfoScope(Enum):
    """
    Defines the scope of meta-information. Meta-information provides more context for a metric, for example the concrete version
    of Elasticsearch that has been benchmarked or environment information like CPU model or OS.
    """
    cluster = 1
    """
    Cluster level meta-information is valid for all nodes in the cluster (e.g. the benchmarked Elasticsearch version)
    """
    # host = 2
    """
    Host level meta-information is valid for all nodes on the same host (e.g. the OS name and version)
    """
    node = 3
    """
    Node level meta-information is valid for a single node (e.g. GC times)
    """


def metrics_store(cfg, read_only=True, track=None, challenge=None, car=None, meta_info=None, lap=None):
    """
    Creates a proper metrics store based on the current configuration.

    :param cfg: Config object.
    :param read_only: Whether to open the metrics store only for reading (Default: True).
    :param track: Track name. Optional.
    :param challenge: Challenge name. Optional.
    :param car: Car name. Optional. If not provided, it is read from the config.
    :param meta_info: An optional previously serialized meta-info representation.
    :param lap: An optional previously serialized lap.
    :return: A metrics store implementation.
    """
    cls = metrics_store_class(cfg)
    store = cls(cfg=cfg, meta_info=meta_info, lap=lap)
    logger.info("Creating %s" % str(store))

    trial_id = cfg.opts("system", "trial.id")
    trial_timestamp = cfg.opts("system", "time.start")
    selected_car = cfg.opts("mechanic", "car.names") if car is None else car

    store.open(trial_id, trial_timestamp, track, challenge, selected_car, create=not read_only)
    return store
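# A minimal usage sketch (assuming a fully populated config object and hypothetical
# track/challenge/car names):
#
#   store = metrics_store(cfg, read_only=False, track="geonames",
#                         challenge="append-no-conflicts", car="defaults", lap=1)
#   store.put_count_cluster_level("segments_count", 42)
#   store.close()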


def metrics_store_class(cfg):
    if cfg.opts("reporting", "datastore.type") == "elasticsearch":
        return EsMetricsStore
    else:
        return InMemoryMetricsStore


def extract_user_tags_from_config(cfg):
    """
    Extracts user tags into a structured dict

    :param cfg: The current configuration object.
    :return: A dict containing user tags. If no user tags are given, an empty dict is returned.
    """
    user_tags = cfg.opts("race", "user.tag", mandatory=False)
    return extract_user_tags_from_string(user_tags)


def extract_user_tags_from_string(user_tags):
    """
    Extracts user tags into a structured dict

    :param user_tags: A string containing user tags (tags separated by comma, key and value separated by colon).
    :return: A dict containing user tags. If no user tags are given, an empty dict is returned.
    """
    user_tags_dict = {}
    if user_tags and user_tags.strip() != "":
        try:
            for user_tag in user_tags.split(","):
                user_tag_key, user_tag_value = user_tag.split(":")
                user_tags_dict[user_tag_key] = user_tag_value
        except ValueError:
            msg = "User tag keys and values have to separated by a ':'. Invalid value [%s]" % user_tags
            logger.exception(msg)
            raise exceptions.SystemSetupError(msg)
    return user_tags_dict
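# A usage sketch (hypothetical tag string):
#
#   extract_user_tags_from_string("intent:baseline,disk:ssd")
#   # -> {"intent": "baseline", "disk": "ssd"}
#
# A malformed tag such as "intent" (no colon) raises SystemSetupError.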


class SampleType(IntEnum):
    Warmup = 0
    Normal = 1


class MetricsStore:
    """
    Abstract metrics store
    """

    def __init__(self, cfg, clock=time.Clock, meta_info=None, lap=None):
        """
        Creates a new metrics store.

        :param cfg: The config object. Mandatory.
        :param clock: This parameter is optional and needed for testing.
        :param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
        :param lap: This parameter is optional and intended for creating a metrics store with a previously serialized lap.
        """
        self._config = cfg
        self._trial_id = None
        self._trial_timestamp = None
        self._track = None
        self._track_params = cfg.opts("track", "params")
        self._challenge = None
        self._car = None
        self._car_name = None
        self._lap = lap
        self._environment_name = cfg.opts("system", "env.name")
        self.opened = False
        if meta_info is None:
            self._meta_info = {
                MetaInfoScope.cluster: {},
                MetaInfoScope.node: {}
            }
        else:
            self._meta_info = meta_info
        self._clock = clock
        self._stop_watch = self._clock.stop_watch()

    def open(self, trial_id=None, trial_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
        """
        Opens a metrics store for a specific trial, track, challenge and car.

        :param trial_id: The trial id. This attribute is sufficient to uniquely identify a trial.
        :param trial_timestamp: The trial timestamp as a datetime.
        :param track_name: Track name.
        :param challenge_name: Challenge name.
        :param car_name: Car name.
        :param ctx: A metrics store open context retrieved from another metrics store with ``#open_context``.
        :param create: True if an index should be created (if necessary). This is typically True when attempting to write metrics and
        False when the store is just opened for reading (as we can assume all necessary indices exist at this point).
        """
        if ctx:
            self._trial_id = ctx["trial-id"]
            self._trial_timestamp = ctx["trial-timestamp"]
            self._track = ctx["track"]
            self._challenge = ctx["challenge"]
            self._car = ctx["car"]
        else:
            self._trial_id = trial_id
            self._trial_timestamp = time.to_iso8601(trial_timestamp)
            self._track = track_name
            self._challenge = challenge_name
            self._car = car_name
        assert self._trial_id is not None, "Attempting to open metrics store without a trial id"
        assert self._trial_timestamp is not None, "Attempting to open metrics store without a trial timestamp"
        assert self._track is not None, "Attempting to open metrics store without a track"
        assert self._challenge is not None, "Attempting to open metrics store without a challenge"
        assert self._car is not None, "Attempting to open metrics store without a car"

        self._car_name = "+".join(self._car) if isinstance(self._car, list) else self._car

        logger.info("Opening metrics store for trial timestamp=[%s], track=[%s], challenge=[%s], car=[%s]" %
                    (self._trial_timestamp, self._track, self._challenge, self._car))

        user_tags = extract_user_tags_from_config(self._config)
        for k, v in user_tags.items():
            # prefix user tag with "tag_" in order to avoid clashes with our internal meta data
            self.add_meta_info(MetaInfoScope.cluster, None, "tag_%s" % k, v)
        # Don't store it for each metrics record as it's probably sufficient on race level
        # self.add_meta_info(MetaInfoScope.cluster, None, "rally_version", version.version())
        self._stop_watch.start()
        self.opened = True

    def reset_relative_time(self):
        """
        Resets the internal relative-time counter to zero.
        """
        self._stop_watch.start()

    @property
    def lap(self):
        return self._lap

    @lap.setter
    def lap(self, lap):
        self._lap = lap

    def flush(self, refresh=True):
        """
        Explicitly flushes buffered metrics to the metric store. It is not required to flush before closing the metrics store.
        """
        raise NotImplementedError("abstract method")

    def close(self):
        """
        Closes the metric store. Note that it is mandatory to close the metrics store when it is no longer needed as it only persists
        metrics on close (in order to avoid additional latency during the benchmark).
        """
        logger.info("Closing metrics store.")
        self.flush()
        self.clear_meta_info()
        self.lap = None
        self.opened = False

    def add_meta_info(self, scope, scope_key, key, value):
        """
        Adds new meta information to the metrics store. All metrics entries that are created after calling this method are guaranteed to
        contain the added meta info (provided it is on the same level or a level below; e.g. a cluster level metric will not contain node
        level meta information, but all cluster level meta information will be contained in a node level metrics record).

        :param scope: The scope of the meta information. See MetaInfoScope.
        :param scope_key: The key within the scope. For cluster level metrics None is expected, for node level metrics the node name.
        :param key: The key of the meta information.
        :param value: The value of the meta information.
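
        Example (hypothetical version and node name)::

            store.add_meta_info(MetaInfoScope.cluster, None, "distribution_version", "6.2.3")
            store.add_meta_info(MetaInfoScope.node, "rally-node-0", "os_name", "Linux")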
        """
        if scope == MetaInfoScope.cluster:
            self._meta_info[MetaInfoScope.cluster][key] = value
        elif scope == MetaInfoScope.node:
            if scope_key not in self._meta_info[MetaInfoScope.node]:
                self._meta_info[MetaInfoScope.node][scope_key] = {}
            self._meta_info[MetaInfoScope.node][scope_key][key] = value
        else:
            raise exceptions.SystemSetupError("Unknown meta info scope [%s]" % scope)

    def clear_meta_info(self):
        """
        Clears all internally stored meta-info. This is considered Rally internal API and not intended for normal client consumption.
        """
        self._meta_info = {
            MetaInfoScope.cluster: {},
            MetaInfoScope.node: {}
        }

    def merge_meta_info(self, to_merge):
        """
        Merges the current meta info with another one.

        :param to_merge: A meta info representation that should be merged with the current one.
        """
        if MetaInfoScope.cluster in to_merge:
            self._meta_info[MetaInfoScope.cluster].update(to_merge[MetaInfoScope.cluster])
        if MetaInfoScope.node in to_merge:
            self._meta_info[MetaInfoScope.node].update(to_merge[MetaInfoScope.node])

    @property
    def meta_info(self):
        """
        :return: All currently stored meta-info. This is considered Rally internal API and not intended for normal client consumption.
        """
        return self._meta_info

    @meta_info.setter
    def meta_info(self, meta_info):
        self._meta_info = meta_info

    @property
    def open_context(self):
        return {
            "trial-id": self._trial_id,
            "trial-timestamp": self._trial_timestamp,
            "track": self._track,
            "challenge": self._challenge,
            "car": self._car
        }

    def put_count_cluster_level(self, name, count, unit=None, task=None, operation=None, operation_type=None, sample_type=SampleType.Normal,
                                absolute_time=None, relative_time=None, meta_data=None):
        """
        Adds a new cluster level counter metric.

        :param name: The name of the metric.
        :param count: The metric value. It is expected to be of type int (otherwise use put_value_*).
        :param unit: A count may or may not have a unit.
        :param task: The task name to which this value applies. Optional. Defaults to None.
        :param operation: The operation name to which this value applies. Optional. Defaults to None.
        :param operation_type: The operation type to which this value applies. Optional. Defaults to None.
        :param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
               store will derive the timestamp automatically.
        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
               Defaults to None. The metrics store will derive the timestamp automatically.
        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
        """
        self._put(MetaInfoScope.cluster, None, name, count, unit, task, operation, operation_type, sample_type, absolute_time,
                  relative_time, meta_data)

    def put_count_node_level(self, node_name, name, count, unit=None, task=None, operation=None, operation_type=None,
                             sample_type=SampleType.Normal, absolute_time=None, relative_time=None, meta_data=None):
        """
        Adds a new node level counter metric.

        :param name: The name of the metric.
        :param node_name: The name of the cluster node for which this metric has been determined.
        :param count: The metric value. It is expected to be of type int (otherwise use put_value_*).
        :param unit: A count may or may not have a unit.
        :param task: The task name to which this value applies. Optional. Defaults to None.
        :param operation: The operation name to which this value applies. Optional. Defaults to None.
        :param operation_type: The operation type to which this value applies. Optional. Defaults to None.
        :param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
               store will derive the timestamp automatically.
        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
               Defaults to None. The metrics store will derive the timestamp automatically.
        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
        """
        self._put(MetaInfoScope.node, node_name, name, count, unit, task, operation, operation_type, sample_type, absolute_time,
                  relative_time, meta_data)

    # the metric value should be a float (use put_count_* for integral values)
    def put_value_cluster_level(self, name, value, unit, task=None, operation=None, operation_type=None, sample_type=SampleType.Normal,
                                absolute_time=None, relative_time=None, meta_data=None):
        """
        Adds a new cluster level value metric.

        :param name: The name of the metric.
        :param value: The metric value. It is expected to be of type float (otherwise use put_count_*).
        :param unit: The unit of this metric value (e.g. ms, docs/s).
        :param task: The task name to which this value applies. Optional. Defaults to None.
        :param operation: The operation name to which this value applies. Optional. Defaults to None.
        :param operation_type: The operation type to which this value applies. Optional. Defaults to None.
        :param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
               store will derive the timestamp automatically.
        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
               Defaults to None. The metrics store will derive the timestamp automatically.
        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
        """
        self._put(MetaInfoScope.cluster, None, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
                  relative_time, meta_data)

    def put_value_node_level(self, node_name, name, value, unit, task=None, operation=None, operation_type=None,
                             sample_type=SampleType.Normal, absolute_time=None, relative_time=None, meta_data=None):
        """
        Adds a new node level value metric.

        :param name: The name of the metric.
        :param node_name: The name of the cluster node for which this metric has been determined.
        :param value: The metric value. It is expected to be of type float (otherwise use put_count_*).
        :param unit: The unit of this metric value (e.g. ms, docs/s)
        :param task: The task name to which this value applies. Optional. Defaults to None.
        :param operation: The operation name to which this value applies. Optional. Defaults to None.
        :param operation_type: The operation type to which this value applies. Optional. Defaults to None.
        :param sample_type: Whether this is a warmup or a normal measurement sample. Defaults to SampleType.Normal.
        :param absolute_time: The absolute timestamp in seconds since epoch when this metric record is stored. Defaults to None. The metrics
               store will derive the timestamp automatically.
        :param relative_time: The relative timestamp in seconds since the start of the benchmark when this metric record is stored.
               Defaults to None. The metrics store will derive the timestamp automatically.
        :param meta_data: A dict, containing additional key-value pairs. Defaults to None.
        """
        self._put(MetaInfoScope.node, node_name, name, value, unit, task, operation, operation_type, sample_type, absolute_time,
                  relative_time, meta_data)

    def _put(self, level, level_key, name, value, unit, task, operation, operation_type, sample_type, absolute_time=None,
             relative_time=None, meta_data=None):
        if level == MetaInfoScope.cluster:
            meta = self._meta_info[MetaInfoScope.cluster].copy()
        elif level == MetaInfoScope.node:
            meta = self._meta_info[MetaInfoScope.cluster].copy()
            if level_key in self._meta_info[MetaInfoScope.node]:
                meta.update(self._meta_info[MetaInfoScope.node][level_key])
        else:
            raise exceptions.SystemSetupError("Unknown meta info level [%s] for metric [%s]" % (level, name))
        if meta_data:
            meta.update(meta_data)

        if absolute_time is None:
            absolute_time = self._clock.now()
        if relative_time is None:
            relative_time = self._stop_watch.split_time()

        doc = {
            "@timestamp": time.to_epoch_millis(absolute_time),
            "relative-time": int(relative_time * 1000 * 1000),
            "trial-id": self._trial_id,
            "trial-timestamp": self._trial_timestamp,
            "environment": self._environment_name,
            "track": self._track,
            "lap": self._lap,
            "challenge": self._challenge,
            "car": self._car_name,
            "name": name,
            "value": value,
            "unit": unit,
            "sample-type": sample_type.name.lower(),
            "meta": meta
        }
        if task:
            doc["task"] = task
        if operation:
            doc["operation"] = operation
        if operation_type:
            doc["operation-type"] = operation_type
        if self._track_params:
            doc["track-params"] = self._track_params

        assert self.lap is not None, "Attempting to store [%s] without a lap." % doc
        self._add(doc)

    def bulk_add(self, memento):
        """
        Adds raw metrics store documents previously created with #to_externalizable()

        :param memento: The external representation as returned by #to_externalizable().
        """
        if memento:
            logger.info("Restoring in-memory representation of metrics store.")
            for doc in pickle.loads(zlib.decompress(memento)):
                self._add(doc)

    def to_externalizable(self, clear=False):
        raise NotImplementedError("abstract method")

    def _add(self, doc):
        """
        Adds a new document to the metrics store

        :param doc: The new document.
        """
        raise NotImplementedError("abstract method")

    def get_one(self, name, sample_type=None, lap=None):
        """
        Gets one value for the given metric name (even if more than one exists).

        :param name: The metric name to query.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param lap: The lap to query. Optional. By default, all laps are considered.
        :return: The corresponding value for the given metric name or None if there is no value.
        """
        return self._first_or_none(self.get(name=name, sample_type=sample_type, lap=lap))

    def _first_or_none(self, values):
        return values[0] if values else None

    def get(self, name, task=None, operation_type=None, sample_type=None, lap=None):
        """
        Gets all raw values for the given metric name.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param lap: The lap to query. Optional. By default, all laps are considered.
        :return: A list of all values for the given metric.
        """
        return self._get(name, task, operation_type, sample_type, lap, lambda doc: doc["value"])

    def get_raw(self, name, task=None, operation_type=None, sample_type=None, lap=None, mapper=lambda doc: doc):
        """
        Gets all raw records for the given metric name.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param lap: The lap to query. Optional. By default, all laps are considered.
        :param mapper: A record mapper. By default, the complete record is returned.
        :return: A list of all raw records for the given metric.
        """
        return self._get(name, task, operation_type, sample_type, lap, mapper)

    def get_unit(self, name, task=None):
        """
        Gets the unit for the given metric name.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :return: The corresponding unit for the given metric name or None if no metric record is available.
        """
        # does not make too much sense to ask for a sample type here
        return self._first_or_none(self._get(name, task, None, None, None, lambda doc: doc["unit"]))

    def _get(self, name, task, operation_type, sample_type, lap, mapper):
        raise NotImplementedError("abstract method")

    def get_count(self, name, task=None, operation_type=None, sample_type=None, lap=None):
        """

        :param name: The metric name to query.
        :param task The task name to query. Optional.
        :param operation_type The operation type to query. Optional.
        :param sample_type The sample type to query. Optional. By default, all samples are considered.
        :param lap The lap to query. Optional. By default, all laps are considered.
        :return: The number of samples for this metric.
        """
        stats = self.get_stats(name, task, operation_type, sample_type, lap)
        if stats:
            return stats["count"]
        else:
            return 0

    def get_error_rate(self, task, operation_type=None, sample_type=None, lap=None):
        """
        Gets the error rate for a specific task.

        :param task: The task name to query.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param lap: The lap to query. Optional. By default, all laps are considered.
        :return: A float between 0.0 and 1.0 (inclusive) representing the error rate.
        """
        raise NotImplementedError("abstract method")

    def get_stats(self, name, task=None, operation_type=None, sample_type=None, lap=None):
        """
        Gets standard statistics for the given metric.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param lap: The lap to query. Optional. By default, all laps are considered.
        :return: A metric_stats structure.
        """
        raise NotImplementedError("abstract method")

    def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, lap=None, percentiles=None):
        """
        Retrieves percentile metrics for the given metric.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param lap: The lap to query. Optional. By default, all laps are considered.
        :param percentiles: An optional list of percentiles to determine. If None is provided, the 99th, 99.9th and 100th percentiles are
        determined by default. Ensure that there are enough data points in the metrics store (e.g. it makes no sense to retrieve the
        99.9999th percentile when there are only 10 values).
        :return: An ordered dictionary of the determined percentile values in ascending order. Key is the percentile, value is the
        determined value at this percentile. If no percentiles could be determined None is returned.
        """
        raise NotImplementedError("abstract method")

    def get_median(self, name, task=None, operation_type=None, sample_type=None, lap=None):
        """
        Retrieves median value of the given metric.

        :param name: The metric name to query.
        :param task: The task name to query. Optional.
        :param operation_type: The operation type to query. Optional.
        :param sample_type: The sample type to query. Optional. By default, all samples are considered.
        :param lap: The lap to query. Optional. By default, all laps are considered.
        :return: The median value.
        """
        median = "50.0"
        percentiles = self.get_percentiles(name, task, operation_type, sample_type, lap, percentiles=[median])
        return percentiles[median] if percentiles else None


class EsMetricsStore(MetricsStore):
    """
    A metrics store backed by Elasticsearch.
    """
    METRICS_DOC_TYPE = "metrics"

    def __init__(self,
                 cfg,
                 client_factory_class=EsClientFactory,
                 index_template_provider_class=IndexTemplateProvider,
                 clock=time.Clock, meta_info=None, lap=None):
        """
        Creates a new metrics store.

        :param cfg: The config object. Mandatory.
        :param client_factory_class: This parameter is optional and needed for testing.
        :param index_template_provider_class: This parameter is optional and needed for testing.
        :param clock: This parameter is optional and needed for testing.
        :param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
        :param lap: This parameter is optional and intended for creating a metrics store with a previously serialized lap.
        """
        MetricsStore.__init__(self, cfg=cfg, clock=clock, meta_info=meta_info, lap=lap)
        self._index = None
        self._client = client_factory_class(cfg).create()
        self._index_template_provider = index_template_provider_class(cfg)
        self._docs = None

    def open(self, trial_id=None, trial_timestamp=None, track_name=None, challenge_name=None, car_name=None, ctx=None, create=False):
        self._docs = []
        MetricsStore.open(self, trial_id, trial_timestamp, track_name, challenge_name, car_name, ctx, create)
        self._index = self.index_name()
        # reduce a bit of noise in the metrics cluster log
        if create:
            # always update the mapping to the latest version
            self._client.put_template("rally-metrics", self._get_template())
            if not self._client.exists(index=self._index):
                self._client.create_index(index=self._index)
        # ensure we can search immediately after opening
        self._client.refresh(index=self._index)

    def index_name(self):
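        # e.g. a trial timestamp in April 2018 yields "rally-metrics-2018-04"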
        ts = time.from_is8601(self._trial_timestamp)
        return "rally-metrics-%04d-%02d" % (ts.year, ts.month)

    def _get_template(self):
        return self._index_template_provider.metrics_template()

    def flush(self, refresh=True):
        if self._docs:
            self._client.bulk_index(index=self._index, doc_type=EsMetricsStore.METRICS_DOC_TYPE, items=self._docs)
            logger.info("Successfully added %d metrics documents for trial timestamp=[%s], track=[%s], challenge=[%s], car=[%s]." %
                        (len(self._docs), self._trial_timestamp, self._track, self._challenge, self._car))
        self._docs = []
        # ensure we can search immediately after flushing
        if refresh:
            self._client.refresh(index=self._index)

    def _add(self, doc):
        self._docs.append(doc)

    def _get(self, name, task, operation_type, sample_type, lap, mapper):
        query = {
            "query": self._query_by_name(name, task, operation_type, sample_type, lap)
        }
        logger.debug("Issuing get against index=[%s], doc_type=[%s], query=[%s]" % (self._index, EsMetricsStore.METRICS_DOC_TYPE, query))
        result = self._client.search(index=self._index, doc_type=EsMetricsStore.METRICS_DOC_TYPE, body=query)
        logger.debug("Metrics query produced [%s] results." % result["hits"]["total"])
        return [mapper(v["_source"]) for v in result["hits"]["hits"]]

    def get_error_rate(self, task, operation_type=None, sample_type=None, lap=None):
        query = {
            "query": self._query_by_name("service_time", task, operation_type, sample_type, lap),
            "size": 0,
            "aggs": {
                "error_rate": {
                    "terms": {
                        "field": "meta.success"
                    }
                }
            }
        }
        logger.debug("Issuing get_error_rate against index=[%s], doc_type=[%s], query=[%s]" %
                     (self._index, EsMetricsStore.METRICS_DOC_TYPE, query))
        result = self._client.search(index=self._index, doc_type=EsMetricsStore.METRICS_DOC_TYPE, body=query)
        buckets = result["aggregations"]["error_rate"]["buckets"]
        logger.debug("Query returned [%d] buckets." % len(buckets))
        count_success = 0
        count_errors = 0
        for bucket in buckets:
            k = bucket["key_as_string"]
            doc_count = int(bucket["doc_count"])
            logger.debug("Processing key [%s] with [%d] docs." % (k, doc_count))
            if k == "true":
                count_success = doc_count
            elif k == "false":
                count_errors = doc_count
            else:
                logger.warning("Unrecognized bucket key [%s] with [%d] docs." % (k, doc_count))

        if count_errors == 0:
            return 0.0
        elif count_success == 0:
            return 1.0
        else:
            return count_errors / (count_errors + count_success)

    def get_stats(self, name, task=None, operation_type=None, sample_type=None, lap=None):
        """
        Gets standard statistics for the given metric name.

        :return: A metric_stats structure. For details please refer to
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-stats-aggregation.html
        """
        query = {
            "query": self._query_by_name(name, task, operation_type, sample_type, lap),
            "size": 0,
            "aggs": {
                "metric_stats": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
        logger.debug("Issuing get_stats against index=[%s], doc_type=[%s], query=[%s]" %
                     (self._index, EsMetricsStore.METRICS_DOC_TYPE, query))
        result = self._client.search(index=self._index, doc_type=EsMetricsStore.METRICS_DOC_TYPE, body=query)
        return result["aggregations"]["metric_stats"]

    def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, lap=None, percentiles=None):
        if percentiles is None:
            percentiles = [99, 99.9, 100]
        query = {
            "query": self._query_by_name(name, task, operation_type, sample_type, lap),
            "size": 0,
            "aggs": {
                "percentile_stats": {
                    "percentiles": {
                        "field": "value",
                        "percents": percentiles
                    }
                }
            }
        }
        logger.debug("Issuing get_percentiles against index=[%s], doc_type=[%s], query=[%s]" %
                     (self._index, EsMetricsStore.METRICS_DOC_TYPE, query))
        result = self._client.search(index=self._index, doc_type=EsMetricsStore.METRICS_DOC_TYPE, body=query)
        hits = result["hits"]["total"]
        logger.debug("get_percentiles produced %d hits" % hits)
        if hits > 0:
            raw = result["aggregations"]["percentile_stats"]["values"]
            return collections.OrderedDict(sorted(raw.items(), key=lambda t: float(t[0])))
        else:
            return None

    def _query_by_name(self, name, task, operation_type, sample_type, lap):
        q = {
            "bool": {
                "filter": [
                    {
                        "term": {
                            "trial-id": self._trial_id
                        }
                    },
                    {
                        "term": {
                            "name": name
                        }
                    }
                ]
            }
        }
        if task:
            q["bool"]["filter"].append({
                "term": {
                    "task": task
                }
            })
        if operation_type:
            q["bool"]["filter"].append({
                "term": {
                    "operation-type": operation_type.name
                }
            })
        if sample_type:
            q["bool"]["filter"].append({
                "term": {
                    "sample-type": sample_type.name.lower()
                }
            })
        if lap is not None:
            q["bool"]["filter"].append({
                "term": {
                    "lap": lap
                }
            })
        return q
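    # As an illustration, _query_by_name("service_time", "index-append", None, SampleType.Normal, 1)
    # produces (with a hypothetical task name and trial id) a bool query with these filter clauses:
    #
    #   {"term": {"trial-id": "6ebc6e53"}}, {"term": {"name": "service_time"}},
    #   {"term": {"task": "index-append"}}, {"term": {"sample-type": "normal"}},
    #   {"term": {"lap": 1}}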

    def to_externalizable(self, clear=False):
        # no need for an externalizable representation - stores everything directly
        return None

    def __str__(self):
        return "Elasticsearch metrics store"


class InMemoryMetricsStore(MetricsStore):
    def __init__(self, cfg, clock=time.Clock, meta_info=None, lap=None):
        """

        Creates a new metrics store.

        :param cfg: The config object. Mandatory.
        :param clock: This parameter is optional and needed for testing.
        :param meta_info: This parameter is optional and intended for creating a metrics store with a previously serialized meta-info.
        :param lap: This parameter is optional and intended for creating a metrics store with a previously serialized lap.
        """
        super().__init__(cfg=cfg, clock=clock, meta_info=meta_info, lap=lap)
        self.docs = []

    def __del__(self):
        """
        Releases the stored documents when the metrics store instance is deleted.
        """
        del self.docs

    def _add(self, doc):
        self.docs.append(doc)

    def flush(self, refresh=True):
        pass

    def to_externalizable(self, clear=False):
        docs = self.docs
        if clear:
            self.docs = []
        compressed = zlib.compress(pickle.dumps(docs))
        logger.info("Compression changed size of metric store from [%d] bytes to [%d] bytes" %
                    (sys.getsizeof(docs, -1), sys.getsizeof(compressed, -1)))
        return compressed
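    # Round-trip sketch: the compressed memento can be restored into another metrics
    # store (e.g. after sending it between actors) via MetricsStore.bulk_add:
    #
    #   memento = source_store.to_externalizable(clear=True)
    #   target_store.bulk_add(memento)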

    def get_percentiles(self, name, task=None, operation_type=None, sample_type=None, lap=None, percentiles=None):
        if percentiles is None:
            percentiles = [99, 99.9, 100]
        result = collections.OrderedDict()
        values = self.get(name, task, operation_type, sample_type, lap)
        if len(values) > 0:
            sorted_values = sorted(values)
            for percentile in percentiles:
                result[percentile] = self.percentile_value(sorted_values, percentile)
        return result

    @staticmethod
    def percentile_value(sorted_values, percentile):
        """
        Calculates a percentile value for a given list of values and a percentile.

        The implementation is based on http://onlinestatbook.com/2/introduction/percentiles.html

        :param sorted_values: A sorted list of raw values for which a percentile should be calculated.
        :param percentile: A percentile between [0, 100]
        :return: the corresponding percentile value.
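
        Example (hypothetical data): for sorted_values=[10, 20, 30, 40] and percentile=90,
        rank = 0.9 * 3 = 2.7, so the result interpolates between the values at indices 2
        and 3: 30 + (40 - 30) * 0.7 = 37.0.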
        """
        rank = float(percentile) / 100.0 * (len(sorted_values) - 1)
        if rank == int(rank):
            return sorted_values[int(rank)]
        else:
            lr = math.floor(rank)
            lr_next = math.ceil(rank)
            fr = rank - lr
            lower_score = sorted_values[lr]
            higher_score = sorted_values[lr_next]
            return lower_score + (higher_score - lower_score) * fr

    def get_error_rate(self, task, operation_type=None, sample_type=None, lap=None):
        error = 0
        total_count = 0
        for doc in self.docs:
            # we can use any request metrics record (i.e. service time or latency)
            if doc["name"] == "service_time" and doc["task"] == task and \
                    (operation_type is None or doc["operation-type"] == operation_type.name) and \
                    (sample_type is None or doc["sample-type"] == sample_type.name.lower()) and \
                    (lap is None or doc["lap"] == lap):
                total_count += 1
                if doc["meta"]["success"] is False:
                    error += 1
        if total_count > 0:
            return error / total_count
        else:
            return 0.0
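    # e.g. two failed out of ten matching service_time records yield an error rate of 0.2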

    def get_stats(self, name, task=None, operation_type=None, sample_type=SampleType.Normal, lap=None):
        values = self.get(name, task, operation_type, sample_type, lap)
        sorted_values = sorted(values)
        if len(sorted_values) > 0:
            return {
                "count": len(sorted_values),
                "min": sorted_values[0],
                "max": sorted_values[-1],
                "avg": statistics.mean(sorted_values),
                "sum": sum(sorted_values)
            }
        else:
            return None

    def _get(self, name, task, operation_type, sample_type, lap, mapper):
        return [mapper(doc)
                for doc in self.docs
                if doc["name"] == name and
                (task is None or doc["task"] == task) and
                (operation_type is None or doc["operation-type"] == operation_type.name) and
                (sample_type is None or doc["sample-type"] == sample_type.name.lower()) and
                (lap is None or doc["lap"] == lap)
                ]

    def __str__(self):
        return "in-memory metrics store"


def race_store(cfg):
    """
    Creates a proper race store based on the current configuration.
    :param cfg: Config object. Mandatory.
    :return: A race store implementation.
    """
    if cfg.opts("reporting", "datastore.type") == "elasticsearch":
        logger.info("Creating ES race store")
        return CompositeRaceStore(EsRaceStore(cfg), EsResultsStore(cfg), FileRaceStore(cfg))
    else:
        logger.info("Creating file race store")
        return FileRaceStore(cfg)


def list_races(cfg):
    def format_dict(d):
        if d:
            return ", ".join(["%s=%s" % (k, v) for k, v in d.items()])
        else:
            return None

    races = []
    for race in race_store(cfg).list():
        races.append([time.to_iso8601(race.trial_timestamp), race.track, format_dict(race.track_params), race.challenge, race.car_name,
                      format_dict(race.user_tags)])

    if len(races) > 0:
        console.println("\nRecent races:\n")
        console.println(tabulate.tabulate(races, headers=["Race Timestamp", "Track", "Track Parameters", "Challenge", "Car", "User Tags"]))
    else:
        console.println("")
        console.println("No recent races found.")


def create_race(cfg, track, challenge):
    car = cfg.opts("mechanic", "car.names")
    environment = cfg.opts("system", "env.name")
    trial_id = cfg.opts("system", "trial.id")
    trial_timestamp = cfg.opts("system", "time.start")
    total_laps = cfg.opts("race", "laps")
    user_tags = extract_user_tags_from_config(cfg)
    pipeline = cfg.opts("race", "pipeline")
    track_params = cfg.opts("track", "params")
    rally_version = version.version()

    return Race(rally_version, environment, trial_id, trial_timestamp, pipeline, user_tags, track, track_params, challenge, car, total_laps)


class Race:
    def __init__(self, rally_version, environment_name, trial_id, trial_timestamp, pipeline, user_tags, track, track_params, challenge, car,
                 total_laps, cluster=None, lap_results=None, results=None):
        if results is None:
            results = {}
        if lap_results is None:
            lap_results = []
        self.rally_version = rally_version
        self.environment_name = environment_name
        self.trial_id = trial_id
        self.trial_timestamp = trial_timestamp
        self.pipeline = pipeline
        self.user_tags = user_tags
        self.track = track
        self.track_params = track_params
        self.challenge = challenge
        self.car = car
        self.total_laps = total_laps
        # will be set later - contains hosts, revision, distribution_version, ...
        self.cluster = cluster
        self.lap_results = lap_results
        self.results = results

    @property
    def track_name(self):
        return str(self.track)

    @property
    def challenge_name(self):
        return str(self.challenge)

    @property
    def car_name(self):
        return "+".join(self.car) if isinstance(self.car, list) else self.car

    @property
    def revision(self):
        # minor simplification for reporter
        return self.cluster.revision

    def add_lap_results(self, results):
        self.lap_results.append(results)

    def add_final_results(self, results):
        self.results = results

    def results_of_lap_number(self, lap):
        # laps are 1-indexed but we store the data zero-indexed
        return self.lap_results[lap - 1]

    def as_dict(self):
        """
        :return: A dict representation suitable for persisting this race instance as JSON.
        """
        d = {
            "rally-version": self.rally_version,
            "environment": self.environment_name,
            "trial-id": self.trial_id,
            "trial-timestamp": time.to_iso8601(self.trial_timestamp),
            "pipeline": self.pipeline,
            "user-tags": self.user_tags,
            "track": self.track_name,
            "challenge": self.challenge_name,
            "car": self.car,
            "total-laps": self.total_laps,
            "cluster": self.cluster.as_dict(),
            "results": self.results.as_dict()
        }
        if self.track_params:
            d["track-params"] = self.track_params
        return d

    def to_result_dicts(self):
        """
        :return: a list of dicts, suitable for persisting the results of this race in a format that is Kibana-friendly.
        """
        result_template = {
            "rally-version": self.rally_version,
            "environment": self.environment_name,
            "trial-id": self.trial_id,
            "trial-timestamp": time.to_iso8601(self.trial_timestamp),
            "distribution-version": self.cluster.distribution_version,
            "distribution-major-version": versions.major_version(self.cluster.distribution_version),
            "user-tags": self.user_tags,
            "track": self.track_name,
            "challenge": self.challenge_name,
            "car": self.car_name,
            "node-count": len(self.cluster.nodes),
            # allows records to be logically deleted, e.g. for UI purposes when we only want to show the latest result on graphs
            "active": True
        }
        plugins = set()
        for node in self.cluster.nodes:
            plugins.update(node.plugins)

        if plugins:
            result_template["plugins"] = list(plugins)

        if self.track_params:
            result_template["track-params"] = self.track_params

        all_results = []

        for item in self.results.as_flat_list():
            result = result_template.copy()
            result.update(item)
            all_results.append(result)

        return all_results

    @classmethod
    def from_dict(cls, d):
        # for backwards compatibility with Rally < 0.9.2
        if "user-tag" in d:
            user_tags = extract_user_tags_from_string(d["user-tag"])
        elif "user-tags" in d:
            user_tags = d["user-tags"]
        else:
            user_tags = {}

        # Don't restore a few properties like cluster because (a) they cannot be reconstructed easily without knowledge of other modules
        # and (b) they are not necessary for this use case.
        return Race(d["rally-version"], d["environment"], d["trial-id"], time.from_is8601(d["trial-timestamp"]), d["pipeline"], user_tags,
                    d["track"], d.get("track-params"), d["challenge"], d["car"], d["total-laps"], results=d["results"])


class RaceStore:
    def __init__(self, cfg):
        self.cfg = cfg
        self.environment_name = cfg.opts("system", "env.name")
        self.trial_timestamp = cfg.opts("system", "time.start")
        self.current_race = None

    def find_by_timestamp(self, timestamp):
        raise NotImplementedError("abstract method")

    def list(self):
        raise NotImplementedError("abstract method")

    def store_race(self, race):
        self._store(race.as_dict())

    def _store(self, doc):
        raise NotImplementedError("abstract method")

    def _max_results(self):
        return int(self.cfg.opts("system", "list.races.max_results"))


# Does not inherit from RaceStore as it is only a delegator with the same API.
class CompositeRaceStore:
    """
    Internal helper class that stores races both in a file and in Elasticsearch in case users want Elasticsearch as a race store.

    It provides the same API as RaceStore. It delegates writes to all stores and read operations only to the Elasticsearch race store.
    """
    def __init__(self, es_store, es_results_store, file_store):
        self.es_store = es_store
        self.es_results_store = es_results_store
        self.file_store = file_store

    def find_by_timestamp(self, timestamp):
        return self.es_store.find_by_timestamp(timestamp)

    def store_race(self, race):
        self.file_store.store_race(race)
        self.es_store.store_race(race)
        self.es_results_store.store_results(race)

    def list(self):
        return self.es_store.list()


class FileRaceStore(RaceStore):
    def __init__(self, cfg):
        super().__init__(cfg)
        self.user_provided_start_timestamp = self.cfg.opts("system", "time.start.user_provided", mandatory=False, default_value=False)
        self.races_path = paths.races_root(self.cfg)
        self.race_path = paths.race_root(self.cfg)

    def _store(self, doc):
        import json
        io.ensure_dir(self.race_path)
        # if the user has overridden the effective start date we guarantee a unique file name but do not allow such races to be used
        # for tournaments.
        with open(self._output_file_name(doc), mode="wt", encoding="utf-8") as f:
            f.write(json.dumps(doc, indent=True, ensure_ascii=False))

    def _output_file_name(self, doc):
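        # e.g. "race_geonames_append-no-conflicts_defaults.json" if the user overrode the
        # start date (hypothetical track, challenge and car names), otherwise "race.json"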
        if self.user_provided_start_timestamp:
            suffix = "_%s_%s_%s" % (doc["track"], doc["challenge"], "+".join(doc["car"]))
        else:
            suffix = ""
        return "%s/race%s.json" % (self.race_path, suffix)

    def list(self):
        import glob
        results = glob.glob("%s/*/race*.json" % self.races_path)
        all_races = self._to_races(results)
        return all_races[:self._max_results()]

    def find_by_timestamp(self, timestamp):
        race_file = "%s/race.json" % paths.race_root(cfg=self.cfg, start=time.from_is8601(timestamp))
        if io.exists(race_file):
            races = self._to_races([race_file])
            if races:
                return races[0]
        return None

    def _to_races(self, results):
        import json
        races = []
        for result in results:
            # noinspection PyBroadException
            try:
                with open(result, mode="rt", encoding="utf-8") as f:
                    races.append(Race.from_dict(json.loads(f.read())))
            except BaseException:
                logger.exception("Could not load race file [%s] (incompatible format?) Skipping..." % result)
        return sorted(races, key=lambda r: r.trial_timestamp, reverse=True)


class EsRaceStore(RaceStore):
    INDEX_PREFIX = "rally-races-"
    RACE_DOC_TYPE = "races"

    def __init__(self, cfg, client_factory_class=EsClientFactory, index_template_provider_class=IndexTemplateProvider):
        """
        Creates a new race store.

        :param cfg: The config object. Mandatory.
        :param client_factory_class: This parameter is optional and needed for testing.
        :param index_template_provider_class: This parameter is optional and needed for testing.
        """
        super().__init__(cfg)
        self.client = client_factory_class(cfg).create()
        self.index_template_provider = index_template_provider_class(cfg)

    def _store(self, doc):
        # always update the mapping to the latest version
        self.client.put_template("rally-races", self.index_template_provider.races_template())
        self.client.index(index=self.index_name(), doc_type=EsRaceStore.RACE_DOC_TYPE, item=doc)

    def index_name(self):
        return "%s%04d-%02d" % (EsRaceStore.INDEX_PREFIX, self.trial_timestamp.year, self.trial_timestamp.month)

    def list(self):
        filters = [{
            "term": {
                "environment": self.environment_name
            }
        }]

        query = {
            "query": {
                "bool": {
                    "filter": filters
                }
            },
            "size": self._max_results(),
            "sort": [
                {
                    "trial-timestamp": {
                        "order": "desc"
                    }
                }
            ]
        }
        result = self.client.search(index="%s*" % EsRaceStore.INDEX_PREFIX, doc_type=EsRaceStore.RACE_DOC_TYPE, body=query)
        if result["hits"]["total"] > 0:
            return [Race.from_dict(v["_source"]) for v in result["hits"]["hits"]]
        else:
            return []

    def find_by_timestamp(self, timestamp):
        filters = [{
                "term": {
                    "environment": self.environment_name
                }
            },
            {
                "term": {
                    "trial-timestamp": timestamp
                }
            }]

        query = {
            "query": {
                "bool": {
                    "filter": filters
                }
            }
        }
        result = self.client.search(index="%s*" % EsRaceStore.INDEX_PREFIX, doc_type=EsRaceStore.RACE_DOC_TYPE, body=query)
        if result["hits"]["total"] == 1:
            return Race.from_dict(result["hits"]["hits"][0]["_source"])
        else:
            return None


class EsResultsStore:
    """
    Stores the results of a race in a format that is better suited for reporting with Kibana.
    """
    INDEX_PREFIX = "rally-results-"
    RESULTS_DOC_TYPE = "results"

    def __init__(self, cfg, client_factory_class=EsClientFactory, index_template_provider_class=IndexTemplateProvider):
        """
        Creates a new results store.

        :param cfg: The config object. Mandatory.
        :param client_factory_class: This parameter is optional and needed for testing.
        :param index_template_provider_class: This parameter is optional and needed for testing.
        """
        self.cfg = cfg
        self.trial_timestamp = cfg.opts("system", "time.start")
        self.client = client_factory_class(cfg).create()
        self.index_template_provider = index_template_provider_class(cfg)

    def store_results(self, race):
        # always update the mapping to the latest version
        self.client.put_template("rally-results", self.index_template_provider.results_template())
        self.client.bulk_index(index=self.index_name(), doc_type=EsResultsStore.RESULTS_DOC_TYPE, items=race.to_result_dicts())

    def index_name(self):
        return "%s%04d-%02d" % (EsResultsStore.INDEX_PREFIX, self.trial_timestamp.year, self.trial_timestamp.month)