Revision bbdc4c3fd60c1febcb663f3b6cbfd90febeb23a2 authored by Andrey Zhavoronkov on 29 June 2023, 11:23:12 UTC, committed by Andrey Zhavoronkov on 29 June 2023, 11:23:12 UTC
1 parent 75c1e6f
Raw File
test_tasks.py
# Copyright (C) 2022-2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import io
import json
import os.path as osp
import zipfile
from logging import Logger
from pathlib import Path
from typing import Tuple

import pytest
from cvat_sdk import Client, models
from cvat_sdk.api_client import exceptions
from cvat_sdk.core.proxies.tasks import ResourceType, Task
from cvat_sdk.core.uploading import Uploader, _MyTusUploader
from PIL import Image

from shared.utils.helpers import generate_image_files

from .util import make_pbar


class TestTaskUsecases:
    @pytest.fixture(autouse=True)
    def setup(
        self,
        tmp_path: Path,
        fxt_login: Tuple[Client, str],
        fxt_logger: Tuple[Logger, io.StringIO],
        fxt_stdout: io.StringIO,
    ):
        self.tmp_path = tmp_path
        logger, self.logger_stream = fxt_logger
        self.stdout = fxt_stdout
        self.client, self.user = fxt_login
        self.client.logger = logger

        api_client = self.client.api_client
        for k in api_client.configuration.logger:
            api_client.configuration.logger[k] = logger

        yield

    @pytest.fixture
    def fxt_backup_file(self, fxt_new_task: Task, fxt_coco_file: str):
        backup_path = self.tmp_path / "backup.zip"

        fxt_new_task.import_annotations("COCO 1.0", filename=fxt_coco_file)
        fxt_new_task.download_backup(backup_path)

        yield backup_path

    @pytest.fixture
    def fxt_new_task(self, fxt_image_file: Path):
        task = self.client.tasks.create_from_data(
            spec={
                "name": "test_task",
                "labels": [{"name": "car"}, {"name": "person"}],
            },
            resources=[fxt_image_file],
            data_params={"image_quality": 80},
        )

        return task

    @pytest.fixture
    def fxt_new_task_without_data(self):
        task = self.client.tasks.create(
            spec={
                "name": "test_task",
                "labels": [{"name": "car"}, {"name": "person"}],
            },
        )

        return task

    @pytest.fixture
    def fxt_task_with_shapes(self, fxt_new_task: Task):
        labels = fxt_new_task.get_labels()
        fxt_new_task.set_annotations(
            models.LabeledDataRequest(
                shapes=[
                    models.LabeledShapeRequest(
                        frame=0,
                        label_id=labels[0].id,
                        type="rectangle",
                        points=[1, 1, 2, 2],
                    ),
                ],
            )
        )

        return fxt_new_task

    def test_can_create_task_with_local_data(self):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        task_spec = {
            "name": f"test {self.user} to create a task with local data",
            "labels": [
                {
                    "name": "car",
                    "color": "#ff00ff",
                    "attributes": [
                        {
                            "name": "a",
                            "mutable": True,
                            "input_type": "number",
                            "default_value": "5",
                            "values": ["4", "5", "6"],
                        }
                    ],
                }
            ],
        }

        data_params = {
            "image_quality": 75,
        }

        task_files = generate_image_files(7)
        for i, f in enumerate(task_files):
            fname = self.tmp_path / f.name
            fname.write_bytes(f.getvalue())
            task_files[i] = fname

        task = self.client.tasks.create_from_data(
            spec=task_spec,
            data_params=data_params,
            resources=task_files,
            pbar=pbar,
        )

        assert task.size == 7
        assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
        assert self.stdout.getvalue() == ""

    def test_can_create_task_with_local_data_and_predefined_sorting(
        self, fxt_new_task_without_data: Task
    ):
        task = fxt_new_task_without_data

        task_files = generate_image_files(6)
        task_filenames = []
        for f in task_files:
            fname = self.tmp_path / osp.basename(f.name)
            fname.write_bytes(f.getvalue())
            task_filenames.append(fname)

        task_filenames = [task_filenames[i] for i in [2, 4, 1, 5, 0, 3]]

        task.upload_data(
            resources=task_filenames,
            params={"sorting_method": "predefined"},
        )

        assert [f.name for f in task.get_frames_info()] == [f.name for f in task_filenames]

    def test_can_create_task_with_remote_data(self):
        task = self.client.tasks.create_from_data(
            spec={
                "name": "test_task",
                "labels": [{"name": "car"}, {"name": "person"}],
            },
            resource_type=ResourceType.SHARE,
            resources=["images/image_1.jpg", "images/image_2.jpg"],
            # make sure string fields are transferred correctly;
            # see https://github.com/opencv/cvat/issues/4962
            data_params={"sorting_method": "lexicographical"},
        )

        assert task.size == 2
        assert task.get_frames_info()[0].name == "images/image_1.jpg"
        assert self.stdout.getvalue() == ""

    def test_cant_create_task_with_no_data(self):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        task_spec = {
            "name": f"test {self.user} to create a task with no data",
            "labels": [
                {
                    "name": "car",
                }
            ],
        }

        with pytest.raises(exceptions.ApiException) as capture:
            self.client.tasks.create_from_data(
                spec=task_spec,
                resource_type=ResourceType.LOCAL,
                resources=[],
                pbar=pbar,
            )

        assert capture.match("No media data found")
        assert self.stdout.getvalue() == ""

    @pytest.mark.with_external_services
    def test_can_create_task_with_git_repo(self, fxt_image_file: Path):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        task_spec = {
            "name": f"task with Git repo",
            "labels": [{"name": "car"}],
        }

        repository_url = "root@gitserver:repos/repo.git [annotations/annot.zip]"

        task = self.client.tasks.create_from_data(
            spec=task_spec,
            resource_type=ResourceType.LOCAL,
            resources=[fxt_image_file],
            pbar=pbar,
            dataset_repository_url=repository_url,
        )

        assert task.size == 1
        assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
        assert self.stdout.getvalue() == ""

        git_get_response = self.client.api_client.rest_client.GET(
            self.client.api_map.git_get(task.id),
            headers=self.client.api_client.get_common_headers(),
        )

        response_json = json.loads(git_get_response.data)
        assert response_json["url"]["value"] == repository_url
        assert response_json["format"] == "CVAT for images 1.1"
        assert response_json["lfs"] is False

    def test_can_upload_data_to_empty_task(self):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        task = self.client.tasks.create(
            {
                "name": f"test task",
                "labels": [{"name": "car"}],
            }
        )

        data_params = {
            "image_quality": 75,
        }

        task_files = generate_image_files(7)
        for i, f in enumerate(task_files):
            fname = self.tmp_path / f.name
            fname.write_bytes(f.getvalue())
            task_files[i] = fname

        task.upload_data(
            resources=task_files,
            resource_type=ResourceType.LOCAL,
            params=data_params,
            pbar=pbar,
        )

        assert task.size == 7
        assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
        assert self.stdout.getvalue() == ""

    def test_can_retrieve_task(self, fxt_new_task: Task):
        task_id = fxt_new_task.id

        task = self.client.tasks.retrieve(task_id)

        assert task.id == task_id
        assert self.stdout.getvalue() == ""

    def test_can_list_tasks(self, fxt_new_task: Task):
        task_id = fxt_new_task.id

        tasks = self.client.tasks.list()

        assert any(t.id == task_id for t in tasks)
        assert self.stdout.getvalue() == ""

    def test_can_update_task(self, fxt_new_task: Task):
        fxt_new_task.update(models.PatchedTaskWriteRequest(name="foo"))

        retrieved_task = self.client.tasks.retrieve(fxt_new_task.id)
        assert retrieved_task.name == "foo"
        assert fxt_new_task.name == retrieved_task.name
        assert self.stdout.getvalue() == ""

    def test_can_delete_task(self, fxt_new_task: Task):
        fxt_new_task.remove()

        with pytest.raises(exceptions.NotFoundException):
            fxt_new_task.fetch()
        assert self.stdout.getvalue() == ""

    def test_can_delete_tasks_by_ids(self, fxt_new_task: Task):
        task_id = fxt_new_task.id
        old_tasks = self.client.tasks.list()

        self.client.tasks.remove_by_ids([task_id])

        new_tasks = self.client.tasks.list()
        assert any(t.id == task_id for t in old_tasks)
        assert all(t.id != task_id for t in new_tasks)
        assert self.logger_stream.getvalue(), f".*Task ID {task_id} deleted.*"
        assert self.stdout.getvalue() == ""

    @pytest.mark.parametrize("include_images", (True, False))
    def test_can_download_dataset(self, fxt_new_task: Task, include_images: bool):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        task_id = fxt_new_task.id
        path = self.tmp_path / f"task_{task_id}-cvat.zip"
        task = self.client.tasks.retrieve(task_id)
        task.export_dataset(
            format_name="CVAT for images 1.1",
            filename=path,
            pbar=pbar,
            include_images=include_images,
        )

        assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
        assert path.is_file()
        assert self.stdout.getvalue() == ""

    def test_can_download_backup(self, fxt_new_task: Task):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        task_id = fxt_new_task.id
        path = self.tmp_path / f"task_{task_id}-backup.zip"
        task = self.client.tasks.retrieve(task_id)
        task.download_backup(filename=path, pbar=pbar)

        assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
        assert path.is_file()
        assert self.stdout.getvalue() == ""

    def test_can_download_preview(self, fxt_new_task: Task):
        frame_encoded = fxt_new_task.get_preview()
        (width, height) = Image.open(frame_encoded).size

        assert width > 0 and height > 0
        assert self.stdout.getvalue() == ""

    @pytest.mark.parametrize("quality", ("compressed", "original"))
    def test_can_download_frame(self, fxt_new_task: Task, quality: str):
        frame_encoded = fxt_new_task.get_frame(0, quality=quality)
        (width, height) = Image.open(frame_encoded).size

        assert width > 0 and height > 0
        assert self.stdout.getvalue() == ""

    @pytest.mark.parametrize("quality", ("compressed", "original"))
    def test_can_download_frames(self, fxt_new_task: Task, quality: str):
        fxt_new_task.download_frames(
            [0],
            quality=quality,
            outdir=self.tmp_path,
            filename_pattern="frame-{frame_id}{frame_ext}",
        )

        assert (self.tmp_path / "frame-0.jpg").is_file()
        assert self.stdout.getvalue() == ""

    @pytest.mark.parametrize("quality", ("compressed", "original"))
    def test_can_download_chunk(self, fxt_new_task: Task, quality: str):
        chunk_path = self.tmp_path / "chunk.zip"

        with open(chunk_path, "wb") as chunk_file:
            fxt_new_task.download_chunk(0, chunk_file, quality=quality)

        with zipfile.ZipFile(chunk_path, "r") as chunk_zip:
            assert chunk_zip.testzip() is None
            assert len(chunk_zip.infolist()) == 1
        assert self.stdout.getvalue() == ""

    def test_can_upload_annotations(self, fxt_new_task: Task, fxt_coco_file: Path):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        fxt_new_task.import_annotations(format_name="COCO 1.0", filename=fxt_coco_file, pbar=pbar)

        assert "uploaded" in self.logger_stream.getvalue()
        assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
        assert self.stdout.getvalue() == ""

    def _test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path):
        pbar_out = io.StringIO()
        pbar = make_pbar(file=pbar_out)

        task = self.client.tasks.create_from_backup(fxt_backup_file, pbar=pbar)

        assert task.id
        assert task.id != fxt_new_task.id
        assert task.size == fxt_new_task.size
        assert "imported successfully" in self.logger_stream.getvalue()
        assert "100%" in pbar_out.getvalue().strip("\r").split("\r")[-1]
        assert self.stdout.getvalue() == ""

    def test_can_create_from_backup(self, fxt_new_task: Task, fxt_backup_file: Path):
        self._test_can_create_from_backup(fxt_new_task, fxt_backup_file)

    def test_can_create_from_backup_in_chunks(
        self, monkeypatch: pytest.MonkeyPatch, fxt_new_task: Task, fxt_backup_file: Path
    ):
        monkeypatch.setattr(Uploader, "_CHUNK_SIZE", 100)

        num_requests = 0
        original_do_request = _MyTusUploader._do_request

        def counting_do_request(uploader):
            nonlocal num_requests
            num_requests += 1
            original_do_request(uploader)

        monkeypatch.setattr(_MyTusUploader, "_do_request", counting_do_request)

        self._test_can_create_from_backup(fxt_new_task, fxt_backup_file)

        # make sure the upload was actually chunked
        assert num_requests > 1

    def test_can_get_labels(self, fxt_new_task: Task):
        expected_labels = {"car", "person"}

        received_labels = fxt_new_task.get_labels()

        assert {obj.name for obj in received_labels} == expected_labels
        assert self.stdout.getvalue() == ""

    def test_can_get_jobs(self, fxt_new_task: Task):
        jobs = fxt_new_task.get_jobs()

        assert len(jobs) != 0
        assert self.stdout.getvalue() == ""

    def test_can_get_meta(self, fxt_new_task: Task):
        meta = fxt_new_task.get_meta()

        assert meta.image_quality == 80
        assert meta.size == 1
        assert not meta.deleted_frames
        assert self.stdout.getvalue() == ""

    def test_can_get_frame_info(self, fxt_new_task: Task):
        meta = fxt_new_task.get_meta()
        frames = fxt_new_task.get_frames_info()

        assert len(frames) == meta.size
        assert frames[0].name == "img.png"
        assert frames[0].width == 5
        assert frames[0].height == 10
        assert self.stdout.getvalue() == ""

    def test_can_remove_frames(self, fxt_new_task: Task):
        fxt_new_task.remove_frames_by_ids([0])

        meta = fxt_new_task.get_meta()
        assert meta.deleted_frames == [0]
        assert self.stdout.getvalue() == ""

    def test_can_get_annotations(self, fxt_task_with_shapes: Task):
        anns = fxt_task_with_shapes.get_annotations()

        assert len(anns.shapes) == 1
        assert anns.shapes[0].type.value == "rectangle"
        assert self.stdout.getvalue() == ""

    def test_can_set_annotations(self, fxt_new_task: Task):
        labels = fxt_new_task.get_labels()
        fxt_new_task.set_annotations(
            models.LabeledDataRequest(
                tags=[models.LabeledImageRequest(frame=0, label_id=labels[0].id)],
            )
        )

        anns = fxt_new_task.get_annotations()

        assert len(anns.tags) == 1
        assert self.stdout.getvalue() == ""

    def test_can_clear_annotations(self, fxt_task_with_shapes: Task):
        fxt_task_with_shapes.remove_annotations()

        anns = fxt_task_with_shapes.get_annotations()
        assert len(anns.tags) == 0
        assert len(anns.tracks) == 0
        assert len(anns.shapes) == 0
        assert self.stdout.getvalue() == ""

    def test_can_remove_annotations(self, fxt_new_task: Task):
        labels = fxt_new_task.get_labels()
        fxt_new_task.set_annotations(
            models.LabeledDataRequest(
                shapes=[
                    models.LabeledShapeRequest(
                        frame=0,
                        label_id=labels[0].id,
                        type="rectangle",
                        points=[1, 1, 2, 2],
                    ),
                    models.LabeledShapeRequest(
                        frame=0,
                        label_id=labels[0].id,
                        type="rectangle",
                        points=[2, 2, 3, 3],
                    ),
                ],
            )
        )
        anns = fxt_new_task.get_annotations()

        fxt_new_task.remove_annotations(ids=[anns.shapes[0].id])

        anns = fxt_new_task.get_annotations()
        assert len(anns.tags) == 0
        assert len(anns.tracks) == 0
        assert len(anns.shapes) == 1
        assert self.stdout.getvalue() == ""

    def test_can_update_annotations(self, fxt_task_with_shapes: Task):
        labels = fxt_task_with_shapes.get_labels()
        fxt_task_with_shapes.update_annotations(
            models.PatchedLabeledDataRequest(
                shapes=[
                    models.LabeledShapeRequest(
                        frame=0,
                        label_id=labels[0].id,
                        type="rectangle",
                        points=[0, 1, 2, 3],
                    ),
                ],
                tracks=[
                    models.LabeledTrackRequest(
                        frame=0,
                        label_id=labels[0].id,
                        shapes=[
                            models.TrackedShapeRequest(
                                frame=0, type="polygon", points=[3, 2, 2, 3, 3, 4]
                            ),
                        ],
                    )
                ],
                tags=[models.LabeledImageRequest(frame=0, label_id=labels[0].id)],
            )
        )

        anns = fxt_task_with_shapes.get_annotations()
        assert len(anns.shapes) == 2
        assert len(anns.tracks) == 1
        assert len(anns.tags) == 1
        assert self.stdout.getvalue() == ""
back to top