swh:1:snp:eb70f1f85391e4b077c211bec36af0061c4bf937
Raw File
Tip revision: 5e8bba58e8df4b321ada88b7959c4cd27c579418 authored by Antoine R. Dumont (@ardumont) on 20 October 2016, 13:53:19 UTC
storage: Improve index on content_ctags
Tip revision: 5e8bba5
test_archiver.py
# Copyright (C) 2015  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import tempfile
import unittest
import os
import time
import json

from nose.tools import istest
from nose.plugins.attrib import attr

from swh.core import hashutil
from swh.core.tests.db_testing import DbsTestFixture
from server_testing import ServerTestFixture

from swh.storage.archiver import ArchiverWithRetentionPolicyDirector
from swh.storage.archiver import ArchiverWithRetentionPolicyWorker
from swh.objstorage import get_objstorage
from swh.objstorage.exc import ObjNotFoundError
from swh.objstorage.api.server import app

TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')


@attr('db')
class TestArchiver(DbsTestFixture, ServerTestFixture,
                   unittest.TestCase):
    """ Test the objstorage archiver.
    """

    TEST_DB_NAMES = [
        'softwareheritage-archiver-test',
    ]
    TEST_DB_DUMPS = [
        os.path.join(TEST_DATA_DIR, 'dumps/swh-archiver.dump'),
    ]
    TEST_DB_DUMP_TYPES = [
        'pg_dump',
    ]

    def setUp(self):
        # Launch the backup server
        dest_root = tempfile.mkdtemp(prefix='remote')
        self.config = {
            'cls': 'pathslicing',
            'args': {
                'root': dest_root,
                'slicing': '0:2/2:4/4:6',
            }
        }
        self.app = app
        super().setUp()

        # Retrieve connection (depends on the order in TEST_DB_NAMES)
        self.conn = self.conns[0]          # archiver db's connection
        self.cursor = self.cursors[0]

        # Create source storage
        src_root = tempfile.mkdtemp()
        src_config = {
            'cls': 'pathslicing',
            'args': {
                'root': src_root,
                'slicing': '0:2/2:4/4:6'
            }
        }
        self.src_storage = get_objstorage(**src_config)

        # Create destination storage
        dest_config = {
            'cls': 'remote',
            'args': {
                'base_url': self.url()
            }
        }
        self.dest_storage = get_objstorage(**dest_config)

        # Keep mapped the id to the storages
        self.storages = {
            'uffizi': self.src_storage,
            'banco': self.dest_storage
        }

        # Override configurations
        src_archiver_conf = {'host': 'uffizi'}
        dest_archiver_conf = {'host': 'banco'}
        src_archiver_conf.update(src_config)
        dest_archiver_conf.update(dest_config)
        self.archiver_storages = [src_archiver_conf, dest_archiver_conf]
        self._override_director_config()
        self._override_worker_config()
        # Create the base archiver
        self.archiver = self._create_director()

    def tearDown(self):
        self.empty_tables()
        super().tearDown()

    def empty_tables(self):
        # Remove all content
        self.cursor.execute('DELETE FROM content_archive')
        self.conn.commit()

    def _override_director_config(self, retention_policy=2):
        """ Override the default config of the Archiver director
        to allow the tests to use the *-test db instead of the default one as
        there is no configuration file for now.
        """
        ArchiverWithRetentionPolicyDirector.parse_config_file = lambda obj, additional_configs: {  # noqa
            'dbconn': self.conn,
            'batch_max_size': 5000,
            'archival_max_age': 3600,
            'retention_policy': retention_policy,
            'asynchronous': False,
        }

    def _override_worker_config(self):
        """ Override the default config of the Archiver worker
        to allow the tests to use the *-test db instead of the default one as
        there is no configuration file for now.
        """
        ArchiverWithRetentionPolicyWorker.parse_config_file = lambda obj, additional_configs: {  # noqa
            'retention_policy': 2,
            'archival_max_age': 3600,
            'dbconn': self.conn,
            'storages': self.archiver_storages,
            'source': 'uffizi',
        }

    def _create_director(self):
        return ArchiverWithRetentionPolicyDirector()

    def _create_worker(self, batch={}):
        return ArchiverWithRetentionPolicyWorker(batch)

    def _add_content(self, storage_name, content_data):
        """ Add really a content to the given objstorage

        This put an empty status for the added content.

        Args:
            storage_name: the concerned storage
            content_data: the data to insert
            with_row_insert: to insert a row entry in the db or not

        """
        # Add the content to the storage
        obj_id = self.storages[storage_name].add(content_data)
        db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id)
        self.cursor.execute(""" INSERT INTO content_archive
                                VALUES('%s', '{}')
                            """ % (db_obj_id))
        return obj_id

    def _update_status(self, obj_id, storage_name, status, date=None):
        """ Update the db status for the given id/storage_name.

        This does not create the content in the storage.
        """
        db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id)
        self.archiver.archiver_storage.content_archive_update(
            db_obj_id, storage_name, status
        )

    def _add_dated_content(self, obj_id, copies={}):
        """ Fully erase the previous copies field for the given content id

        This does not alter the contents into the objstorages.
        """
        db_obj_id = r'\x' + hashutil.hash_to_hex(obj_id)
        self.cursor.execute(""" UPDATE TABLE content_archive
                                SET copies='%s'
                                WHERE content_id='%s'
                            """ % (json.dumps(copies), db_obj_id))

    # Integration test
    @istest
    def archive_missing_content(self):
        """ Run archiver on a missing content should archive it.
        """
        obj_data = b'archive_missing_content'
        obj_id = self._add_content('uffizi', obj_data)
        self._update_status(obj_id, 'uffizi', 'present')
        # Content is missing on banco (entry not present in the db)
        try:
            self.dest_storage.get(obj_id)
        except ObjNotFoundError:
            pass
        else:
            self.fail('Content should not be present before archival')
        self.archiver.run()
        # now the content should be present on remote objstorage
        remote_data = self.dest_storage.get(obj_id)
        self.assertEquals(obj_data, remote_data)

    @istest
    def archive_present_content(self):
        """ A content that is not 'missing' shouldn't be archived.
        """
        obj_id = self._add_content('uffizi', b'archive_present_content')
        self._update_status(obj_id, 'uffizi', 'present')
        self._update_status(obj_id, 'banco', 'present')
        # After the run, the content should NOT be in the archive.
        # As the archiver believe it was already in.
        self.archiver.run()
        with self.assertRaises(ObjNotFoundError):
            self.dest_storage.get(obj_id)

    @istest
    def archive_already_enough(self):
        """ A content missing with enough copies shouldn't be archived.
        """
        obj_id = self._add_content('uffizi', b'archive_alread_enough')
        self._update_status(obj_id, 'uffizi', 'present')
        self._override_director_config(retention_policy=1)
        director = self._create_director()
        # Obj is present in only one archive but only one copy is required.
        director.run()
        with self.assertRaises(ObjNotFoundError):
            self.dest_storage.get(obj_id)

    # Unit tests for archive worker

    def archival_elapsed(self, mtime):
        return self._create_worker()._is_archival_delay_elapsed(mtime)

    @istest
    def vstatus_ongoing_remaining(self):
        self.assertFalse(self.archival_elapsed(time.time()))

    @istest
    def vstatus_ongoing_elapsed(self):
        past_time = (
            time.time() - self._create_worker().archival_max_age
        )
        self.assertTrue(self.archival_elapsed(past_time))

    def _status(self, status, mtime=None):
        """ Get a dict that match the copies structure
        """
        return {'status': status, 'mtime': mtime or time.time()}

    @istest
    def need_archival_missing(self):
        """ A content should need archival when it is missing.
        """
        status_copies = {'present': ['uffizi'], 'missing': ['banco']}
        worker = self._create_worker()
        self.assertEqual(worker.need_archival(status_copies),
                         True)

    @istest
    def need_archival_present(self):
        """ A content present everywhere shouldn't need archival
        """
        status_copies = {'present': ['uffizi', 'banco']}
        worker = self._create_worker()
        self.assertEqual(worker.need_archival(status_copies),
                         False)

    def _compute_copies_status(self, status):
        """ A content with a given status should be detected correctly
        """
        obj_id = self._add_content(
            'banco', b'compute_copies_' + bytes(status, 'utf8'))
        self._update_status(obj_id, 'banco', status)
        worker = self._create_worker()
        self.assertIn('banco', worker.compute_copies(
            set(worker.objstorages), obj_id)[status])

    @istest
    def compute_copies_present(self):
        """ A present content should be detected with correct status
        """
        self._compute_copies_status('present')

    @istest
    def compute_copies_missing(self):
        """ A missing content should be detected with correct status
        """
        self._compute_copies_status('missing')

    def _get_backups(self, present, missing):
        """ Return a list of the pair src/dest from the present and missing
        """
        worker = self._create_worker()
        return list(worker.choose_backup_servers(present, missing))

    @istest
    def choose_backup_servers(self):
        self.assertEqual(len(self._get_backups(['uffizi', 'banco'], [])), 0)
        self.assertEqual(len(self._get_backups(['uffizi'], ['banco'])), 1)
        # Even with more possible destinations, do not take more than the
        # retention_policy require
        self.assertEqual(
            len(self._get_backups(['uffizi'], ['banco', 's3'])),
            1
        )

    # This cannot be tested with ArchiverWithRetentionPolicyDirector
    # (it reads from archiver db)
    # @istest
    # def archive_missing_content__without_row_entry_in_archive_db(self):
    #     """ Run archiver on a missing content should archive it.
    #     """
    #     obj_data = b'archive_missing_content_without_row_entry_in_archive_db'
    #     obj_id = self._add_content('uffizi', obj_data)
    #     # One entry in archiver db but no status about its whereabouts
    #     # Content is actually missing on banco but present on uffizi
    #     try:
    #         self.dest_storage.get(obj_id)
    #     except ObjNotFoundError:
    #         pass
    #     else:
    #         self.fail('Content should not be present before archival')
    #     self.archiver.run()
    #     # now the content should be present on remote objstorage
    #     remote_data = self.dest_storage.get(obj_id)
    #     self.assertEquals(obj_data, remote_data)
back to top