Revision cbd5f433f3ff478fb26e9c09b9e3b5c74512e28d authored by Jonas Otten on 13 December 2023, 10:29:49 UTC, committed by GitHub on 13 December 2023, 10:29:49 UTC
Currently it is not possible to attach a GCS cloud storage bucket whose name contains a dot, even though the [GCS bucket naming rules](https://cloud.google.com/storage/docs/buckets#naming) explicitly allow dots. This is a major problem for us, and this PR fixes the issue.
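
For reference, a name check that follows the linked rules must accept dots. The sketch below is illustrative only; the regex, the helper name, and the 63-character simplification are assumptions for this example, not code from this PR:

```python
import re

# Simplified GCS bucket-name rule: 3-63 characters of lowercase letters,
# digits, hyphens, underscores and dots, starting and ending with a letter
# or digit. (Per the naming docs, names containing dots may actually be up
# to 222 characters, with each dot-separated component up to 63.)
GCS_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")

def is_valid_gcs_bucket_name(name: str) -> bool:
    return GCS_BUCKET_NAME.fullmatch(name) is not None

assert is_valid_gcs_bucket_name("my.bucket.example.com")  # dots are accepted
assert not is_valid_gcs_bucket_name("My_Bucket")          # uppercase is not
```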

---------

Co-authored-by: Jonas Otten <jootten@uos.de>
Co-authored-by: Boris Sekachev <sekachev.bs@gmail.com>
Co-authored-by: Maria Khrustaleva <maria@cvat.ai>
mixins.py
# Copyright (C) 2021-2022 Intel Corporation
# Copyright (C) 2022-2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import base64
import json
import os
import os.path
import uuid
from dataclasses import asdict, dataclass
from distutils.util import strtobool
from pathlib import Path
from tempfile import NamedTemporaryFile
from unittest import mock

import django_rq
from django.conf import settings
from rest_framework import mixins, status
from rest_framework.response import Response

from cvat.apps.engine.location import StorageType, get_location_configuration
from cvat.apps.engine.log import ServerLogManager
from cvat.apps.engine.models import Location
from cvat.apps.engine.serializers import DataSerializer
from cvat.apps.engine.handlers import clear_import_cache
from cvat.apps.engine.utils import get_import_rq_id

slogger = ServerLogManager(__name__)

class TusFile:
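    """
    A file being uploaded via TUS: the partially written data file plus a JSON
    side-car file ("<file_id>.meta") tracking the filename, size and offset.
    """
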
    @dataclass
    class TusMeta:
        metadata: dict
        filename: str
        file_size: int
        offset: int = 0

    class TusMetaFile():
        def __init__(self, path) -> None:
            self._path = path
            self._meta = None
            if os.path.exists(self._path):
                self._meta = self._read()

        @property
        def meta(self):
            return self._meta

        @meta.setter
        def meta(self, meta):
            self._meta = meta

        def _read(self):
            with open(self._path, "r") as fp:
                data = json.load(fp)
            return TusFile.TusMeta(**data)

        def save(self):
            if self._meta is not None:
                os.makedirs(os.path.dirname(self._path), exist_ok=True)
                with open(self._path, "w") as fp:
                    json.dump(asdict(self._meta), fp)

        def exists(self):
            return os.path.exists(self._path)

        def delete(self):
            os.remove(self._path)

    def __init__(self, file_id, upload_dir, meta=None):
        self.file_id = file_id
        self.upload_dir = upload_dir
        self.file_path = os.path.join(self.upload_dir, self.file_id)
        self.meta_file = self.TusMetaFile(self._get_tus_meta_file_path(file_id, upload_dir))
        if meta is not None:
            self.meta_file.meta = meta
            self.meta_file.save()

    @property
    def filename(self):
        return self.meta_file.meta.filename

    @property
    def file_size(self):
        return self.meta_file.meta.file_size

    @property
    def offset(self):
        return self.meta_file.meta.offset

    def exists(self):
        return self.meta_file.exists()

    @staticmethod
    def _get_tus_meta_file_path(file_id, upload_dir):
        return os.path.join(upload_dir, f"{file_id}.meta")

    def init_file(self):
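        # Pre-allocate the file to its declared size by writing a single byte at the end.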
        os.makedirs(self.upload_dir, exist_ok=True)
        file_path = os.path.join(self.upload_dir, self.file_id)
        with open(file_path, 'wb') as file:
            file.seek(self.file_size - 1)
            file.write(b'\0')

    def write_chunk(self, chunk):
        with open(self.file_path, 'r+b') as file:
            file.seek(chunk.offset)
            file.write(chunk.content)
        self.meta_file.meta.offset += chunk.size
        self.meta_file.save()

    def is_complete(self):
        return self.offset == self.file_size

    def rename(self):
        file_path = os.path.join(self.upload_dir, self.filename)
        if os.path.lexists(file_path):
            original_file_name, extension = os.path.splitext(self.filename)
            file_amount = 1
            while os.path.lexists(os.path.join(self.upload_dir, self.filename)):
                self.meta_file.meta.filename = "{}_{}{}".format(original_file_name, file_amount, extension)
                file_path = os.path.join(self.upload_dir, self.filename)
                file_amount += 1
        os.rename(self.file_path, file_path)

    def clean(self):
        self.meta_file.delete()

    @staticmethod
    def create_file(metadata, file_size, upload_dir):
        file_id = str(uuid.uuid4())
        filename = metadata.get("filename")

        tus_file = TusFile(
            file_id,
            upload_dir,
            TusFile.TusMeta(
                filename=filename,
                file_size=file_size,
                offset=0,
                metadata=metadata,
            ),
        )
        tus_file.init_file()

        return tus_file

class TusChunk:
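    """One PATCH request's body together with its TUS offset and size headers."""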
    def __init__(self, request):
        self.META = request.META
        self.offset = int(request.META.get("HTTP_UPLOAD_OFFSET", 0))
        self.size = int(request.META.get("CONTENT_LENGTH", settings.TUS_DEFAULT_CHUNK_SIZE))
        self.content = request.body

class UploadMixin:
    """
    Implements file uploads to the server. Supports uploading single and multiple
    files, as well as suspending and resuming an upload. Uses the TUS open file
    upload protocol (https://tus.io/).

    Implements the following protocols:
    a. A single Data request

    and

    b.1. An Upload-Start request
    b.2.a. The regular TUS protocol requests (Upload-Length + Chunks)
    b.2.b. Upload-Multiple requests
    b.3. An Upload-Finish request

    Requests:
    - Data - POST, no extra headers or 'Upload-Start' + 'Upload-Finish' headers
    - Upload-Start - POST, has an 'Upload-Start' header
    - Upload-Length - POST, has an 'Upload-Length' header (read the TUS protocol)
    - Chunk - HEAD/PATCH (read the TUS protocol)
    - Upload-Finish - POST, has an 'Upload-Finish' header
    - Upload-Multiple - POST, has an 'Upload-Multiple' header
    """

    _tus_api_version = '1.0.0'
    _tus_api_version_supported = ['1.0.0']
    _tus_api_extensions = []
    _tus_max_file_size = str(settings.TUS_MAX_FILE_SIZE)
    _base_tus_headers = {
        'Tus-Resumable': _tus_api_version,
        'Tus-Version': ",".join(_tus_api_version_supported),
        'Tus-Extension': ",".join(_tus_api_extensions),
        'Tus-Max-Size': _tus_max_file_size,
        'Access-Control-Allow-Origin': "*",
        'Access-Control-Allow-Methods': "PATCH,HEAD,GET,POST,OPTIONS",
        'Access-Control-Expose-Headers': "Tus-Resumable,upload-length,upload-metadata,Location,Upload-Offset",
        'Access-Control-Allow-Headers': "Tus-Resumable,upload-length,upload-metadata,Location,Upload-Offset,content-type",
        'Cache-Control': 'no-store'
    }
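    # Matches a UUID-formatted file id, e.g. "f81d4fae-7dec-4d0a-a765-00a0c91e6bf6".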
    file_id_regex = r'(?P<file_id>\b[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-\b[0-9a-f]{12}\b)'

    def _tus_response(self, status, data=None, extra_headers=None):
        response = Response(data, status)
        for key, value in self._base_tus_headers.items():
            response[key] = value
        if extra_headers:
            for key, value in extra_headers.items():
                response[key] = value
        return response

    def _get_metadata(self, request):
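        # Upload-Metadata is a comma-separated list of "key base64(value)" pairs;
        # e.g. the (example) header "filename dGVzdC56aXA=,is_archive" parses to
        # {"filename": "test.zip", "is_archive": ""}.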
        metadata = {}
        if request.META.get("HTTP_UPLOAD_METADATA"):
            for kv in request.META.get("HTTP_UPLOAD_METADATA").split(","):
                split_metadata = kv.split(" ")
                if len(split_metadata) == 2:
                    key, value = split_metadata
                    value = base64.b64decode(value)
                    if isinstance(value, bytes):
                        value = value.decode()
                    metadata[key] = value
                else:
                    metadata[split_metadata[0]] = ""
        return metadata

    def upload_data(self, request):
        tus_request = request.headers.get('Upload-Length', None) is not None or request.method == 'OPTIONS'
        bulk_file_upload = request.headers.get('Upload-Multiple', None) is not None
        start_upload = request.headers.get('Upload-Start', None) is not None
        finish_upload = request.headers.get('Upload-Finish', None) is not None
        one_request_upload = start_upload and finish_upload
        if one_request_upload or finish_upload:
            return self.upload_finished(request)
        elif start_upload:
            return self.upload_started(request)
        elif tus_request:
            return self.init_tus_upload(request)
        elif bulk_file_upload:
            return self.append_files(request)
        else: # backward compatibility case - no upload headers were found
            return self.upload_finished(request)

    def init_tus_upload(self, request):
        if request.method == 'OPTIONS':
            return self._tus_response(status=status.HTTP_204_NO_CONTENT)
        else:
            metadata = self._get_metadata(request)
            filename = metadata.get('filename', '')
            if not self.is_valid_uploaded_file_name(filename):
                return self._tus_response(status=status.HTTP_400_BAD_REQUEST,
                    data="File name {} is not allowed".format(filename))

            message_id = request.META.get("HTTP_MESSAGE_ID")
            if message_id:
                metadata["message_id"] = base64.b64decode(message_id)

            import_type = request.path.strip('/').split('/')[-1]
            if import_type == 'backup':
                # we need to create a unique temp file here because
                # users may try to import backups with the same name at the same time
                with NamedTemporaryFile(prefix=f'cvat-backup-{filename}-by-{request.user}', suffix='.zip', dir=self.get_upload_dir()) as tmp_file:
                    filename = os.path.relpath(tmp_file.name, self.get_upload_dir())
                metadata['filename'] = filename
            file_path = os.path.join(self.get_upload_dir(), filename)
            file_exists = os.path.lexists(file_path) and import_type != 'backup'

            if file_exists:
                # check whether the rq_job is in progress or has been finished/failed
                object_class_name = self._object.__class__.__name__.lower()
                template = get_import_rq_id(object_class_name, self._object.pk, import_type, request.user)
                queue = django_rq.get_queue(settings.CVAT_QUEUES.IMPORT_DATA.value)
                finished_job_ids = queue.finished_job_registry.get_job_ids()
                failed_job_ids = queue.failed_job_registry.get_job_ids()
                if template in finished_job_ids or template in failed_job_ids:
                    os.remove(file_path)
                    file_exists = False

            if file_exists:
                return self._tus_response(status=status.HTTP_409_CONFLICT,
                    data="File with same name already exists")

            file_size = int(request.META.get("HTTP_UPLOAD_LENGTH", "0"))
            if file_size > int(self._tus_max_file_size):
                return self._tus_response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
                    data="File size exceeds max limit of {} bytes".format(self._tus_max_file_size))

            tus_file = TusFile.create_file(metadata, file_size, self.get_upload_dir())

            location = request.build_absolute_uri()
            if 'HTTP_X_FORWARDED_HOST' not in request.META:
                location = request.META.get('HTTP_ORIGIN') + request.META.get('PATH_INFO')

            if import_type in ('backup', 'annotations', 'datasets'):
                scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.CLEANING.value)
                path = Path(self.get_upload_dir()) / tus_file.filename
                cleaning_job = scheduler.enqueue_in(time_delta=settings.IMPORT_CACHE_CLEAN_DELAY,
                    func=clear_import_cache,
                    path=path,
                    creation_time=Path(tus_file.file_path).stat().st_ctime
                )
                slogger.glob.info(
                    f'The cleaning job {cleaning_job.id} is queued. '
                    f'The check that the file {path} is deleted will be carried out after '
                    f'{settings.IMPORT_CACHE_CLEAN_DELAY}.'
                )

            return self._tus_response(
                status=status.HTTP_201_CREATED,
                extra_headers={'Location': '{}{}'.format(location, tus_file.file_id),
                               'Upload-Filename': tus_file.filename})

    def append_tus_chunk(self, request, file_id):
        tus_file = TusFile(str(file_id), self.get_upload_dir())
        if request.method == 'HEAD':
            if tus_file.exists():
                return self._tus_response(status=status.HTTP_200_OK, extra_headers={
                               'Upload-Offset': tus_file.offset,
                               'Upload-Length': tus_file.file_size})
            return self._tus_response(status=status.HTTP_404_NOT_FOUND)

        chunk = TusChunk(request)

        if chunk.offset != tus_file.offset:
            return self._tus_response(status=status.HTTP_409_CONFLICT)

        if chunk.offset > tus_file.file_size:
            return self._tus_response(status=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)

        tus_file.write_chunk(chunk)

        if tus_file.is_complete():
            tus_file.rename()
            tus_file.clean()

        return self._tus_response(status=status.HTTP_204_NO_CONTENT,
                                  extra_headers={'Upload-Offset': tus_file.offset,
                                                 'Upload-Filename': tus_file.filename})

    def is_valid_uploaded_file_name(self, filename: str) -> bool:
        """
        Checks that the file name is valid: the resolved path must stay inside
        the upload directory, which guards against path traversal attacks.
        Returns True if the file name is valid, otherwise False.
        """

        upload_dir = self.get_upload_dir()
        file_path = os.path.join(upload_dir, filename)
        return os.path.commonprefix((os.path.realpath(file_path), upload_dir)) == upload_dir

    def get_upload_dir(self) -> str:
        return self._object.data.get_upload_dirname()

    def _get_request_client_files(self, request):
        serializer = DataSerializer(self._object, data=request.data)
        serializer.is_valid(raise_exception=True)
        return serializer.validated_data.get('client_files')

    def append_files(self, request):
        """
        Processes one or more files sent in a single request within
        a file upload session.
        """

        client_files = self._get_request_client_files(request)
        if client_files:
            upload_dir = self.get_upload_dir()
            for client_file in client_files:
                filename = client_file['file'].name
                if not self.is_valid_uploaded_file_name(filename):
                    return Response(status=status.HTTP_400_BAD_REQUEST,
                        data=f"File name {filename} is not allowed", content_type="text/plain")

                with open(os.path.join(upload_dir, filename), 'ab+') as destination:
                    destination.write(client_file['file'].read())
        return Response(status=status.HTTP_200_OK)

    def upload_started(self, request):
        """
        A hook for performing actions before the upcoming file upload.
        """
        return Response(status=status.HTTP_202_ACCEPTED)

    def upload_finished(self, request):
        """
        A hook for processing the uploaded files; must be implemented in a derived class.
        """

        raise NotImplementedError('Must be implemented in the derived class')

class AnnotationMixin:
    def export_annotations(self, request, db_obj, export_func, callback, get_data=None):
        format_name = request.query_params.get("format", "")
        action = request.query_params.get("action", "").lower()
        filename = request.query_params.get("filename", "")

        use_default_location = request.query_params.get("use_default_location", True)
        use_settings = strtobool(str(use_default_location))
        obj = db_obj if use_settings else request.query_params
        location_conf = get_location_configuration(
            obj=obj,
            use_settings=use_settings,
            field_name=StorageType.TARGET,
        )

        object_name = self._object.__class__.__name__.lower()
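        # e.g. "export:annotations-for-task.id42-in-COCO_1.0-format" (example values)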
        rq_id = f"export:annotations-for-{object_name}.id{self._object.pk}-in-{format_name.replace(' ', '_')}-format"

        if format_name:
            return export_func(db_instance=self._object,
                rq_id=rq_id,
                request=request,
                action=action,
                callback=callback,
                format_name=format_name,
                filename=filename,
                location_conf=location_conf,
            )

        if not get_data:
            return Response("Format is not specified",status=status.HTTP_400_BAD_REQUEST)

        data = get_data(self._object.pk)
        return Response(data)

    def import_annotations(self, request, db_obj, import_func, rq_func, rq_id_template):
        is_tus_request = request.headers.get('Upload-Length', None) is not None or \
            request.method == 'OPTIONS'
        if is_tus_request:
            return self.init_tus_upload(request)

        use_default_location = request.query_params.get('use_default_location', True)
        conv_mask_to_poly = strtobool(request.query_params.get('conv_mask_to_poly', 'True'))
        use_settings = strtobool(str(use_default_location))
        obj = db_obj if use_settings else request.query_params
        location_conf = get_location_configuration(
            obj=obj,
            use_settings=use_settings,
            field_name=StorageType.SOURCE,
        )

        if location_conf['location'] == Location.CLOUD_STORAGE:
            format_name = request.query_params.get('format')
            file_name = request.query_params.get('filename')

            return import_func(
                request=request,
                rq_id_template=rq_id_template,
                rq_func=rq_func,
                db_obj=self._object,
                format_name=format_name,
                location_conf=location_conf,
                filename=file_name,
                conv_mask_to_poly=conv_mask_to_poly,
            )

        return self.upload_data(request)

class SerializeMixin:
    def serialize(self, request, export_func):
        db_object = self.get_object() # force to call check_object_permissions
        return export_func(
            db_object,
            request,
            queue_name=settings.CVAT_QUEUES.EXPORT_DATA.value,
        )

    def deserialize(self, request, import_func):
        location = request.query_params.get("location", Location.LOCAL)
        if location == Location.CLOUD_STORAGE:
            file_name = request.query_params.get("filename", "")
            return import_func(
                request,
                queue_name=settings.CVAT_QUEUES.IMPORT_DATA.value,
                filename=file_name,
            )
        return self.upload_data(request)


class PartialUpdateModelMixin:
    """
    Update fields of a model instance.

    Almost the same as UpdateModelMixin, but has no public PUT / update() method.
    """

    def _update(self, request, *args, **kwargs):
        # This method must not be named "update" so that it is not matched with the PUT method
        return mixins.UpdateModelMixin.update(self, request, *args, **kwargs)

    def perform_update(self, serializer):
        mixins.UpdateModelMixin.perform_update(self, serializer=serializer)

    def partial_update(self, request, *args, **kwargs):
        with mock.patch.object(self, 'update', new=self._update, create=True):
            return mixins.UpdateModelMixin.partial_update(self, request=request, *args, **kwargs)