announce.py
"""
byceps.announce.announce
~~~~~~~~~~~~~~~~~~~~~~~~
:Copyright: 2014-2025 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""
from datetime import datetime
from http import HTTPStatus
from typing import Any
import httpx
import structlog
from byceps.services.core.events import _BaseEvent
from byceps.services.webhooks import webhook_service
from byceps.services.webhooks.models import (
AnnouncementRequest,
OutgoingWebhook,
OutgoingWebhookFormat,
)
from byceps.util.jobqueue import enqueue, enqueue_at
from .connections import get_signals, registry
DEFAULT_WEBHOOK_TIMEOUT = 15
log = structlog.get_logger()
class WebhookError(Exception):
pass
def enable_announcements() -> None:
for signal in get_signals():
signal.connect(_receive_signal)
def _receive_signal(sender, *, event: _BaseEvent | None = None) -> None:
if event is None:
return None
event_name = get_name_for_event(event)
webhooks = _get_webhooks(event_name)
for webhook in webhooks:
enqueue(_handle_event, event, webhook)
def get_event_names() -> set[str]:
return registry.get_event_names()
def get_name_for_event(event: _BaseEvent) -> str:
"""Return the name for the event type.
Raise exception if no name is defined for the event type.
"""
return registry.get_event_name(event)
def _get_webhooks(event_name: str) -> list[OutgoingWebhook]:
webhooks = webhook_service.get_enabled_outgoing_webhooks(event_name)
# Stable order is easier to test.
webhooks.sort(key=lambda wh: wh.extra_fields.get('channel', ''))
return webhooks
def _handle_event(event: _BaseEvent, webhook: OutgoingWebhook) -> None:
announcement_request = build_announcement_request(event, webhook)
if announcement_request is None:
return
announce(announcement_request)
def build_announcement_request(
event: _BaseEvent, webhook: OutgoingWebhook
) -> AnnouncementRequest | None:
event_type = type(event)
handler = registry.get_handler_for_event_type(event_type)
if handler is None:
return None
event_name = get_name_for_event(event)
announcement = handler(event_name, event, webhook)
if announcement is None:
return None
return assemble_announcement_request(
webhook, announcement.text, announce_at=announcement.announce_at
)
def assemble_announcement_request(
webhook: OutgoingWebhook, text: str, *, announce_at: datetime | None = None
) -> AnnouncementRequest:
data = _assemble_request_data(webhook, text)
expected_response_status_code = _EXPECTED_RESPONSE_STATUS_CODES.get(
webhook.format
)
return AnnouncementRequest(
webhook_id=webhook.id,
url=webhook.url,
data=data,
expected_response_status_code=expected_response_status_code,
announce_at=announce_at,
)
def _assemble_request_data(
webhook: OutgoingWebhook, text: str
) -> dict[str, Any]:
text_prefix = webhook.text_prefix
if text_prefix:
text = text_prefix + text
match webhook.format:
case OutgoingWebhookFormat.discord:
return {'content': text}
case OutgoingWebhookFormat.matrix_webhook:
key = webhook.extra_fields.get('key')
if not key:
log.warning('No API key specified with Matrix webhook.')
room_id = webhook.extra_fields.get('room_id')
if not room_id:
log.warning('No room ID specified with Matrix webhook.')
return {'key': key, 'room_id': room_id, 'body': text}
case OutgoingWebhookFormat.mattermost:
return {'text': text}
case OutgoingWebhookFormat.weitersager:
channel = webhook.extra_fields.get('channel')
if not channel:
log.warning('No channel specified with IRC webhook.')
return {'channel': channel, 'text': text}
case _:
return {}
def announce(announcement_request: AnnouncementRequest) -> None:
announce_at = announcement_request.announce_at
if announce_at is not None:
# Schedule job to announce later.
enqueue_at(announce_at, call_webhook, announcement_request)
else:
# Announce now.
call_webhook(announcement_request)
def call_webhook(announcement_request: AnnouncementRequest) -> None:
"""Send HTTP request to the webhook."""
response = httpx.post(
announcement_request.url,
json=announcement_request.data,
timeout=DEFAULT_WEBHOOK_TIMEOUT,
)
expected_response_code = announcement_request.expected_response_status_code
if expected_response_code is None:
return
actual_response_code = response.status_code
if actual_response_code != expected_response_code:
raise WebhookError(
f'Endpoint for webhook {announcement_request.webhook_id} '
f'returned unexpected status code {actual_response_code}'
)
_EXPECTED_RESPONSE_STATUS_CODES = {
OutgoingWebhookFormat.discord: HTTPStatus.NO_CONTENT,
OutgoingWebhookFormat.matrix_webhook: HTTPStatus.OK,
OutgoingWebhookFormat.mattermost: HTTPStatus.OK,
OutgoingWebhookFormat.weitersager: HTTPStatus.ACCEPTED,
}