Revision 8810125fd84828d4283032103bbc79d1fb112aee authored by Andrey Zhavoronkov on 14 March 2022, 11:32:18 UTC, committed by GitHub on 14 March 2022, 11:32:18 UTC
1 parent fc520a3
Raw File
lambda-manager.js
// Copyright (C) 2019-2021 Intel Corporation
//
// SPDX-License-Identifier: MIT

const serverProxy = require('./server-proxy');
const { ArgumentError } = require('./exceptions');
const MLModel = require('./ml-model');
const { RQStatus } = require('./enums');

class LambdaManager {
    constructor() {
        this.listening = {};
        this.cachedList = null;
    }

    async list() {
        if (Array.isArray(this.cachedList)) {
            return [...this.cachedList];
        }

        const result = await serverProxy.lambda.list();
        const models = [];

        for (const model of result) {
            models.push(
                new MLModel({
                    ...model,
                    type: model.kind,
                }),
            );
        }

        this.cachedList = models;
        return models;
    }

    async run(taskID, model, args) {
        if (!Number.isInteger(taskID) || taskID < 0) {
            throw new ArgumentError(`Argument taskID must be a positive integer. Got "${taskID}"`);
        }

        if (!(model instanceof MLModel)) {
            throw new ArgumentError(
                `Argument model is expected to be an instance of MLModel class, but got ${typeof model}`,
            );
        }

        if (args && typeof args !== 'object') {
            throw new ArgumentError(`Argument args is expected to be an object, but got ${typeof model}`);
        }

        const body = {
            ...args,
            task: taskID,
            function: model.id,
        };

        const result = await serverProxy.lambda.run(body);
        return result.id;
    }

    async call(taskID, model, args) {
        if (!Number.isInteger(taskID) || taskID < 0) {
            throw new ArgumentError(`Argument taskID must be a positive integer. Got "${taskID}"`);
        }

        const body = {
            ...args,
            task: taskID,
        };

        const result = await serverProxy.lambda.call(model.id, body);
        return result;
    }

    async requests() {
        const result = await serverProxy.lambda.requests();
        return result.filter((request) => ['queued', 'started'].includes(request.status));
    }

    async cancel(requestID) {
        if (typeof requestID !== 'string') {
            throw new ArgumentError(`Request id argument is required to be a string. But got ${requestID}`);
        }

        if (this.listening[requestID]) {
            clearTimeout(this.listening[requestID].timeout);
            delete this.listening[requestID];
        }
        await serverProxy.lambda.cancel(requestID);
    }

    async listen(requestID, onUpdate) {
        const timeoutCallback = async () => {
            try {
                this.listening[requestID].timeout = null;
                const response = await serverProxy.lambda.status(requestID);

                if (response.status === RQStatus.QUEUED || response.status === RQStatus.STARTED) {
                    onUpdate(response.status, response.progress || 0);
                    this.listening[requestID].timeout = setTimeout(timeoutCallback, 2000);
                } else {
                    if (response.status === RQStatus.FINISHED) {
                        onUpdate(response.status, response.progress || 100);
                    } else {
                        onUpdate(response.status, response.progress || 0, response.exc_info || '');
                    }

                    delete this.listening[requestID];
                }
            } catch (error) {
                onUpdate(
                    RQStatus.UNKNOWN,
                    0,
                    `Could not get a status of the request ${requestID}. ${error.toString()}`,
                );
            }
        };

        this.listening[requestID] = {
            onUpdate,
            timeout: setTimeout(timeoutCallback, 2000),
        };
    }
}

module.exports = new LambdaManager();
back to top