# jobqueue.py
"""
byceps.util.jobqueue
~~~~~~~~~~~~~~~~~~~~

An asynchronously processed job queue based on Redis_ and RQ_.

.. _Redis: https://redis.io/
.. _RQ: https://python-rq.org/

:Copyright: 2014-2025 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""
from collections.abc import Callable
from datetime import datetime, UTC
from flask import current_app
from rq import Queue
from byceps.byceps_app import BycepsApp
def get_queue(app: BycepsApp) -> Queue:
    """Create a queue that uses the application's Redis client.

    The application's ``jobs.asynchronous`` configuration value is
    passed on as the queue's ``is_async`` flag.
    """
    return Queue(
        is_async=app.byceps_config.jobs.asynchronous,
        connection=app.redis_client,
    )
def enqueue(func: Callable, *args, **kwargs) -> None:
    """Put a call of ``func`` onto the current application's job queue.

    Positional and keyword arguments are forwarded unchanged to the
    queue's ``enqueue`` method.
    """
    get_queue(current_app).enqueue(func, *args, **kwargs)
def enqueue_at(dt: datetime, func: Callable, *args, **kwargs) -> None:
    """Put a call of ``func`` onto the current application's job queue,
    to be executed at the point in time given by ``dt``.

    If ``dt`` carries no ``tzinfo``, UTC is attached before the job is
    scheduled. Remaining positional and keyword arguments are forwarded
    unchanged to the queue's ``enqueue_at`` method.
    """
    # Naive datetime objects get UTC as their timezone so that rq does
    # not assume the local timezone for them.
    scheduled_for = dt if dt.tzinfo is not None else dt.replace(tzinfo=UTC)

    get_queue(current_app).enqueue_at(scheduled_for, func, *args, **kwargs)