Revision cbd5f433f3ff478fb26e9c09b9e3b5c74512e28d authored by Jonas Otten on 13 December 2023, 10:29:49 UTC, committed by GitHub on 13 December 2023, 10:29:49 UTC
It is not possible to attach a GCS cloud storage bucket whose name contains a dot. As stated [here](https://cloud.google.com/storage/docs/buckets#naming), GCS bucket names allow dots. This is a major problem for us, and this PR fixes this issue.

---------

Co-authored-by: Jonas Otten <jootten@uos.de>
Co-authored-by: Boris Sekachev <sekachev.bs@gmail.com>
Co-authored-by: Maria Khrustaleva <maria@cvat.ai>
1 parent 2aae952
Raw File
rq_patching.py
# Copyright (C) 2023 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import traceback
from datetime import datetime
from typing import Optional

import rq.registry
from rq.exceptions import AbandonedJobError, NoSuchJobError
from rq.job import JobStatus
from rq.utils import current_timestamp
from rq.version import VERSION


# NOTE: we have to patch the implementation of the original method because
# the original function does not enqueue dependent jobs
# Issue: https://github.com/rq/rq/issues/2006
# Link to PR: https://github.com/rq/rq/pull/2008
def custom_started_job_registry_cleanup(self, timestamp: Optional[float] = None):
    """Clean expired jobs out of this registry, retrying or failing each one.

    The cut-off score is ``timestamp`` or, when it is ``None``, the value of
    ``current_timestamp()`` at call time.

    For every id returned by ``self.get_expired_job_ids(score)`` whose job can
    still be fetched, the job's failure callback is executed with an
    ``AbandonedJobError``. A job that has retries left is retried on the
    registry's queue; any other job is marked as failed, saved, added to the
    ``FailedJobRegistry`` and has its dependents enqueued. Finally, all
    entries of ``self.key`` with a score between 0 and the cut-off are removed
    and the pipeline is executed.

    Args:
        timestamp (Optional[float]): Cut-off expressed as seconds since the
            Unix epoch. Defaults to the call time when omitted.

    Returns:
        The ids reported by ``self.get_expired_job_ids``, unchanged (ids whose
        job could not be fetched are included).
    """

    cutoff = current_timestamp() if timestamp is None else timestamp
    expired_ids = self.get_expired_job_ids(cutoff)

    # Nothing expired: no registry, queue or pipeline is needed.
    if not expired_ids:
        return expired_ids

    failed_registry_cls = rq.registry.FailedJobRegistry
    failed_registry = failed_registry_cls(self.name, self.connection, serializer=self.serializer)
    target_queue = self.get_queue()
    # The failure reason is the same for every job, so build it once.
    reason = f"due to {AbandonedJobError.__name__}"

    with self.connection.pipeline() as pipe:
        for expired_id in expired_ids:
            try:
                abandoned = self.job_class.fetch(expired_id, connection=self.connection, serializer=self.serializer)
            except NoSuchJobError:
                # The job could not be fetched, so there is nothing to retry or fail.
                continue

            abandoned.execute_failure_callback(
                self.death_penalty_class, AbandonedJobError, AbandonedJobError(), traceback.extract_stack()
            )

            if abandoned.retries_left and abandoned.retries_left > 0:
                abandoned.retry(target_queue, pipe)
                continue

            rq.registry.logger.warning(
                f'{self.__class__.__name__} cleanup: Moving job to {failed_registry_cls.__name__} '
                f'({reason})'
            )
            abandoned.set_status(JobStatus.FAILED)
            abandoned._exc_info = f"Moved to {failed_registry_cls.__name__}, {reason}, at {datetime.now()}"
            abandoned.save(pipeline=pipe, include_meta=False)
            abandoned.cleanup(ttl=-1, pipeline=pipe)
            failed_registry.add(abandoned, abandoned.failure_ttl)
            # This call is the reason for the patch: dependents of a failed job get enqueued.
            target_queue.enqueue_dependents(abandoned)

        pipe.zremrangebyscore(self.key, 0, cutoff)
        pipe.execute()

    return expired_ids

def update_started_job_registry_cleanup() -> None:
    """Replace ``StartedJobRegistry.cleanup`` with the patched implementation.

    The replacement was written against one specific RQ release, so the
    installed version is verified before the method is swapped.

    Raises:
        AssertionError: If the installed RQ version is not ``1.15.1``.
    """
    # don't forget to check if the issue https://github.com/rq/rq/issues/2006 has been resolved in upstream
    supported_version = '1.15.1'
    # Raise explicitly instead of using ``assert``: assert statements are
    # removed under ``python -O``, which would let the patch be applied
    # silently to an RQ version it was never checked against.
    if VERSION != supported_version:
        raise AssertionError(
            f"StartedJobRegistry.cleanup patch targets rq {supported_version}, "
            f"but rq {VERSION} is installed"
        )
    rq.registry.StartedJobRegistry.cleanup = custom_started_job_registry_cleanup
back to top