swh:1:snp:eb70f1f85391e4b077c211bec36af0061c4bf937
Tip revision: 9676aa929976899276f30fa43e5da94faf2ceedc authored by Nicolas Dandrimont on 06 January 2016, 14:12:32 UTC
sql/upgrades/034: properly order function modifications
sql/upgrades/034: properly order function modifications
Tip revision: 9676aa9
test_storage.py
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import os
import psycopg2
import shutil
import tempfile
import unittest
from unittest.mock import patch
from nose.tools import istest
from nose.plugins.attrib import attr
from swh.core.tests.db_testing import DbTestFixture
from swh.core.hashutil import hex_to_hash
from swh.storage import Storage
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata')
@attr('db')
class AbstractTestStorage(DbTestFixture):
"""Base class for Storage testing.
This class is used as-is to test local storage (see TestStorage
below) and remote storage (see TestRemoteStorage in
test_remote_storage.py.
We need to have the two classes inherit from this base class
separately to avoid nosetests running the tests from the base
class twice.
"""
TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump')
def setUp(self):
super().setUp()
self.maxDiff = None
self.objroot = tempfile.mkdtemp()
self.storage = Storage(self.conn, self.objroot)
self.cont = {
'data': b'42\n',
'length': 3,
'sha1': hex_to_hash(
'34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
'sha1_git': hex_to_hash(
'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
'sha256': hex_to_hash(
'673650f936cb3b0a2f93ce09d81be107'
'48b1b203c19e8176b4eefc1964a0cf3a'),
}
self.cont2 = {
'data': b'4242\n',
'length': 5,
'sha1': hex_to_hash(
'61c2b3a30496d329e21af70dd2d7e097046d07b7'),
'sha1_git': hex_to_hash(
'36fade77193cb6d2bd826161a0979d64c28ab4fa'),
'sha256': hex_to_hash(
'859f0b154fdb2d630f45e1ecae4a8629'
'15435e663248bb8461d914696fc047cd'),
}
self.missing_cont = {
'data': b'missing\n',
'length': 8,
'sha1': hex_to_hash(
'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'),
'sha1_git': hex_to_hash(
'33e45d56f88993aae6a0198013efa80716fd8919'),
'sha256': hex_to_hash(
'6bbd052ab054ef222c1c87be60cd191a'
'ddedd24cc882d1f5f7f7be61dc61bb3a'),
}
self.skipped_cont = {
'length': 1024 * 1024 * 200,
'sha1_git': hex_to_hash(
'33e45d56f88993aae6a0198013efa80716fd8920'),
'reason': 'Content too long',
'status': 'absent',
}
self.skipped_cont2 = {
'length': 1024 * 1024 * 300,
'sha1_git': hex_to_hash(
'33e45d56f88993aae6a0198013efa80716fd8921'),
'reason': 'Content too long',
'status': 'absent',
}
self.dir = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90',
'entries': [
{
'name': b'foo',
'type': 'file',
'target': self.cont['sha1_git'],
'perms': 0o644,
},
{
'name': b'bar\xc3',
'type': 'dir',
'target': b'12345678901234567890',
'perms': 0o2000,
},
],
}
self.dir2 = {
'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa95',
'entries': [
{
'name': b'oof',
'type': 'file',
'target': self.cont2['sha1_git'],
'perms': 0o644,
}
],
}
self.minus_offset = datetime.timezone(datetime.timedelta(minutes=-120))
self.plus_offset = datetime.timezone(datetime.timedelta(minutes=120))
self.revision = {
'id': b'56789012345678901234',
'message': b'hello',
'author': {
'name': b'Nicolas Dandrimont',
'email': b'nicolas@example.com',
},
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=self.plus_offset),
'committer': {
'name': b'St\xc3fano Zacchiroli',
'email': b'stefano@example.com',
},
'committer_date': datetime.datetime(2015, 1, 2, 22, 0, 0,
tzinfo=self.minus_offset),
'parents': [b'01234567890123456789', b'23434512345123456789'],
'type': 'git',
'directory': self.dir['id'],
'metadata': {'checksums': {'sha1': 'tarball-sha1',
'sha256': 'tarball-sha256'},
'signed-off-by': 'some-dude'},
'synthetic': True
}
self.revision2 = {
'id': b'87659012345678904321',
'message': b'hello again',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
},
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=self.plus_offset),
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
},
'committer_date': datetime.datetime(2015, 1, 2, 22, 0, 0,
tzinfo=self.minus_offset),
'parents': [b'01234567890123456789'],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': False
}
self.revision3 = {
'id': hex_to_hash('7026b7c1a2af56521e951c01ed20f255fa054238'),
'message': b'a simple revision with no parents this time',
'author': {
'name': b'Roberto Dicosmo',
'email': b'roberto@example.com',
},
'date': datetime.datetime(2015, 10, 1, 22, 0, 0,
tzinfo=self.plus_offset),
'committer': {
'name': b'tony',
'email': b'ar@dumont.fr',
},
'committer_date': datetime.datetime(2015, 10, 2, 22, 0, 0,
tzinfo=self.minus_offset),
'parents': [],
'type': 'git',
'directory': self.dir2['id'],
'metadata': None,
'synthetic': True
}
self.revision4 = {
'id': hex_to_hash('368a48fe15b7db2383775f97c6b247011b3f14f4'),
'message': b'parent of self.revision2',
'author': {
'name': b'me',
'email': b'me@soft.heri',
},
'date': datetime.datetime(2015, 1, 1, 20, 0, 0,
tzinfo=self.plus_offset),
'committer': {
'name': b'committer-dude',
'email': b'committer@dude.com',
},
'committer_date': datetime.datetime(2015, 1, 2, 20, 0, 0,
tzinfo=self.minus_offset),
'parents': [self.revision3['id']],
'type': 'git',
'directory': self.dir['id'],
'metadata': None,
'synthetic': False
}
self.origin = {
'url': 'file:///dev/null',
'type': 'git',
}
self.origin2 = {
'url': 'file:///dev/zero',
'type': 'git',
}
self.occurrence = {
'branch': 'master',
'revision': b'67890123456789012345',
'authority': '5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
'validity': datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
}
self.occurrence2 = {
'branch': 'master',
'revision': self.revision2['id'],
'authority': '5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
'validity': datetime.datetime(2015, 1, 1, 23, 0, 0,
tzinfo=datetime.timezone.utc),
}
self.release = {
'id': b'87659012345678901234',
'name': 'v0.0.1',
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=self.plus_offset),
'author': {
'name': b'olasd',
'email': b'nic@olasd.fr',
},
'target': b'43210987654321098765',
'target_type': 'revision',
'message': b'synthetic release',
'synthetic': True,
}
self.release2 = {
'id': b'56789012348765901234',
'name': 'v0.0.2',
'date': datetime.datetime(2015, 1, 2, 23, 0, 0,
tzinfo=self.minus_offset),
'author': {
'name': b'tony',
'email': b'ar@dumont.fr',
},
'target': b'432109\xa9765432\xc309\x00765',
'target_type': 'revision',
'message': b'v0.0.2\nMisc performance improvments + bug fixes',
'synthetic': False
}
self.fetch_history_date = datetime.datetime(
2015, 1, 2, 21, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_end = datetime.datetime(
2015, 1, 2, 23, 0, 0,
tzinfo=datetime.timezone.utc)
self.fetch_history_duration = (self.fetch_history_end -
self.fetch_history_date)
self.fetch_history_data = {
'status': True,
'result': {'foo': 'bar'},
'stdout': 'blabla',
'stderr': 'blablabla',
}
self.entity1 = {
'uuid': 'f96a7ec1-0058-4920-90cc-7327e4b5a4bf',
# GitHub users
'parent': 'ad6df473-c1d2-4f40-bc58-2b091d4a750e',
'name': 'github:user:olasd',
'type': 'person',
'description': 'Nicolas Dandrimont',
'homepage': 'http://example.com',
'active': True,
'generated': True,
# swh.lister.github
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'lister_metadata': {
'id': 12877,
'type': 'user',
'last_activity': '2015-11-03',
},
'doap': None,
'validity': [
datetime.datetime(2015, 11, 3, 11, 0, 0,
tzinfo=datetime.timezone.utc),
]
}
self.entity1_query = {
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'lister_metadata': {
'id': 12877,
'type': 'user',
},
}
self.entity2 = {
'uuid': '3903d075-32d6-46d4-9e29-0aef3612c4eb',
# GitHub users
'parent': 'ad6df473-c1d2-4f40-bc58-2b091d4a750e',
'name': 'github:user:zacchiro',
'type': 'person',
'description': 'Stefano Zacchiroli',
'homepage': 'http://example.com',
'active': True,
'generated': True,
# swh.lister.github
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'lister_metadata': {
'id': 216766,
'type': 'user',
'last_activity': '2015-11-03',
},
'doap': None,
'validity': [
datetime.datetime(2015, 11, 3, 11, 0, 0,
tzinfo=datetime.timezone.utc),
]
}
self.entity2_query = {
'lister': '34bd6b1b-463f-43e5-a697-785107f598e4',
'lister_metadata': {
'id': 216766,
'type': 'user',
},
}
def tearDown(self):
shutil.rmtree(self.objroot)
self.cursor.execute("""SELECT table_name FROM information_schema.tables
WHERE table_schema = %s""", ('public',))
tables = set(table for (table,) in self.cursor.fetchall())
tables -= {'dbversion', 'entity', 'entity_history', 'listable_entity'}
for table in tables:
self.cursor.execute('truncate table %s cascade' % table)
self.cursor.execute('delete from entity where generated=true')
self.cursor.execute('delete from entity_history where generated=true')
self.conn.commit()
super().tearDown()
@istest
def content_add(self):
cont = self.cont
self.storage.content_add([cont])
if hasattr(self.storage, 'objstorage'):
self.assertIn(cont['sha1'], self.storage.objstorage)
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status'
' FROM content WHERE sha1 = %s',
(cont['sha1'],))
datum = self.cursor.fetchone()
self.assertEqual(
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(),
datum[3], datum[4]),
(cont['sha1'], cont['sha1_git'], cont['sha256'],
cont['length'], 'visible'))
@istest
def content_add_collision(self):
cont1 = self.cont
# create (corrupted) content with same sha1{,_git} but != sha256
cont1b = cont1.copy()
sha256_array = bytearray(cont1b['sha256'])
sha256_array[0] += 1
cont1b['sha256'] = bytes(sha256_array)
with self.assertRaises(psycopg2.IntegrityError):
self.storage.content_add([cont1, cont1b])
@istest
def skipped_content_add(self):
cont = self.skipped_cont
cont2 = self.skipped_cont2
self.storage.content_add([cont])
self.storage.content_add([cont2])
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status,'
'reason FROM skipped_content ORDER BY sha1_git')
datum = self.cursor.fetchone()
self.assertEqual(
(datum[0], datum[1].tobytes(), datum[2],
datum[3], datum[4], datum[5]),
(None, cont['sha1_git'], None,
cont['length'], 'absent', 'Content too long'))
datum2 = self.cursor.fetchone()
self.assertEqual(
(datum2[0], datum2[1].tobytes(), datum2[2],
datum2[3], datum2[4], datum2[5]),
(None, cont2['sha1_git'], None,
cont2['length'], 'absent', 'Content too long'))
@istest
def content_missing(self):
cont2 = self.cont2
missing_cont = self.missing_cont
self.storage.content_add([cont2])
gen = self.storage.content_missing([cont2, missing_cont])
self.assertEqual(list(gen), [missing_cont['sha1']])
@istest
def directory_add(self):
init_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([self.dir['id']], init_missing)
self.storage.directory_add([self.dir])
stored_data = list(self.storage.directory_get(self.dir['id']))
data_to_store = [{
'dir_id': self.dir['id'],
'type': ent['type'],
'target': ent['target'],
'name': ent['name'],
'perms': ent['perms'],
'status': None,
'sha1': None,
'sha1_git': None,
'sha256': None,
}
for ent in sorted(self.dir['entries'], key=lambda ent: ent['name'])
]
self.assertEqual(data_to_store, stored_data)
after_missing = list(self.storage.directory_missing([self.dir['id']]))
self.assertEqual([], after_missing)
@istest
def revision_add(self):
init_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([self.revision['id']], list(init_missing))
self.storage.revision_add([self.revision])
end_missing = self.storage.revision_missing([self.revision['id']])
self.assertEqual([], list(end_missing))
@istest
def revision_log(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
# when
actual_result = self.storage.revision_log(self.revision4['id'])
res = list(actual_result)
self.assertEqual(len(res), 2) # revision4 is a child of revision3
self.assertEquals(res[0], self.revision4)
self.assertEquals(res[1], self.revision3)
@istest
def revision_log_with_limit(self):
# given
# self.revision4 -is-child-of-> self.revision3
self.storage.revision_add([self.revision3,
self.revision4])
actual_result = self.storage.revision_log(self.revision4['id'], 1)
res = list(actual_result)
self.assertEqual(len(res), 1)
self.assertEquals(res[0], self.revision4)
@istest
def revision_get(self):
self.storage.revision_add([self.revision])
get = list(self.storage.revision_get([self.revision['id'],
self.revision2['id']]))
self.assertEqual(len(get), 2)
self.assertEqual(get[0], self.revision)
self.assertEqual(get[0]['date'].utcoffset(),
self.revision['date'].utcoffset())
self.assertEqual(get[0]['committer_date'].utcoffset(),
self.revision['committer_date'].utcoffset())
self.assertEqual(get[1], None)
@istest
def revision_get_no_parents(self):
self.storage.revision_add([self.revision3])
get = list(self.storage.revision_get([self.revision3['id']]))
self.assertEqual(len(get), 1)
self.assertEqual(get[0]['parents'], []) # no parents on this one
@istest
def release_add(self):
init_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([self.release['id'], self.release2['id']],
list(init_missing))
self.storage.release_add([self.release, self.release2])
end_missing = self.storage.release_missing([self.release['id'],
self.release2['id']])
self.assertEqual([], list(end_missing))
@istest
def release_get(self):
# given
self.storage.release_add([self.release, self.release2])
# when
actual_releases = self.storage.release_get([self.release['id'],
self.release2['id']])
actual_releases = list(actual_releases)
# then
self.assertEquals([self.release, self.release2],
[actual_releases[0], actual_releases[1]])
@istest
def origin_add_one(self):
origin0 = self.storage.origin_get(self.origin)
self.assertIsNone(origin0)
id = self.storage.origin_add_one(self.origin)
actual_origin = self.storage.origin_get({'url': self.origin['url'],
'type': self.origin['type']})
self.assertEqual(actual_origin['id'], id)
id2 = self.storage.origin_add_one(self.origin)
self.assertEqual(id, id2)
@istest
def origin_get(self):
self.assertIsNone(self.storage.origin_get(self.origin))
id = self.storage.origin_add_one(self.origin)
# lookup per type and url (returns id)
actual_origin0 = self.storage.origin_get({'url': self.origin['url'],
'type': self.origin['type']})
self.assertEqual(actual_origin0['id'], id)
# lookup per id (returns dict)
actual_origin1 = self.storage.origin_get({'id': id})
self.assertEqual(actual_origin1, {'id': id,
'type': self.origin['type'],
'url': self.origin['url'],
'lister': None,
'project': None})
@istest
def occurrence_add(self):
origin_id = self.storage.origin_add_one(self.origin2)
revision = self.revision.copy()
revision['id'] = self.occurrence['revision']
self.storage.revision_add([revision])
occur = self.occurrence
occur['origin'] = origin_id
self.storage.occurrence_add([occur])
self.storage.occurrence_add([occur])
test_query = '''select origin, branch, revision, authority, validity
from occurrence_history
order by origin, validity'''
self.cursor.execute(test_query)
ret = self.cursor.fetchall()
self.assertEqual(len(ret), 1)
self.assertEqual((ret[0][0], ret[0][1], ret[0][2].tobytes(),
ret[0][3]), (occur['origin'],
occur['branch'], occur['revision'],
occur['authority']))
self.assertEqual(ret[0][4].lower, occur['validity'])
self.assertEqual(ret[0][4].lower_inc, True)
self.assertEqual(ret[0][4].upper, datetime.datetime.max)
orig_validity = occur['validity']
occur['validity'] += datetime.timedelta(hours=10)
self.storage.occurrence_add([occur])
self.cursor.execute(test_query)
ret = self.cursor.fetchall()
self.assertEqual(len(ret), 2)
self.assertEqual(ret[0][4].lower, orig_validity)
self.assertEqual(ret[0][4].lower_inc, True)
self.assertEqual(ret[0][4].upper, occur['validity'])
self.assertEqual(ret[0][4].upper_inc, False)
self.assertEqual(ret[1][4].lower, occur['validity'])
self.assertEqual(ret[1][4].lower_inc, True)
self.assertEqual(ret[1][4].upper, datetime.datetime.max)
@istest
def content_find_occurrence_with_present_content(self):
# 1. with something to find
# given
self.storage.content_add([self.cont2])
self.storage.directory_add([self.dir2]) # point to self.cont
self.storage.revision_add([self.revision2]) # points to self.dir
origin_id = self.storage.origin_add_one(self.origin2)
occurrence = self.occurrence2
occurrence.update({'origin': origin_id})
self.storage.occurrence_add([occurrence])
# when
occ = self.storage.content_find_occurrence(
{'sha1': self.cont2['sha1']})
# then
self.assertEquals(occ['origin_type'], self.origin2['type'])
self.assertEquals(occ['origin_url'], self.origin2['url'])
self.assertEquals(occ['branch'], self.occurrence2['branch'])
self.assertEquals(occ['revision'], self.revision2['id'])
self.assertEquals(occ['path'], self.dir2['entries'][0]['name'])
occ2 = self.storage.content_find_occurrence(
{'sha1_git': self.cont2['sha1_git']})
self.assertEquals(occ2['origin_type'], self.origin2['type'])
self.assertEquals(occ2['origin_url'], self.origin2['url'])
self.assertEquals(occ2['branch'], self.occurrence2['branch'])
self.assertEquals(occ2['revision'], self.revision2['id'])
self.assertEquals(occ2['path'], self.dir2['entries'][0]['name'])
occ3 = self.storage.content_find_occurrence(
{'sha256': self.cont2['sha256']})
self.assertEquals(occ3['origin_type'], self.origin2['type'])
self.assertEquals(occ3['origin_url'], self.origin2['url'])
self.assertEquals(occ3['branch'], self.occurrence2['branch'])
self.assertEquals(occ3['revision'], self.revision2['id'])
self.assertEquals(occ3['path'], self.dir2['entries'][0]['name'])
@istest
def content_find_occurrence_with_non_present_content(self):
# 1. with something that does not exist
missing_cont = self.missing_cont
occ = self.storage.content_find_occurrence(
{'sha1': missing_cont['sha1']})
self.assertEquals(occ, None,
"Content does not exist so no occurrence")
# 2. with something that does not exist
occ = self.storage.content_find_occurrence(
{'sha1_git': missing_cont['sha1_git']})
self.assertEquals(occ, None,
"Content does not exist so no occurrence")
# 3. with something that does not exist
occ = self.storage.content_find_occurrence(
{'sha256': missing_cont['sha256']})
self.assertEquals(occ, None,
"Content does not exist so no occurrence")
@istest
def content_find_occurrence_bad_input(self):
# 1. with bad input
with self.assertRaises(ValueError) as cm:
self.storage.content_find_occurrence({}) # empty is bad
self.assertIn('content keys', cm.exception.args[0])
# 2. with bad input
with self.assertRaises(ValueError) as cm:
self.storage.content_find_occurrence(
{'unknown-sha1': 'something'}) # not the right key
self.assertIn('content keys', cm.exception.args[0])
@istest
def entity_get_from_lister_metadata(self):
self.storage.entity_add([self.entity1])
fetched_entities = list(
self.storage.entity_get_from_lister_metadata(
[self.entity1_query, self.entity2_query]))
# Entity 1 should have full metadata, with last_seen/last_id instead
# of validity
entity1 = self.entity1.copy()
entity1['last_seen'] = entity1['validity'][0]
del fetched_entities[0]['last_id']
del entity1['validity']
# Entity 2 should have no metadata
entity2 = self.entity2_query.copy()
entity2['uuid'] = None
self.assertEquals(fetched_entities, [entity1, entity2])
@istest
def stat_counters(self):
expected_keys = ['content', 'directory', 'directory_entry_dir',
'occurrence', 'origin', 'person', 'revision']
counters = self.storage.stat_counters()
self.assertTrue(set(expected_keys) <= set(counters))
self.assertIsInstance(counters[expected_keys[0]], int)
class TestStorage(AbstractTestStorage, unittest.TestCase):
"""Test the local storage"""
# Can only be tested with local storage as you can't mock
# datetimes for the remote server
@istest
def fetch_history(self):
origin = self.storage.origin_add_one(self.origin)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_date
fetch_history_id = self.storage.fetch_history_start(origin)
datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc)
with patch('datetime.datetime'):
datetime.datetime.now.return_value = self.fetch_history_end
self.storage.fetch_history_end(fetch_history_id,
self.fetch_history_data)
fetch_history = self.storage.fetch_history_get(fetch_history_id)
expected_fetch_history = self.fetch_history_data.copy()
expected_fetch_history['id'] = fetch_history_id
expected_fetch_history['origin'] = origin
expected_fetch_history['date'] = self.fetch_history_date
expected_fetch_history['duration'] = self.fetch_history_duration
self.assertEqual(expected_fetch_history, fetch_history)
@istest
def content_find_with_present_content(self):
# 1. with something to find
cont = self.cont
self.storage.content_add([cont])
actually_present = self.storage.content_find({'sha1': cont['sha1']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
# 2. with something to find
actually_present = self.storage.content_find(
{'sha1_git': cont['sha1_git']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
# 3. with something to find
actually_present = self.storage.content_find(
{'sha256': cont['sha256']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
# 4. with something to find
actually_present = self.storage.content_find(
{'sha1': cont['sha1'],
'sha1_git': cont['sha1_git'],
'sha256': cont['sha256']})
actually_present.pop('ctime')
self.assertEqual(actually_present, {
'sha1': cont['sha1'],
'sha256': cont['sha256'],
'sha1_git': cont['sha1_git'],
'length': cont['length'],
'status': 'visible'
})
@istest
def content_find_with_non_present_content(self):
# 1. with something that does not exist
missing_cont = self.missing_cont
actually_present = self.storage.content_find(
{'sha1': missing_cont['sha1']})
self.assertIsNone(actually_present)
# 2. with something that does not exist
actually_present = self.storage.content_find(
{'sha1_git': missing_cont['sha1_git']})
self.assertIsNone(actually_present)
# 3. with something that does not exist
actually_present = self.storage.content_find(
{'sha256': missing_cont['sha256']})
self.assertIsNone(actually_present)
@istest
def content_find_bad_input(self):
# 1. with bad input
with self.assertRaises(ValueError):
self.storage.content_find({}) # empty is bad
# 2. with bad input
with self.assertRaises(ValueError):
self.storage.content_find(
{'unknown-sha1': 'something'}) # not the right key
@istest
def person_get(self):
# given
person0 = {'name': b'bob', 'email': b'alice@bob'}
id0 = self.storage._person_add(person0)
person1 = {'name': b'tony', 'email': b'tony@bob'}
id1 = self.storage._person_add(person1)
# when
actual_persons = self.storage.person_get([id0, id1])
# given (person injection through release for example)
self.assertEqual(list(actual_persons), [{'id': id0,
'name': person0['name'],
'email': person0['email']},
{'id': id1,
'name': person1['name'],
'email': person1['email']}])