1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""
byceps.announce.announce
~~~~~~~~~~~~~~~~~~~~~~~~
:Copyright: 2014-2025 Jochen Kupperschmidt
:License: Revised BSD (see `LICENSE` file for details)
"""
from datetime import datetime
from http import HTTPStatus
from typing import Any
import httpx
import structlog
from byceps.services.core.events import _BaseEvent
from byceps.services.webhooks import webhook_service
from byceps.services.webhooks.models import (
AnnouncementRequest,
OutgoingWebhook,
OutgoingWebhookFormat,
)
from byceps.util.jobqueue import enqueue, enqueue_at
from .connections import get_signals, registry
# Timeout passed to `httpx.post` for every webhook call (httpx
# interprets a plain number as seconds).
DEFAULT_WEBHOOK_TIMEOUT = 15
# Module-level structlog logger, used for warnings about incomplete
# webhook configuration.
log = structlog.get_logger()
class WebhookError(Exception):
    """Indicate that a webhook endpoint returned an unexpected status code."""
def enable_announcements() -> None:
    """Connect the announcement receiver to each signal from `get_signals()`."""
    for sig in get_signals():
        sig.connect(_receive_signal)
def _receive_signal(sender, *, event: _BaseEvent | None = None) -> None:
    """Enqueue `_handle_event` once per enabled webhook for the event.

    Do nothing if the signal was sent without an event.
    """
    if event is None:
        return

    name = get_name_for_event(event)
    for hook in _get_webhooks(name):
        enqueue(_handle_event, event, hook)
def get_event_names() -> set[str]:
    """Return the event names provided by the registry."""
    names = registry.get_event_names()
    return names
def get_name_for_event(event: _BaseEvent) -> str:
    """Look up the name registered for the event's type.

    An exception is raised if no name is defined for that event type.
    """
    name = registry.get_event_name(event)
    return name
def _get_webhooks(event_name: str) -> list[OutgoingWebhook]:
    """Return the enabled outgoing webhooks for the event name.

    The webhooks are ordered by the `channel` entry of their extra
    fields; webhooks without a channel sort first.
    """
    webhooks = webhook_service.get_enabled_outgoing_webhooks(event_name)

    # Stable order is easier to test.
    #
    # Fall back to an empty string not only for a missing `channel` key
    # but also for an explicit `None` value; otherwise comparing `None`
    # with channel names would make the sort raise `TypeError`.
    webhooks.sort(key=lambda wh: wh.extra_fields.get('channel') or '')

    return webhooks
def _handle_event(event: _BaseEvent, webhook: OutgoingWebhook) -> None:
    """Build the announcement request for the event and announce it.

    Nothing is announced if no request was built.
    """
    request = build_announcement_request(event, webhook)
    if request is not None:
        announce(request)
def build_announcement_request(
    event: _BaseEvent, webhook: OutgoingWebhook
) -> AnnouncementRequest | None:
    """Create the announcement request for the event and webhook.

    Return `None` if the registry has no handler for the event's type
    or if the handler returns no announcement.
    """
    handler = registry.get_handler_for_event_type(type(event))
    if handler is None:
        return None

    announcement = handler(get_name_for_event(event), event, webhook)
    if announcement is None:
        return None

    return assemble_announcement_request(
        webhook,
        announcement.text,
        announce_at=announcement.announce_at,
    )
def assemble_announcement_request(
    webhook: OutgoingWebhook, text: str, *, announce_at: datetime | None = None
) -> AnnouncementRequest:
    """Assemble the request to announce the text via the webhook.

    The expected response status code is looked up by the webhook's
    format and is `None` for formats without an entry.
    """
    payload = _assemble_request_data(webhook, text)
    expected_status = _EXPECTED_RESPONSE_STATUS_CODES.get(webhook.format)

    return AnnouncementRequest(
        webhook_id=webhook.id,
        url=webhook.url,
        data=payload,
        expected_response_status_code=expected_status,
        announce_at=announce_at,
    )
def _assemble_request_data(
    webhook: OutgoingWebhook, text: str
) -> dict[str, Any]:
    """Build the request data in the shape of the webhook's format.

    The webhook's text prefix, if any, is put in front of the text.
    Extra fields a format relies on are only warned about when missing;
    the data is assembled nonetheless. An unknown format results in an
    empty dictionary.
    """
    prefix = webhook.text_prefix
    message = prefix + text if prefix else text

    fmt = webhook.format

    if fmt == OutgoingWebhookFormat.discord:
        return {'content': message}

    if fmt == OutgoingWebhookFormat.matrix_webhook:
        api_key = webhook.extra_fields.get('key')
        if not api_key:
            log.warning('No API key specified with Matrix webhook.')

        room = webhook.extra_fields.get('room_id')
        if not room:
            log.warning('No room ID specified with Matrix webhook.')

        return {'key': api_key, 'room_id': room, 'body': message}

    if fmt == OutgoingWebhookFormat.mattermost:
        return {'text': message}

    if fmt == OutgoingWebhookFormat.weitersager:
        irc_channel = webhook.extra_fields.get('channel')
        if not irc_channel:
            log.warning('No channel specified with IRC webhook.')

        return {'channel': irc_channel, 'text': message}

    # Unknown format: nothing to send.
    return {}
def announce(announcement_request: AnnouncementRequest) -> None:
    """Call the webhook now or, if the request has a time set, schedule it."""
    scheduled_for = announcement_request.announce_at
    if scheduled_for is None:
        # Announce now.
        call_webhook(announcement_request)
        return

    # Schedule job to announce later.
    enqueue_at(scheduled_for, call_webhook, announcement_request)
def call_webhook(announcement_request: AnnouncementRequest) -> None:
    """POST the announcement data as JSON to the webhook's URL.

    Raise `WebhookError` if the request specifies an expected response
    status code and the endpoint answers with a different one.
    """
    response = httpx.post(
        announcement_request.url,
        json=announcement_request.data,
        timeout=DEFAULT_WEBHOOK_TIMEOUT,
    )

    expected_status = announcement_request.expected_response_status_code
    # Without an expected status code there is nothing to verify.
    if expected_status is not None:
        received_status = response.status_code
        if received_status != expected_status:
            raise WebhookError(
                f'Endpoint for webhook {announcement_request.webhook_id} '
                f'returned unexpected status code {received_status}'
            )
# Response status code that `call_webhook` expects from the endpoint,
# keyed by webhook format. A format without an entry yields `None` on
# lookup, in which case the response status is not checked.
_EXPECTED_RESPONSE_STATUS_CODES = {
    OutgoingWebhookFormat.discord: HTTPStatus.NO_CONTENT,
    OutgoingWebhookFormat.matrix_webhook: HTTPStatus.OK,
    OutgoingWebhookFormat.mattermost: HTTPStatus.OK,
    OutgoingWebhookFormat.weitersager: HTTPStatus.ACCEPTED,
}
|