swh:1:snp:eb70f1f85391e4b077c211bec36af0061c4bf937
Raw File
Tip revision: 9676aa929976899276f30fa43e5da94faf2ceedc authored by Nicolas Dandrimont on 06 January 2016, 14:12:32 UTC
sql/upgrades/034: properly order function modifications
Tip revision: 9676aa9
test_objstorage.py
# Copyright (C) 2015  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import gzip
import os
import shutil
import stat
import tempfile
import unittest

from io import BytesIO
from nose.tools import istest

from swh.core import hashutil
from swh.storage import objstorage


class TestObjStorage(unittest.TestCase):

    def setUp(self):
        self.content = b'42\n'

        # sha1
        self.hex_obj_id = '34973274ccef6ab4dfaaf86599792fa9c3fe4689'

        # sha1_git
        # self.hex_obj_id = 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'

        self.obj_id = hashutil.hex_to_hash(self.hex_obj_id)
        self.obj_steps = [self.hex_obj_id[0:2], self.hex_obj_id[2:4],
                          self.hex_obj_id[4:6]]
        self.obj_relpath = os.path.join(*(self.obj_steps + [self.hex_obj_id]))

        self.tmpdir = tempfile.mkdtemp()
        self.obj_dirs = [
            os.path.join(self.tmpdir, *self.obj_steps[:i])
            for i in range(1, len(self.obj_steps))
        ]
        self.obj_path = os.path.join(self.tmpdir, self.obj_relpath)

        self.storage = objstorage.ObjStorage(root=self.tmpdir, depth=3)

        self.missing_obj_id = hashutil.hex_to_hash(
            'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15')

    def tearDown(self):
        shutil.rmtree(self.tmpdir)

    def assertGzipContains(self, gzip_path, content):  # noqa
        self.assertEqual(gzip.open(gzip_path, 'rb').read(), content)

    @istest
    def add_bytes_w_id(self):
        r = self.storage.add_bytes(self.content, obj_id=self.obj_id)
        self.assertEqual(r, self.obj_id)
        self.assertGzipContains(self.obj_path, self.content)

    @istest
    def add_bytes_wo_id(self):
        r = self.storage.add_bytes(self.content)
        self.assertEqual(r, self.obj_id)
        self.assertGzipContains(self.obj_path, self.content)

    @istest
    def add_file_w_id(self):
        r = self.storage.add_file(BytesIO(self.content),
                                  len(self.content),
                                  obj_id=self.obj_id)
        self.assertEqual(r, self.obj_id)
        self.assertGzipContains(self.obj_path, self.content)

    @istest
    def add_file_wo_id(self):
        r = self.storage.add_file(BytesIO(self.content), len(self.content))
        self.assertEqual(r, self.obj_id)
        self.assertGzipContains(self.obj_path, self.content)

    @istest
    def contains(self):
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        self.assertIn(self.obj_id, self.storage)
        self.assertNotIn(self.missing_obj_id, self.storage)

    @istest
    def check_ok(self):
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        try:
            self.storage.check(self.obj_id)
        except:
            self.fail('integrity check failed')

    @istest
    def check_missing(self):
        with self.assertRaises(objstorage.Error):
            self.storage.check(self.obj_id)

    @istest
    def check_file_and_dirs_mode(self):
        old_umask = os.umask(0)
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        for dir in self.obj_dirs:
            stat_dir = os.stat(dir)
            self.assertEquals(stat.S_IMODE(stat_dir.st_mode),
                              objstorage.DIR_MODE)
        stat_res = os.stat(self.obj_path)
        self.assertEquals(stat.S_IMODE(stat_res.st_mode), objstorage.FILE_MODE)
        os.umask(old_umask)

    @istest
    def check_not_gzip(self):
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        with open(self.obj_path, 'ab') as f:  # add trailing garbage
            f.write(b'garbage')
        with self.assertRaises(objstorage.Error):
            self.storage.check(self.obj_id)

    @istest
    def check_id_mismatch(self):
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        with gzip.open(self.obj_path, 'wb') as f:  # replace gzipped content
            f.write(b'unexpected content')
        with self.assertRaises(objstorage.Error):
            self.storage.check(self.obj_id)

    @istest
    def get_bytes(self):
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        self.assertEqual(self.storage.get_bytes(self.obj_id),
                         self.content)

    @istest
    def get_file_path(self):
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        path = self.storage._get_file_path(self.obj_id)
        self.assertEqual(os.path.basename(path), self.hex_obj_id)
        self.assertEqual(gzip.open(path, 'rb').read(), self.content)

    @istest
    def get_missing(self):
        with self.assertRaises(objstorage.Error):
            with self.storage.get_file_obj(self.missing_obj_id) as f:
                f.read()

    @istest
    def iter(self):
        self.assertEqual(list(iter(self.storage)), [])
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        self.assertEqual(list(iter(self.storage)), [self.obj_id])

    @istest
    def len(self):
        self.assertEqual(len(self.storage), 0)
        self.storage.add_bytes(self.content, obj_id=self.obj_id)
        self.assertEqual(len(self.storage), 1)
back to top