Revision 6e32868127bf0c51e49efabf8b68ad544cae9a3c authored by Maria Khrustaleva on 01 December 2023, 12:05:17 UTC, committed by GitHub on 01 December 2023, 12:05:17 UTC
This PR resolves 2 issues related to rq jobs:

1. In some scenarios it was possible to end up with a cyclic dependency:
   rq job `X2` depends on `X1` -> the running `X1` job is moved to the
   FailedJobRegistry but not deleted -> the user creates one more `X1` job,
   which is enqueued to run after `X2`. This could happen because of the
   second issue: a user tried to export annotations for task 1, then for
   task 2, and, after the worker container was restarted, tried to export
   annotations for task 1 again.

2. It looks like, in the rq implementation, jobs that depend on job `X` will
   never be enqueued when `X` is moved to the FailedJobRegistry due to an
   AbandonedJobError. I've submitted an
   [issue](https://github.com/rq/rq/issues/2006) to the rq repository.
1 parent 81e88e1
Raw File
rqworker.py
# Copyright (C) 2018-2022 Intel Corporation
# Copyright (C) 2022-2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import os

from rq import Worker

import cvat.utils.remote_debugger as debug


# Worker class exported as the default: the plain rq Worker, rebound further
# down to RemoteDebugWorker when debug.is_debugging_enabled() is true.
DefaultWorker = Worker


class BaseDeathPenalty:
    """Context manager that does nothing.

    It takes a timeout and an exception (plus arbitrary keyword arguments),
    ignores all of them, and neither sets anything up on entry nor handles
    anything on exit.
    """

    def __init__(self, timeout, exception, **kwargs):
        """Accept the arguments and discard them; no state is stored."""

    def __enter__(self):
        """Enter the context; nothing is prepared and ``None`` is returned."""
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the context; a falsy result means exceptions are not suppressed."""
        return None


class SimpleWorker(Worker):
    """
    Worker that runs each job in its own thread/process instead of forking,
    so it works with at most 1 worker thread. Intended for debugging.
    """

    # Use the no-op context manager in place of the inherited death penalty class.
    death_penalty_class = BaseDeathPenalty

    def main_work_horse(self, *args, **kwargs):
        # This worker never forks, so the work-horse entry point is unsupported.
        raise NotImplementedError("Test worker does not implement this method")

    def execute_job(self, *args, **kwargs):
        """Run the job in the current thread/process; fork() is not used."""

        # All Django DB connections are closed before the job runs. This avoids
        #   django.db.utils.OperationalError: server closed the connection unexpectedly
        # showing up while debugging, see
        # https://stackoverflow.com/questions/8242837/django-multiprocessing-and-database-connections/10684672#10684672
        from django import db

        db.connections.close_all()

        outcome = self.perform_job(*args, **kwargs)
        return outcome


if debug.is_debugging_enabled():
    class RemoteDebugWorker(SimpleWorker):
        """
        SimpleWorker variant with support for the VS Code debugger.
        """

        def __init__(self, *args, **kwargs):
            # The debugger helper is created first, then the base worker is set up.
            self.__debugger = debug.RemoteDebugger()
            super().__init__(*args, **kwargs)

        def execute_job(self, *args, **kwargs):
            """Attach the debugger to the current thread, then run the job without fork()."""
            debugger = self.__debugger
            debugger.attach_current_thread()

            outcome = super().execute_job(*args, **kwargs)
            return outcome

    # With debugging enabled, the debug-aware worker becomes the default one.
    DefaultWorker = RemoteDebugWorker


if os.environ.get("COVERAGE_PROCESS_START"):
    # When COVERAGE_PROCESS_START is set, os._exit is wrapped so that the
    # current Coverage object (if any) is stopped and saved before the process
    # is terminated by the original os._exit.
    import traceback

    import coverage

    default_exit = os._exit

    def coverage_exit(*args, **kwargs):
        """Stop and save the current coverage session, then call the real os._exit.

        All arguments are forwarded unchanged to the original ``os._exit``.
        The original exit is always invoked, even when stopping or saving the
        coverage data raises, so that this replacement never returns control
        to a caller that expects the process to be gone.
        """
        try:
            cov = coverage.Coverage.current()
            if cov:
                cov.stop()
                cov.save()
        except Exception:
            # Failing to persist coverage data must not keep the process
            # alive; report the problem and fall through to the real exit.
            traceback.print_exc()
        finally:
            default_exit(*args, **kwargs)

    os._exit = coverage_exit
back to top