Revision 0535d452dd426ba62d6d312766906ddd1230c7eb authored by Andrey Zhavoronkov on 02 November 2023, 15:59:39 UTC, committed by GitHub on 02 November 2023, 15:59:39 UTC
This PR speeds up the preparation of chunks by: 
1. loading images once instead of twice in each writer,
2. as well as by allowing simultaneous preparation of more than 1 chunk
using multithreading.
This allows to reduce the time for preparation of chunks for 4895 images
from 0:04:36 to 0:01:20 in case of preparation of 3 chunks in parallel
and 0:02:46 in case of 1 chunk in my environment.

Co-authored-by: Maria Khrustaleva <maya17grd@gmail.com>
1 parent 1f8d5d3
Raw File
frames.ts
// Copyright (C) 2021-2022 Intel Corporation
// Copyright (C) 2022-2023 CVAT.ai Corporation
//
// SPDX-License-Identifier: MIT

import _ from 'lodash';
import {
    FrameDecoder, BlockType, DimensionType, ChunkQuality, decodeContextImages, RequestOutdatedError,
} from 'cvat-data';
import PluginRegistry from './plugins';
import serverProxy, { RawFramesMetaData } from './server-proxy';
import { Exception, ArgumentError, DataError } from './exceptions';

// frame storage by job id
const frameDataCache: Record<string, {
    meta: Omit<RawFramesMetaData, 'deleted_frames'> & { deleted_frames: Record<number, boolean> };
    chunkSize: number;
    mode: 'annotation' | 'interpolation';
    startFrame: number;
    stopFrame: number;
    decodeForward: boolean;
    forwardStep: number;
    latestFrameDecodeRequest: number | null;
    latestContextImagesRequest: number | null;
    provider: FrameDecoder;
    prefetchAnalizer: PrefetchAnalyzer;
    decodedBlocksCacheSize: number;
    activeChunkRequest: Promise<void> | null;
    activeContextRequest: Promise<Record<number, ImageBitmap>> | null;
    contextCache: Record<number, {
        data: Record<number, ImageBitmap>;
        timestamp: number;
        size: number;
    }>;
    getChunk: (chunkNumber: number, quality: ChunkQuality) => Promise<ArrayBuffer>;
}> = {};

export class FramesMetaData {
    public chunkSize: number;
    public deletedFrames: number[];
    public includedFrames: number[];
    public frameFilter: string;
    public frames: {
        width: number;
        height: number;
        name: string;
        related_files: number;
    }[];
    public imageQuality: number;
    public size: number;
    public startFrame: number;
    public stopFrame: number;

    constructor(initialData: RawFramesMetaData) {
        const data: RawFramesMetaData = {
            chunk_size: undefined,
            deleted_frames: [],
            included_frames: [],
            frame_filter: undefined,
            frames: [],
            image_quality: undefined,
            size: undefined,
            start_frame: undefined,
            stop_frame: undefined,
        };

        for (const property in data) {
            if (Object.prototype.hasOwnProperty.call(data, property) && property in initialData) {
                data[property] = initialData[property];
            }
        }

        Object.defineProperties(
            this,
            Object.freeze({
                chunkSize: {
                    get: () => data.chunk_size,
                },
                deletedFrames: {
                    get: () => data.deleted_frames,
                },
                includedFrames: {
                    get: () => data.included_frames,
                },
                frameFilter: {
                    get: () => data.frame_filter,
                },
                frames: {
                    get: () => data.frames,
                },
                imageQuality: {
                    get: () => data.image_quality,
                },
                size: {
                    get: () => data.size,
                },
                startFrame: {
                    get: () => data.start_frame,
                },
                stopFrame: {
                    get: () => data.stop_frame,
                },
            }),
        );
    }
}

export class FrameData {
    public readonly filename: string;
    public readonly width: number;
    public readonly height: number;
    public readonly number: number;
    public readonly relatedFiles: number;
    public readonly deleted: boolean;
    public readonly jobID: number;

    constructor({
        width,
        height,
        name,
        jobID,
        frameNumber,
        deleted,
        related_files: relatedFiles,
    }) {
        Object.defineProperties(
            this,
            Object.freeze({
                filename: {
                    value: name,
                    writable: false,
                },
                width: {
                    value: width,
                    writable: false,
                },
                height: {
                    value: height,
                    writable: false,
                },
                jobID: {
                    value: jobID,
                    writable: false,
                },
                number: {
                    value: frameNumber,
                    writable: false,
                },
                relatedFiles: {
                    value: relatedFiles,
                    writable: false,
                },
                deleted: {
                    value: deleted,
                    writable: false,
                },
            }),
        );
    }

    async data(onServerRequest = () => {}): Promise<ImageBitmap | Blob> {
        const result = await PluginRegistry.apiWrapper.call(this, FrameData.prototype.data, onServerRequest);
        return result;
    }
}

class PrefetchAnalyzer {
    #chunkSize: number;
    #requestedFrames: number[];

    constructor(chunkSize) {
        this.#chunkSize = chunkSize;
        this.#requestedFrames = [];
    }

    shouldPrefetchNext(current: number, isPlaying: boolean, isChunkCached: (chunk) => boolean): boolean {
        if (isPlaying) {
            return true;
        }

        const currentChunk = Math.floor(current / this.#chunkSize);
        const { length } = this.#requestedFrames;
        const isIncreasingOrder = this.#requestedFrames
            .every((val, index) => index === 0 || val > this.#requestedFrames[index - 1]);
        if (
            length && (isIncreasingOrder && current > this.#requestedFrames[length - 1]) &&
            (current % this.#chunkSize) >= Math.ceil(this.#chunkSize / 2) &&
            !isChunkCached(currentChunk + 1)
        ) {
            // is increasing order including the current frame
            // if current is in the middle+ of the chunk
            // if the next chunk was not cached yet
            return true;
        }

        return false;
    }

    addRequested(frame: number): void {
        // latest requested frame is bubbling (array is unique)
        const indexOf = this.#requestedFrames.indexOf(frame);
        if (indexOf !== -1) {
            this.#requestedFrames.splice(indexOf, 1);
        }

        this.#requestedFrames.push(frame);

        // only half of chunk size is considered in this logic
        const limit = Math.ceil(this.#chunkSize / 2);
        if (this.#requestedFrames.length > limit) {
            this.#requestedFrames.shift();
        }
    }
}

Object.defineProperty(FrameData.prototype.data, 'implementation', {
    value(this: FrameData, onServerRequest) {
        return new Promise<{
            renderWidth: number;
            renderHeight: number;
            imageData: ImageBitmap | Blob;
        } | Blob>((resolve, reject) => {
            const {
                provider, prefetchAnalizer, chunkSize, stopFrame, decodeForward, forwardStep, decodedBlocksCacheSize,
            } = frameDataCache[this.jobID];

            const requestId = +_.uniqueId();
            const chunkNumber = Math.floor(this.number / chunkSize);
            const frame = provider.frame(this.number);

            function findTheNextNotDecodedChunk(searchFrom: number): number {
                let firstFrameInNextChunk = searchFrom + forwardStep;
                let nextChunkNumber = Math.floor(firstFrameInNextChunk / chunkSize);
                while (nextChunkNumber === chunkNumber) {
                    firstFrameInNextChunk += forwardStep;
                    nextChunkNumber = Math.floor(firstFrameInNextChunk / chunkSize);
                }

                if (provider.isChunkCached(nextChunkNumber)) {
                    return findTheNextNotDecodedChunk(firstFrameInNextChunk);
                }

                return nextChunkNumber;
            }

            if (frame) {
                if (
                    prefetchAnalizer.shouldPrefetchNext(
                        this.number,
                        decodeForward,
                        (chunk) => provider.isChunkCached(chunk),
                    ) && decodedBlocksCacheSize > 1 && !frameDataCache[this.jobID].activeChunkRequest
                ) {
                    const nextChunkNumber = findTheNextNotDecodedChunk(this.number);
                    const predecodeChunksMax = Math.floor(decodedBlocksCacheSize / 2);
                    if (nextChunkNumber * chunkSize <= stopFrame &&
                        nextChunkNumber <= chunkNumber + predecodeChunksMax) {
                        provider.cleanup(1);
                        frameDataCache[this.jobID].activeChunkRequest = new Promise((resolveForward) => {
                            const releasePromise = (): void => {
                                resolveForward();
                                frameDataCache[this.jobID].activeChunkRequest = null;
                            };

                            frameDataCache[this.jobID].getChunk(
                                nextChunkNumber, ChunkQuality.COMPRESSED,
                            ).then((chunk: ArrayBuffer) => {
                                provider.requestDecodeBlock(
                                    chunk,
                                    nextChunkNumber * chunkSize,
                                    Math.min(stopFrame, (nextChunkNumber + 1) * chunkSize - 1),
                                    () => {},
                                    releasePromise,
                                    releasePromise,
                                );
                            }).catch(() => {
                                releasePromise();
                            });
                        });
                    }
                }

                resolve({
                    renderWidth: this.width,
                    renderHeight: this.height,
                    imageData: frame,
                });
                prefetchAnalizer.addRequested(this.number);
                return;
            }

            onServerRequest();
            frameDataCache[this.jobID].latestFrameDecodeRequest = requestId;
            (frameDataCache[this.jobID].activeChunkRequest || Promise.resolve()).finally(() => {
                if (frameDataCache[this.jobID].latestFrameDecodeRequest !== requestId) {
                    // not relevant request anymore
                    reject(this.number);
                    return;
                }

                // it might appear during decoding, so, check again
                const currentFrame = provider.frame(this.number);
                if (currentFrame) {
                    resolve({
                        renderWidth: this.width,
                        renderHeight: this.height,
                        imageData: currentFrame,
                    });
                    prefetchAnalizer.addRequested(this.number);
                    return;
                }

                frameDataCache[this.jobID].activeChunkRequest = new Promise<void>((
                    resolveLoadAndDecode,
                ) => {
                    let wasResolved = false;
                    frameDataCache[this.jobID].getChunk(
                        chunkNumber, ChunkQuality.COMPRESSED,
                    ).then((chunk: ArrayBuffer) => {
                        try {
                            provider
                                .requestDecodeBlock(
                                    chunk,
                                    chunkNumber * chunkSize,
                                    Math.min(stopFrame, (chunkNumber + 1) * chunkSize - 1),
                                    (_frame: number, bitmap: ImageBitmap | Blob) => {
                                        if (decodeForward) {
                                            // resolve immediately only if is not playing
                                            return;
                                        }

                                        if (frameDataCache[this.jobID].latestFrameDecodeRequest === requestId &&
                                            this.number === _frame
                                        ) {
                                            wasResolved = true;
                                            resolve({
                                                renderWidth: this.width,
                                                renderHeight: this.height,
                                                imageData: bitmap,
                                            });
                                            prefetchAnalizer.addRequested(this.number);
                                        }
                                    }, () => {
                                        frameDataCache[this.jobID].activeChunkRequest = null;
                                        resolveLoadAndDecode();
                                        const decodedFrame = provider.frame(this.number);
                                        if (decodeForward) {
                                            // resolve after decoding everything if playing
                                            resolve({
                                                renderWidth: this.width,
                                                renderHeight: this.height,
                                                imageData: decodedFrame,
                                            });
                                        } else if (!wasResolved) {
                                            reject(this.number);
                                        }
                                    }, (error: Error | RequestOutdatedError) => {
                                        frameDataCache[this.jobID].activeChunkRequest = null;
                                        resolveLoadAndDecode();
                                        if (error instanceof RequestOutdatedError) {
                                            reject(this.number);
                                        } else {
                                            reject(error);
                                        }
                                    },
                                );
                        } catch (error) {
                            reject(error);
                        }
                    }).catch((error) => {
                        reject(error);
                        resolveLoadAndDecode(error);
                    });
                });
            });
        });
    },
    writable: false,
});

function getFrameMeta(jobID, frame): RawFramesMetaData['frames'][0] {
    const { meta, mode, startFrame } = frameDataCache[jobID];
    let frameMeta = null;
    if (mode === 'interpolation' && meta.frames.length === 1) {
        // video tasks have 1 frame info, but image tasks will have many infos
        [frameMeta] = meta.frames;
    } else if (mode === 'annotation' || (mode === 'interpolation' && meta.frames.length > 1)) {
        if (frame > meta.stop_frame) {
            throw new ArgumentError(`Meta information about frame ${frame} can't be received from the server`);
        }
        frameMeta = meta.frames[frame - startFrame];
    } else {
        throw new DataError(`Invalid mode is specified ${mode}`);
    }

    return frameMeta;
}

export function getContextImage(jobID: number, frame: number): Promise<Record<string, ImageBitmap>> {
    return new Promise<Record<string, ImageBitmap>>((resolve, reject) => {
        if (!(jobID in frameDataCache)) {
            reject(new Error(
                'Frame data was not initialized for this job. Try first requesting any frame.',
            ));
        }
        const frameData = frameDataCache[jobID];
        const requestId = frame;
        const { startFrame } = frameData;
        const { related_files: relatedFiles } = frameData.meta.frames[frame - startFrame];

        if (relatedFiles === 0) {
            resolve({});
        } else if (frame in frameData.contextCache) {
            resolve(frameData.contextCache[frame].data);
        } else {
            frameData.latestContextImagesRequest = requestId;
            const executor = (): void => {
                if (frameData.latestContextImagesRequest !== requestId) {
                    reject(frame);
                } else if (frame in frameData.contextCache) {
                    resolve(frameData.contextCache[frame].data);
                } else {
                    frameData.activeContextRequest = serverProxy.frames.getImageContext(jobID, frame)
                        .then((encodedImages) => decodeContextImages(encodedImages, 0, relatedFiles));
                    frameData.activeContextRequest.then((images) => {
                        const size = Object.values(images)
                            .reduce((acc, image) => acc + image.width * image.height * 4, 0);
                        const totalSize = Object.values(frameData.contextCache)
                            .reduce((acc, item) => acc + item.size, 0);
                        if (totalSize > 512 * 1024 * 1024) {
                            const [leastTimestampFrame] = Object.entries(frameData.contextCache)
                                .sort(([, item1], [, item2]) => item1.timestamp - item2.timestamp)[0];
                            delete frameData.contextCache[leastTimestampFrame];
                        }

                        frameData.contextCache[frame] = {
                            data: images,
                            timestamp: Date.now(),
                            size,
                        };

                        if (frameData.latestContextImagesRequest !== requestId) {
                            reject(frame);
                        } else {
                            resolve(images);
                        }
                    }).finally(() => {
                        frameData.activeContextRequest = null;
                    });
                }
            };

            if (!frameData.activeContextRequest) {
                executor();
            } else {
                const checkAndExecute = (): void => {
                    if (frameData.activeContextRequest) {
                        // if we just execute in finally
                        // it might raise multiple server requests for context images
                        // if the promise was pending before and several requests came for the same frame
                        // all these requests will stuck on "finally"
                        // and when the promise fullfilled, it will run all the microtasks
                        // since they all have the same request id, all they will perform in executor()
                        frameData.activeContextRequest.finally(() => setTimeout(checkAndExecute));
                    } else {
                        executor();
                    }
                };

                setTimeout(checkAndExecute);
            }
        }
    });
}

export function decodePreview(preview: Blob): Promise<string> {
    return new Promise((resolve, reject) => {
        const reader = new FileReader();
        reader.onload = () => {
            resolve(reader.result as string);
        };
        reader.onerror = (error) => {
            reject(error);
        };
        reader.readAsDataURL(preview);
    });
}

export async function getFrame(
    jobID: number,
    chunkSize: number,
    chunkType: 'video' | 'imageset',
    mode: 'interpolation' | 'annotation', // todo: obsolete, need to remove
    frame: number,
    startFrame: number,
    stopFrame: number,
    isPlaying: boolean,
    step: number,
    dimension: DimensionType,
    getChunk: (chunkNumber: number, quality: ChunkQuality) => Promise<ArrayBuffer>,
): Promise<FrameData> {
    if (!(jobID in frameDataCache)) {
        const blockType = chunkType === 'video' ? BlockType.MP4VIDEO : BlockType.ARCHIVE;
        const meta = await serverProxy.frames.getMeta('job', jobID);
        const updatedMeta = {
            ...meta,
            deleted_frames: Object.fromEntries(meta.deleted_frames.map((_frame) => [_frame, true])),
        };
        const mean = updatedMeta.frames.reduce((a, b) => a + b.width * b.height, 0) / updatedMeta.frames.length;
        const stdDev = Math.sqrt(
            updatedMeta.frames.map((x) => (x.width * x.height - mean) ** 2).reduce((a, b) => a + b) /
            updatedMeta.frames.length,
        );

        // limit of decoded frames cache by 2GB
        const decodedBlocksCacheSize = Math.min(
            Math.floor((2048 * 1024 * 1024) / ((mean + stdDev) * 4 * chunkSize)) || 1, 10,
        );
        frameDataCache[jobID] = {
            meta: updatedMeta,
            chunkSize,
            mode,
            startFrame,
            stopFrame,
            decodeForward: isPlaying,
            forwardStep: step,
            provider: new FrameDecoder(
                blockType,
                chunkSize,
                decodedBlocksCacheSize,
                dimension,
            ),
            prefetchAnalizer: new PrefetchAnalyzer(chunkSize),
            decodedBlocksCacheSize,
            activeChunkRequest: null,
            activeContextRequest: null,
            latestFrameDecodeRequest: null,
            latestContextImagesRequest: null,
            contextCache: {},
            getChunk,
        };
    }

    const frameMeta = getFrameMeta(jobID, frame);
    frameDataCache[jobID].provider.setRenderSize(frameMeta.width, frameMeta.height);
    frameDataCache[jobID].decodeForward = isPlaying;
    frameDataCache[jobID].forwardStep = step;

    return new FrameData({
        width: frameMeta.width,
        height: frameMeta.height,
        name: frameMeta.name,
        related_files: frameMeta.related_files,
        frameNumber: frame,
        deleted: frame in frameDataCache[jobID].meta.deleted_frames,
        jobID,
    });
}

export async function getDeletedFrames(instanceType: 'job' | 'task', id) {
    if (instanceType === 'job') {
        const { meta } = frameDataCache[id];
        return meta.deleted_frames;
    }

    if (instanceType === 'task') {
        const meta = await serverProxy.frames.getMeta('task', id);
        meta.deleted_frames = Object.fromEntries(meta.deleted_frames.map((_frame) => [_frame, true]));
        return meta;
    }

    throw new Exception(`getDeletedFrames is not implemented for ${instanceType}`);
}

export function deleteFrame(jobID: number, frame: number): void {
    const { meta } = frameDataCache[jobID];
    meta.deleted_frames[frame] = true;
}

export function restoreFrame(jobID: number, frame: number): void {
    const { meta } = frameDataCache[jobID];
    if (frame in meta.deleted_frames) {
        delete meta.deleted_frames[frame];
    }
}

export async function patchMeta(jobID: number): Promise<void> {
    const { meta } = frameDataCache[jobID];
    const newMeta = await serverProxy.frames.saveMeta('job', jobID, {
        deleted_frames: Object.keys(meta.deleted_frames),
    });
    const prevDeletedFrames = meta.deleted_frames;

    // it is important do not overwrite the object, it is why we working on keys in two loops below
    for (const frame of Object.keys(prevDeletedFrames)) {
        delete prevDeletedFrames[frame];
    }
    for (const frame of newMeta.deleted_frames) {
        prevDeletedFrames[frame] = true;
    }

    frameDataCache[jobID].meta = newMeta;
    frameDataCache[jobID].meta.deleted_frames = prevDeletedFrames;
}

export async function findFrame(
    jobID: number, frameFrom: number, frameTo: number, filters: { offset?: number, notDeleted: boolean },
): Promise<number | null> {
    const offset = filters.offset || 1;
    let meta;
    if (!frameDataCache[jobID]) {
        meta = await serverProxy.frames.getMeta('job', jobID);
    } else {
        meta = frameDataCache[jobID].meta;
    }

    const sign = Math.sign(frameTo - frameFrom);
    const predicate = sign > 0 ? (frame) => frame <= frameTo : (frame) => frame >= frameTo;
    const update = sign > 0 ? (frame) => frame + 1 : (frame) => frame - 1;
    let framesCounter = 0;
    let lastUndeletedFrame = null;
    const check = (frame): boolean => {
        if (meta.included_frames) {
            return (meta.included_frames.includes(frame)) &&
            (!filters.notDeleted || !(frame in meta.deleted_frames));
        }
        if (filters.notDeleted) {
            return !(frame in meta.deleted_frames);
        }
        return true;
    };
    for (let frame = frameFrom; predicate(frame); frame = update(frame)) {
        if (check(frame)) {
            lastUndeletedFrame = frame;
            framesCounter++;
            if (framesCounter === offset) {
                return lastUndeletedFrame;
            }
        }
    }

    return lastUndeletedFrame;
}

export function getCachedChunks(jobID): number[] {
    if (!(jobID in frameDataCache)) {
        return [];
    }

    return frameDataCache[jobID].provider.cachedChunks(true);
}

export function clear(jobID: number): void {
    if (jobID in frameDataCache) {
        delete frameDataCache[jobID];
    }
}
back to top