https://forge.softwareheritage.org/source/swh-deposit.git
Tip revision: 36bedfe6162eeffefc581754f94ea98b41660e92 authored by Jenkins for Software Heritage on 07 May 2020, 13:52:51 UTC
Updated debian changelog for version 0.0.84
Updated debian changelog for version 0.0.84
Tip revision: 36bedfe
client.py
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Module in charge of defining an swh-deposit client
"""
import hashlib
import os
import requests
import xmltodict
import logging
from abc import ABCMeta, abstractmethod
from urllib.parse import urljoin
from swh.core.config import SWHConfig
logger = logging.getLogger(__name__)
def _parse(stream, encoding="utf-8"):
"""Given a xml stream, parse the result.
Args:
stream (bytes/text): The stream to parse
encoding (str): The encoding to use if to decode the bytes
stream
Returns:
A dict of values corresponding to the parsed xml
"""
if isinstance(stream, bytes):
stream = stream.decode(encoding)
data = xmltodict.parse(stream, encoding=encoding, process_namespaces=False)
if "entry" in data:
data = data["entry"]
if "sword:error" in data:
data = data["sword:error"]
return dict(data)
def _parse_with_filter(stream, encoding="utf-8", keys=[]):
"""Given a xml stream, parse the result and filter with keys.
Args:
stream (bytes/text): The stream to parse
encoding (str): The encoding to use if to decode the bytes
stream
keys ([str]): Keys to filter the parsed result
Returns:
A dict of values corresponding to the parsed xml filtered by
the keys provided.
"""
data = _parse(stream, encoding=encoding)
m = {}
for key in keys:
m[key] = data.get(key)
return m
class BaseApiDepositClient(SWHConfig):
"""Deposit client base class
"""
CONFIG_BASE_FILENAME = "deposit/client"
DEFAULT_CONFIG = {
"url": ("str", "http://localhost:5006"),
"auth": ("dict", {}), # with optional 'username'/'password' keys
}
def __init__(self, config=None, _client=requests):
super().__init__()
if config is None:
self.config = super().parse_config_file()
else:
self.config = config
self._client = _client
self.base_url = self.config["url"].strip("/") + "/"
auth = self.config["auth"]
if auth == {}:
self.auth = None
else:
self.auth = (auth["username"], auth["password"])
def do(self, method, url, *args, **kwargs):
"""Internal method to deal with requests, possibly with basic http
authentication.
Args:
method (str): supported http methods as in self._methods' keys
Returns:
The request's execution
"""
if hasattr(self._client, method):
method_fn = getattr(self._client, method)
else:
raise ValueError("Development error, unsupported method %s" % (method))
if self.auth:
kwargs["auth"] = self.auth
full_url = urljoin(self.base_url, url.lstrip("/"))
return method_fn(full_url, *args, **kwargs)
class PrivateApiDepositClient(BaseApiDepositClient):
"""Private API deposit client to:
- read a given deposit's archive(s)
- read a given deposit's metadata
- update a given deposit's status
"""
def archive_get(self, archive_update_url, archive):
"""Retrieve the archive from the deposit to a local directory.
Args:
archive_update_url (str): The full deposit archive(s)'s raw content
to retrieve locally
archive (str): the local archive's path where to store
the raw content
Returns:
The archive path to the local archive to load.
Or None if any problem arose.
"""
r = self.do("get", archive_update_url, stream=True)
if r.ok:
with open(archive, "wb") as f:
for chunk in r.iter_content():
f.write(chunk)
return archive
msg = "Problem when retrieving deposit archive at %s" % (archive_update_url,)
logger.error(msg)
raise ValueError(msg)
def metadata_get(self, metadata_url):
"""Retrieve the metadata information on a given deposit.
Args:
metadata_url (str): The full deposit metadata url to retrieve
locally
Returns:
The dictionary of metadata for that deposit or None if any
problem arose.
"""
r = self.do("get", metadata_url)
if r.ok:
return r.json()
msg = "Problem when retrieving metadata at %s" % metadata_url
logger.error(msg)
raise ValueError(msg)
def status_update(
self,
update_status_url,
status,
revision_id=None,
directory_id=None,
origin_url=None,
):
"""Update the deposit's status.
Args:
update_status_url (str): the full deposit's archive
status (str): The status to update the deposit with
revision_id (str/None): the revision's identifier to update to
directory_id (str/None): the directory's identifier to update to
origin_url (str/None): deposit's associated origin url
"""
payload = {"status": status}
if revision_id:
payload["revision_id"] = revision_id
if directory_id:
payload["directory_id"] = directory_id
if origin_url:
payload["origin_url"] = origin_url
self.do("put", update_status_url, json=payload)
def check(self, check_url):
"""Check the deposit's associated data (metadata, archive(s))
Args:
check_url (str): the full deposit's check url
"""
r = self.do("get", check_url)
if r.ok:
data = r.json()
return data["status"]
msg = "Problem when checking deposit %s" % check_url
logger.error(msg)
raise ValueError(msg)
class BaseDepositClient(BaseApiDepositClient, metaclass=ABCMeta):
"""Base Deposit client to access the public api.
"""
def __init__(self, config, error_msg=None, empty_result={}):
super().__init__(config)
self.error_msg = error_msg
self.empty_result = empty_result
@abstractmethod
def compute_url(self, *args, **kwargs):
"""Compute api url endpoint to query."""
pass
@abstractmethod
def compute_method(self, *args, **kwargs):
"""Http method to use on the url"""
pass
@abstractmethod
def parse_result_ok(self, xml_content):
"""Given an xml result from the api endpoint, parse it and returns a
dict.
"""
pass
def compute_information(self, *args, **kwargs):
"""Compute some more information given the inputs (e.g http headers,
...)
"""
return {}
def parse_result_error(self, xml_content):
"""Given an error response in xml, parse it into a dict.
Returns:
dict with following keys:
'error': The error message
'detail': Some more detail about the error if any
"""
return _parse_with_filter(
xml_content, keys=["summary", "detail", "sword:verboseDescription"]
)
def do_execute(self, method, url, info):
"""Execute the http query to url using method and info information.
By default, execute a simple query to url with the http
method. Override this in daughter class to improve the
default behavior if needed.
"""
return self.do(method, url)
def execute(self, *args, **kwargs):
"""Main endpoint to prepare and execute the http query to the api.
"""
url = self.compute_url(*args, **kwargs)
method = self.compute_method(*args, **kwargs)
info = self.compute_information(*args, **kwargs)
try:
r = self.do_execute(method, url, info)
except Exception as e:
msg = self.error_msg % (url, e)
r = self.empty_result
r.update(
{"error": msg,}
)
return r
else:
if r.ok:
if int(r.status_code) == 204: # 204 returns no body
return {"status": r.status_code}
else:
return self.parse_result_ok(r.text)
else:
error = self.parse_result_error(r.text)
empty = self.empty_result
error.update(empty)
error.update(
{"status": r.status_code,}
)
return error
class ServiceDocumentDepositClient(BaseDepositClient):
"""Service Document information retrieval.
"""
def __init__(self, config):
super().__init__(
config,
error_msg="Service document failure at %s: %s",
empty_result={"collection": None},
)
def compute_url(self, *args, **kwargs):
return "/servicedocument/"
def compute_method(self, *args, **kwargs):
return "get"
def parse_result_ok(self, xml_content):
"""Parse service document's success response.
"""
return _parse(xml_content)
class StatusDepositClient(BaseDepositClient):
"""Status information on a deposit.
"""
def __init__(self, config):
super().__init__(
config,
error_msg="Status check failure at %s: %s",
empty_result={
"deposit_status": None,
"deposit_status_detail": None,
"deposit_swh_id": None,
},
)
def compute_url(self, collection, deposit_id):
return "/%s/%s/status/" % (collection, deposit_id)
def compute_method(self, *args, **kwargs):
return "get"
def parse_result_ok(self, xml_content):
"""Given an xml content as string, returns a deposit dict.
"""
return _parse_with_filter(
xml_content,
keys=[
"deposit_id",
"deposit_status",
"deposit_status_detail",
"deposit_swh_id",
"deposit_swh_id_context",
"deposit_swh_anchor_id",
"deposit_swh_anchor_id_context",
"deposit_external_id",
],
)
class BaseCreateDepositClient(BaseDepositClient):
"""Deposit client base class to post new deposit.
"""
def __init__(self, config):
super().__init__(
config,
error_msg="Post Deposit failure at %s: %s",
empty_result={"deposit_id": None, "deposit_status": None,},
)
def compute_url(self, collection, *args, **kwargs):
return "/%s/" % collection
def compute_method(self, *args, **kwargs):
return "post"
def parse_result_ok(self, xml_content):
"""Given an xml content as string, returns a deposit dict.
"""
return _parse_with_filter(
xml_content,
keys=[
"deposit_id",
"deposit_status",
"deposit_status_detail",
"deposit_date",
],
)
def _compute_information(
self, collection, filepath, in_progress, slug, is_archive=True
):
"""Given a filepath, compute necessary information on that file.
Args:
filepath (str): Path to a file
is_archive (bool): is it an archive or not?
Returns:
dict with keys:
'content-type': content type associated
'md5sum': md5 sum
'filename': filename
"""
filename = os.path.basename(filepath)
if is_archive:
md5sum = hashlib.md5(open(filepath, "rb").read()).hexdigest()
extension = filename.split(".")[-1]
if "zip" in extension:
content_type = "application/zip"
else:
content_type = "application/x-tar"
else:
content_type = None
md5sum = None
return {
"slug": slug,
"in_progress": in_progress,
"content-type": content_type,
"md5sum": md5sum,
"filename": filename,
"filepath": filepath,
}
def compute_information(
self, collection, filepath, in_progress, slug, is_archive=True, **kwargs
):
info = self._compute_information(
collection, filepath, in_progress, slug, is_archive=is_archive
)
info["headers"] = self.compute_headers(info)
return info
def do_execute(self, method, url, info):
with open(info["filepath"], "rb") as f:
return self.do(method, url, data=f, headers=info["headers"])
class CreateArchiveDepositClient(BaseCreateDepositClient):
"""Post an archive (binary) deposit client."""
def compute_headers(self, info):
return {
"SLUG": info["slug"],
"CONTENT_MD5": info["md5sum"],
"IN-PROGRESS": str(info["in_progress"]),
"CONTENT-TYPE": info["content-type"],
"CONTENT-DISPOSITION": "attachment; filename=%s" % (info["filename"],),
}
class UpdateArchiveDepositClient(CreateArchiveDepositClient):
"""Update (add/replace) an archive (binary) deposit client."""
def compute_url(self, collection, *args, deposit_id=None, **kwargs):
return "/%s/%s/media/" % (collection, deposit_id)
def compute_method(self, *args, replace=False, **kwargs):
return "put" if replace else "post"
class CreateMetadataDepositClient(BaseCreateDepositClient):
"""Post a metadata deposit client."""
def compute_headers(self, info):
return {
"SLUG": info["slug"],
"IN-PROGRESS": str(info["in_progress"]),
"CONTENT-TYPE": "application/atom+xml;type=entry",
}
class UpdateMetadataDepositClient(CreateMetadataDepositClient):
"""Update (add/replace) a metadata deposit client."""
def compute_url(self, collection, *args, deposit_id=None, **kwargs):
return "/%s/%s/metadata/" % (collection, deposit_id)
def compute_method(self, *args, replace=False, **kwargs):
return "put" if replace else "post"
class CreateMultipartDepositClient(BaseCreateDepositClient):
"""Create a multipart deposit client."""
def _multipart_info(self, info, info_meta):
files = [
(
"file",
(info["filename"], open(info["filepath"], "rb"), info["content-type"]),
),
(
"atom",
(
info_meta["filename"],
open(info_meta["filepath"], "rb"),
"application/atom+xml",
),
),
]
headers = {
"SLUG": info["slug"],
"CONTENT_MD5": info["md5sum"],
"IN-PROGRESS": str(info["in_progress"]),
}
return files, headers
def compute_information(
self, collection, archive, metadata, in_progress, slug, **kwargs
):
info = self._compute_information(collection, archive, in_progress, slug)
info_meta = self._compute_information(
collection, metadata, in_progress, slug, is_archive=False
)
files, headers = self._multipart_info(info, info_meta)
return {"files": files, "headers": headers}
def do_execute(self, method, url, info):
return self.do(method, url, files=info["files"], headers=info["headers"])
class UpdateMultipartDepositClient(CreateMultipartDepositClient):
"""Update a multipart deposit client."""
def compute_url(self, collection, *args, deposit_id=None, **kwargs):
return "/%s/%s/metadata/" % (collection, deposit_id)
def compute_method(self, *args, replace=False, **kwargs):
return "put" if replace else "post"
class PublicApiDepositClient(BaseApiDepositClient):
"""Public api deposit client."""
def service_document(self):
"""Retrieve service document endpoint's information."""
return ServiceDocumentDepositClient(self.config).execute()
def deposit_status(self, collection, deposit_id):
"""Retrieve status information on a deposit."""
return StatusDepositClient(self.config).execute(collection, deposit_id)
def deposit_create(
self, collection, slug, archive=None, metadata=None, in_progress=False
):
"""Create a new deposit (archive, metadata, both as multipart)."""
if archive and not metadata:
return CreateArchiveDepositClient(self.config).execute(
collection, archive, in_progress, slug
)
elif not archive and metadata:
return CreateMetadataDepositClient(self.config).execute(
collection, metadata, in_progress, slug, is_archive=False
)
else:
return CreateMultipartDepositClient(self.config).execute(
collection, archive, metadata, in_progress, slug
)
def deposit_update(
self,
collection,
deposit_id,
slug,
archive=None,
metadata=None,
in_progress=False,
replace=False,
):
"""Update (add/replace) existing deposit (archive, metadata, both)."""
r = self.deposit_status(collection, deposit_id)
if "error" in r:
return r
status = r["deposit_status"]
if status != "partial":
return {
"error": "You can only act on deposit with status 'partial'",
"detail": "The deposit %s has status '%s'" % (deposit_id, status),
"deposit_status": status,
"deposit_id": deposit_id,
}
if archive and not metadata:
r = UpdateArchiveDepositClient(self.config).execute(
collection,
archive,
in_progress,
slug,
deposit_id=deposit_id,
replace=replace,
)
elif not archive and metadata:
r = UpdateMetadataDepositClient(self.config).execute(
collection,
metadata,
in_progress,
slug,
deposit_id=deposit_id,
replace=replace,
)
else:
r = UpdateMultipartDepositClient(self.config).execute(
collection,
archive,
metadata,
in_progress,
slug,
deposit_id=deposit_id,
replace=replace,
)
if "error" in r:
return r
return self.deposit_status(collection, deposit_id)