https://forge.softwareheritage.org/source/swh-deposit.git
Tip revision: 67662501bfe74494e6d466d19618a2af0f6cc41a authored by Jenkins for Software Heritage on 13 January 2020, 11:07:54 UTC
Updated debian changelog for version 0.0.81
Updated debian changelog for version 0.0.81
Tip revision: 6766250
client.py
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining an swh-deposit client
"""
import hashlib
import os
import requests
import xmltodict
import logging
from abc import ABCMeta, abstractmethod
from urllib.parse import urljoin
from swh.core.config import SWHConfig
logger = logging.getLogger(__name__)
def _parse(stream, encoding='utf-8'):
"""Given a xml stream, parse the result.
Args:
stream (bytes/text): The stream to parse
encoding (str): The encoding to use if to decode the bytes
stream
Returns:
A dict of values corresponding to the parsed xml
"""
if isinstance(stream, bytes):
stream = stream.decode(encoding)
data = xmltodict.parse(stream, encoding=encoding, process_namespaces=False)
if 'entry' in data:
data = data['entry']
if 'sword:error' in data:
data = data['sword:error']
return dict(data)
def _parse_with_filter(stream, encoding='utf-8', keys=[]):
"""Given a xml stream, parse the result and filter with keys.
Args:
stream (bytes/text): The stream to parse
encoding (str): The encoding to use if to decode the bytes
stream
keys ([str]): Keys to filter the parsed result
Returns:
A dict of values corresponding to the parsed xml filtered by
the keys provided.
"""
data = _parse(stream, encoding=encoding)
m = {}
for key in keys:
m[key] = data.get(key)
return m
class BaseApiDepositClient(SWHConfig):
"""Deposit client base class
"""
CONFIG_BASE_FILENAME = 'deposit/client'
DEFAULT_CONFIG = {
'url': ('str', 'http://localhost:5006'),
'auth': ('dict', {}), # with optional 'username'/'password' keys
}
def __init__(self, config=None, _client=requests):
super().__init__()
if config is None:
self.config = super().parse_config_file()
else:
self.config = config
self._client = _client
self.base_url = self.config['url'].strip('/') + '/'
auth = self.config['auth']
if auth == {}:
self.auth = None
else:
self.auth = (auth['username'], auth['password'])
def do(self, method, url, *args, **kwargs):
"""Internal method to deal with requests, possibly with basic http
authentication.
Args:
method (str): supported http methods as in self._methods' keys
Returns:
The request's execution
"""
if hasattr(self._client, method):
method_fn = getattr(self._client, method)
else:
raise ValueError('Development error, unsupported method %s' % (
method))
if self.auth:
kwargs['auth'] = self.auth
full_url = urljoin(self.base_url, url.lstrip('/'))
return method_fn(full_url, *args, **kwargs)
class PrivateApiDepositClient(BaseApiDepositClient):
"""Private API deposit client to:
- read a given deposit's archive(s)
- read a given deposit's metadata
- update a given deposit's status
"""
def archive_get(self, archive_update_url, archive):
"""Retrieve the archive from the deposit to a local directory.
Args:
archive_update_url (str): The full deposit archive(s)'s raw content
to retrieve locally
archive (str): the local archive's path where to store
the raw content
Returns:
The archive path to the local archive to load.
Or None if any problem arose.
"""
r = self.do('get', archive_update_url, stream=True)
if r.ok:
with open(archive, 'wb') as f:
for chunk in r.iter_content():
f.write(chunk)
return archive
msg = 'Problem when retrieving deposit archive at %s' % (
archive_update_url, )
logger.error(msg)
raise ValueError(msg)
def metadata_get(self, metadata_url):
"""Retrieve the metadata information on a given deposit.
Args:
metadata_url (str): The full deposit metadata url to retrieve
locally
Returns:
The dictionary of metadata for that deposit or None if any
problem arose.
"""
r = self.do('get', metadata_url)
if r.ok:
return r.json()
msg = 'Problem when retrieving metadata at %s' % metadata_url
logger.error(msg)
raise ValueError(msg)
def status_update(self, update_status_url, status,
revision_id=None, directory_id=None, origin_url=None):
"""Update the deposit's status.
Args:
update_status_url (str): the full deposit's archive
status (str): The status to update the deposit with
revision_id (str/None): the revision's identifier to update to
directory_id (str/None): the directory's identifier to update to
origin_url (str/None): deposit's associated origin url
"""
payload = {'status': status}
if revision_id:
payload['revision_id'] = revision_id
if directory_id:
payload['directory_id'] = directory_id
if origin_url:
payload['origin_url'] = origin_url
self.do('put', update_status_url, json=payload)
def check(self, check_url):
"""Check the deposit's associated data (metadata, archive(s))
Args:
check_url (str): the full deposit's check url
"""
r = self.do('get', check_url)
if r.ok:
data = r.json()
return data['status']
msg = 'Problem when checking deposit %s' % check_url
logger.error(msg)
raise ValueError(msg)
class BaseDepositClient(BaseApiDepositClient, metaclass=ABCMeta):
"""Base Deposit client to access the public api.
"""
def __init__(self, config, error_msg=None, empty_result={}):
super().__init__(config)
self.error_msg = error_msg
self.empty_result = empty_result
@abstractmethod
def compute_url(self, *args, **kwargs):
"""Compute api url endpoint to query."""
pass
@abstractmethod
def compute_method(self, *args, **kwargs):
"""Http method to use on the url"""
pass
@abstractmethod
def parse_result_ok(self, xml_content):
"""Given an xml result from the api endpoint, parse it and returns a
dict.
"""
pass
def compute_information(self, *args, **kwargs):
"""Compute some more information given the inputs (e.g http headers,
...)
"""
return {}
def parse_result_error(self, xml_content):
"""Given an error response in xml, parse it into a dict.
Returns:
dict with following keys:
'error': The error message
'detail': Some more detail about the error if any
"""
return _parse_with_filter(xml_content, keys=[
'summary', 'detail', 'sword:verboseDescription'])
def do_execute(self, method, url, info):
"""Execute the http query to url using method and info information.
By default, execute a simple query to url with the http
method. Override this in daughter class to improve the
default behavior if needed.
"""
return self.do(method, url)
def execute(self, *args, **kwargs):
"""Main endpoint to prepare and execute the http query to the api.
"""
url = self.compute_url(*args, **kwargs)
method = self.compute_method(*args, **kwargs)
info = self.compute_information(*args, **kwargs)
try:
r = self.do_execute(method, url, info)
except Exception as e:
msg = self.error_msg % (url, e)
r = self.empty_result
r.update({
'error': msg,
})
return r
else:
if r.ok:
if int(r.status_code) == 204: # 204 returns no body
return {'status': r.status_code}
else:
return self.parse_result_ok(r.text)
else:
error = self.parse_result_error(r.text)
empty = self.empty_result
error.update(empty)
error.update({
'status': r.status_code,
})
return error
class ServiceDocumentDepositClient(BaseDepositClient):
"""Service Document information retrieval.
"""
def __init__(self, config):
super().__init__(config,
error_msg='Service document failure at %s: %s',
empty_result={'collection': None})
def compute_url(self, *args, **kwargs):
return '/servicedocument/'
def compute_method(self, *args, **kwargs):
return 'get'
def parse_result_ok(self, xml_content):
"""Parse service document's success response.
"""
return _parse(xml_content)
class StatusDepositClient(BaseDepositClient):
"""Status information on a deposit.
"""
def __init__(self, config):
super().__init__(config,
error_msg='Status check failure at %s: %s',
empty_result={
'deposit_status': None,
'deposit_status_detail': None,
'deposit_swh_id': None,
})
def compute_url(self, collection, deposit_id):
return '/%s/%s/status/' % (collection, deposit_id)
def compute_method(self, *args, **kwargs):
return 'get'
def parse_result_ok(self, xml_content):
"""Given an xml content as string, returns a deposit dict.
"""
return _parse_with_filter(xml_content, keys=[
'deposit_id',
'deposit_status',
'deposit_status_detail',
'deposit_swh_id',
'deposit_swh_id_context',
'deposit_swh_anchor_id',
'deposit_swh_anchor_id_context',
'deposit_external_id',
])
class BaseCreateDepositClient(BaseDepositClient):
"""Deposit client base class to post new deposit.
"""
def __init__(self, config):
super().__init__(config,
error_msg='Post Deposit failure at %s: %s',
empty_result={
'deposit_id': None,
'deposit_status': None,
})
def compute_url(self, collection, *args, **kwargs):
return '/%s/' % collection
def compute_method(self, *args, **kwargs):
return 'post'
def parse_result_ok(self, xml_content):
"""Given an xml content as string, returns a deposit dict.
"""
return _parse_with_filter(xml_content, keys=['deposit_id',
'deposit_status',
'deposit_status_detail',
'deposit_date'])
def _compute_information(self, collection, filepath, in_progress, slug,
is_archive=True):
"""Given a filepath, compute necessary information on that file.
Args:
filepath (str): Path to a file
is_archive (bool): is it an archive or not?
Returns:
dict with keys:
'content-type': content type associated
'md5sum': md5 sum
'filename': filename
"""
filename = os.path.basename(filepath)
if is_archive:
md5sum = hashlib.md5(open(filepath, 'rb').read()).hexdigest()
extension = filename.split('.')[-1]
if 'zip' in extension:
content_type = 'application/zip'
else:
content_type = 'application/x-tar'
else:
content_type = None
md5sum = None
return {
'slug': slug,
'in_progress': in_progress,
'content-type': content_type,
'md5sum': md5sum,
'filename': filename,
'filepath': filepath,
}
def compute_information(self, collection, filepath, in_progress, slug,
is_archive=True, **kwargs):
info = self._compute_information(collection, filepath, in_progress,
slug, is_archive=is_archive)
info['headers'] = self.compute_headers(info)
return info
def do_execute(self, method, url, info):
with open(info['filepath'], 'rb') as f:
return self.do(method, url, data=f, headers=info['headers'])
class CreateArchiveDepositClient(BaseCreateDepositClient):
"""Post an archive (binary) deposit client."""
def compute_headers(self, info):
return {
'SLUG': info['slug'],
'CONTENT_MD5': info['md5sum'],
'IN-PROGRESS': str(info['in_progress']),
'CONTENT-TYPE': info['content-type'],
'CONTENT-DISPOSITION': 'attachment; filename=%s' % (
info['filename'], ),
}
class UpdateArchiveDepositClient(CreateArchiveDepositClient):
"""Update (add/replace) an archive (binary) deposit client."""
def compute_url(self, collection, *args, deposit_id=None, **kwargs):
return '/%s/%s/media/' % (collection, deposit_id)
def compute_method(self, *args, replace=False, **kwargs):
return 'put' if replace else 'post'
class CreateMetadataDepositClient(BaseCreateDepositClient):
"""Post a metadata deposit client."""
def compute_headers(self, info):
return {
'SLUG': info['slug'],
'IN-PROGRESS': str(info['in_progress']),
'CONTENT-TYPE': 'application/atom+xml;type=entry',
}
class UpdateMetadataDepositClient(CreateMetadataDepositClient):
"""Update (add/replace) a metadata deposit client."""
def compute_url(self, collection, *args, deposit_id=None, **kwargs):
return '/%s/%s/metadata/' % (collection, deposit_id)
def compute_method(self, *args, replace=False, **kwargs):
return 'put' if replace else 'post'
class CreateMultipartDepositClient(BaseCreateDepositClient):
"""Create a multipart deposit client."""
def _multipart_info(self, info, info_meta):
files = [
('file',
(info['filename'],
open(info['filepath'], 'rb'),
info['content-type'])),
('atom',
(info_meta['filename'],
open(info_meta['filepath'], 'rb'),
'application/atom+xml')),
]
headers = {
'SLUG': info['slug'],
'CONTENT_MD5': info['md5sum'],
'IN-PROGRESS': str(info['in_progress']),
}
return files, headers
def compute_information(self, collection, archive, metadata,
in_progress, slug, **kwargs):
info = self._compute_information(
collection, archive, in_progress, slug)
info_meta = self._compute_information(
collection, metadata, in_progress, slug, is_archive=False)
files, headers = self._multipart_info(info, info_meta)
return {'files': files, 'headers': headers}
def do_execute(self, method, url, info):
return self.do(
method, url, files=info['files'], headers=info['headers'])
class UpdateMultipartDepositClient(CreateMultipartDepositClient):
"""Update a multipart deposit client."""
def compute_url(self, collection, *args, deposit_id=None, **kwargs):
return '/%s/%s/metadata/' % (collection, deposit_id)
def compute_method(self, *args, replace=False, **kwargs):
return 'put' if replace else 'post'
class PublicApiDepositClient(BaseApiDepositClient):
"""Public api deposit client."""
def service_document(self):
"""Retrieve service document endpoint's information."""
return ServiceDocumentDepositClient(self.config).execute()
def deposit_status(self, collection, deposit_id):
"""Retrieve status information on a deposit."""
return StatusDepositClient(self.config).execute(
collection, deposit_id)
def deposit_create(self, collection, slug, archive=None,
metadata=None, in_progress=False):
"""Create a new deposit (archive, metadata, both as multipart)."""
if archive and not metadata:
return CreateArchiveDepositClient(self.config).execute(
collection, archive, in_progress, slug)
elif not archive and metadata:
return CreateMetadataDepositClient(self.config).execute(
collection, metadata, in_progress, slug,
is_archive=False)
else:
return CreateMultipartDepositClient(self.config).execute(
collection, archive, metadata, in_progress,
slug)
def deposit_update(self, collection, deposit_id, slug, archive=None,
metadata=None, in_progress=False,
replace=False):
"""Update (add/replace) existing deposit (archive, metadata, both)."""
r = self.deposit_status(collection, deposit_id)
if 'error' in r:
return r
status = r['deposit_status']
if status != 'partial':
return {
'error': "You can only act on deposit with status 'partial'",
'detail': "The deposit %s has status '%s'" % (
deposit_id, status),
'deposit_status': status,
'deposit_id': deposit_id,
}
if archive and not metadata:
r = UpdateArchiveDepositClient(self.config).execute(
collection, archive, in_progress, slug,
deposit_id=deposit_id, replace=replace)
elif not archive and metadata:
r = UpdateMetadataDepositClient(self.config).execute(
collection, metadata, in_progress, slug,
deposit_id=deposit_id, replace=replace)
else:
r = UpdateMultipartDepositClient(self.config).execute(
collection, archive, metadata, in_progress,
slug, deposit_id=deposit_id, replace=replace)
if 'error' in r:
return r
return self.deposit_status(collection, deposit_id)