From edf7a6afe8346fa672d62b489d05bc0707be8c25 Mon Sep 17 00:00:00 2001 From: KoenigMjr <135820716+KoenigMjr@users.noreply.github.com> Date: Fri, 19 Dec 2025 18:56:35 +0100 Subject: [PATCH] [upd/telegram]: improve stability, safety and production robustness Updated the Telegram plugin to handle high-load scenarios and prevent resource exhaustion. Key focus areas were message formatting, concurrency management, and configuration resilience. - Implement bounded message queue (max 100) with non-blocking drops to prevent memory leaks - Add graceful shutdown logic with worker thread joining and queue draining - Add self-healing initialization (`_ensure_sender`) to handle race conditions during startup - Implement robust escaping/sanitization for HTML and MarkdownV2 parse modes - Enforce Telegram's 4096 character limit with graceful truncation - Enhance error diagnostics for API responses (Rate limiting, 4xx/5xx errors) - Validate and sanitize GPS coordinates (range and type checking) - Decouple logging from global config by using module-level logger Behavioral Changes: - BREAKING: Location messages now require `coordinates: true` in config (previously default) - Messages are dropped with an error log when the queue is full (prevents system hang) - Invalid HTML/Markdown characters are now automatically escaped to prevent API errors --- docu/docs/plugin/telegram.md | 45 ++++-- plugin/telegram.py | 283 ++++++++++++++++++++--------------- 2 files changed, 199 insertions(+), 129 deletions(-) diff --git a/docu/docs/plugin/telegram.md b/docu/docs/plugin/telegram.md index 45e10c52..84f42fb7 100644 --- a/docu/docs/plugin/telegram.md +++ b/docu/docs/plugin/telegram.md @@ -2,12 +2,12 @@ --- ## Beschreibung -Dieses Plugin ermöglicht das Versenden von Telegram-Nachrichten für verschiedene Alarmierungsarten. -Wenn im eingehenden Paket die Felder `lat` und `lon` vorhanden sind (z. B. durch das [Geocoding](../modul/geocoding.md) Modul), wird zusätzlich automatisch der Standort als Telegram-Location gesendet. -Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und exponentiellem Backoff, um die Vorgaben der Telegram API einzuhalten und Nachrichtenverluste zu verhindern. Die Retry-Parameter (max_retries, initial_delay, max_delay) können in der Konfiguration angepasst werden. +Dieses Plugin ermöglicht den Versand von Telegram-Nachrichten für verschiedene Alarmierungsarten. Um eine hohe Stabilität im BOS-Betrieb zu gewährleisten, erfolgt der Versand asynchron über eine interne Warteschlange (Queue) mit Überlastschutz. -## Unterstütze Alarmtypen +Das Plugin hält die Vorgaben der Telegram API automatisch ein: Eine integrierte Retry-Logik mit exponentiellem Backoff verhindert Nachrichtenverluste bei temporären Netzwerkproblemen. Zudem werden Nachrichten, die das Telegram-Limit von 4.096 Zeichen überschreiten, automatisch gekürzt. Wenn Standortdaten (lat/lon) vorhanden sind, kann das Plugin diese als native Karte senden (erfordert [Geocoding-Modul](../modul/geocoding.md) und Aktivierung via coordinates). + +## Unterstützte Alarmtypen - FMS - POCSAG - ZVEI @@ -26,11 +26,12 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und |message_fms|Formatvorlage für FMS-Alarm|`{FMS}`| |message_pocsag|Formatvorlage für POCSAG|`{RIC}({SRIC})\n{MSG}`| |message_zvei|Formatvorlage für ZVEI|`{TONE}`| -|message_msg|Formatvorlage für MSG-Nachricht|-| +|message_msg|Formatvorlage für MSG-Nachricht|`{MSG}`| |max_retries|Anzahl Wiederholungsversuche bei Fehlern|5| |initial_delay|Initiale Wartezeit bei Wiederholungsversuchen|2 [Sek.]| |max_delay|Maximale Retry-Verzögerung|300 [Sek.]| -|parse_mode|Formatierung ("HTML" oder "MarkdownV2"), Case-sensitive!|leer| +|parse_mode|Formatierung ("HTML" oder "MarkdownV2"), !Case-sensitive! Empfehlung: HTML|leer| +|coordinates|Aktiviert die Verarbeitung von Standortdaten|false| **Beispiel:** ```yaml @@ -38,6 +39,7 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und name: Telegram Plugin res: telegram config: + coordinates: true message_pocsag: | POCSAG Alarm: RIC: {RIC} ({SRIC}) @@ -49,21 +51,44 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und - "CHAT_ID" ``` -Hinweis: +### parse_mode Über parse_mode kannst du Telegram-Formatierungen verwenden: - HTML: `fett`, `kursiv`, `unterstrichen`, `durchgestrichen`, ... -- MarkdownV2: `**fett**`, `__unterstrichen__`, `_italic \*text_` usw. (Escape-Regeln beachten) +- MarkdownV2: `**fett**`, `__unterstrichen__`, `_italic \*text_` usw. + +**Wichtig**: Bei MarkdownV2 werden alle Sonderzeichen innerhalb der Wildcards (wie {MSG}) automatisch escaped. Das verhindert zwar API-Fehler, macht aber eine bewusste Formatierung innerhalb des Funktextes unmöglich. + +**Nutze HTML**, wenn du fettgedruckte oder kursive Elemente in deinem Template verwenden möchtest, ohne dass der Inhalt der Nachricht verändert wird. +```yaml +# EMPFOHLEN: HTML für Formatierung +parse_mode: "HTML" +message_pocsag: | + POCSAG Alarm: + RIC: {RIC} + {MSG} + +# NICHT EMPFOHLEN: MarkdownV2 +# (alle Sonderzeichen in {MSG} werden escaped) +parse_mode: "MarkdownV2" +message_pocsag: "*Alarm*\nRIC: {RIC}" +``` Block-Strings (|) eignen sich perfekt für mehrzeilige Nachrichten und vermeiden Escape-Zeichen wie \n +### coordinates +Der Versand von Standorten ist standardmäßig deaktiviert (`coordinates: false`), um unnötige Warnmeldungen im Log zu vermeiden, wenn keine Koordinaten im Datenpaket enthalten sind. Setze diesen Wert nur auf `true`, wenn du sicherstellst, dass die Alarmierung Koordinaten liefert (z.B. durch einen vorgeschalteten Geocoder). + +**Verhalten beim Standortversand:** +Bei aktivierten Koordinaten sendet das Plugin zusätzlich zum Alarmtext eine native Telegram-Karte als separate Nachricht. Es sind keine Wildcards im Nachrichtentext erforderlich; die Karte wird automatisch unter dem Text gepostet. + --- ## Modul Abhängigkeiten OPTIONAL, nur für POCSAG-Locationversand: Aus dem Modul [Geocoding](../modul/geocoding.md): - `lat` - `lon` - + --- ## Externe Abhängigkeiten -keine +requests diff --git a/plugin/telegram.py b/plugin/telegram.py index 18029a9b..9b30f7f0 100644 --- a/plugin/telegram.py +++ b/plugin/telegram.py @@ -10,9 +10,9 @@ by Bastian Schroll @file: telegram.py -@date: 17.11.2025 -@author: Claus Schichl nach der Idee von Jan Speller -@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten +@date: 17.01.2026 +@author: Claus Schichl +@description: Telegram-Plugin """ import logging @@ -20,13 +20,10 @@ import threading import queue import requests +import re from plugin.pluginBase import PluginBase -# Setup Logging -logging.basicConfig( - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - level=logging.INFO -) +# Setup logging logger = logging.getLogger(__name__) @@ -36,181 +33,229 @@ class TelegramSender: def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None): + self._stop_event = threading.Event() self.bot_token = bot_token self.chat_ids = chat_ids self.max_retries = max_retries if max_retries is not None else 5 self.initial_delay = initial_delay if initial_delay is not None else 2 self.max_delay = max_delay if max_delay is not None else 300 + self.msg_queue = queue.Queue(maxsize=100) # max 100 messages self.parse_mode = parse_mode - self.msg_queue = queue.Queue() + + # Start Worker Thread self._worker = threading.Thread(target=self._worker_loop, daemon=True) self._worker.start() def send_message(self, text): + clean_text = self._escape_text(text, self.parse_mode) for chat_id in self.chat_ids: - self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0 + try: + self.msg_queue.put(("text", chat_id, clean_text, 0), block=False) + except queue.Full: + logger.error(f"Message queue full for chat_id {chat_id}, message discarded: {clean_text[:50]}...") def send_location(self, latitude, longitude): + try: + # Use validated float values, not original strings + lat = float(latitude) + lon = float(longitude) + # check Telegram API Limits + if not (-90 <= lat <= 90) or not (-180 <= lon <= 180): + logger.error(f"Invalid coordinates: lat={lat}, lon={lon} (out of range)") + return + except (TypeError, ValueError) as e: + logger.error(f"Invalid coordinate format: {e}, location skipped") + return + for chat_id in self.chat_ids: - self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0)) + try: + self.msg_queue.put(("location", chat_id, {"latitude": lat, "longitude": lon}, 0), block=False) + except queue.Full: + logger.error(f"Location queue full for chat_id {chat_id}, location discarded: {lat}, {lon}") + + @staticmethod + def _escape_text(text, parse_mode): + if not text: + return "" + if parse_mode == "HTML": + protected_tags = {} + tag_pattern = r'<(/?)(\w+)>' + allowed = ["b", "strong", "i", "em", "code", "pre", "u", "s"] + + def protect_tag(match): + tag_full = match.group(0) + tag_name = match.group(2) + if tag_name in allowed: + placeholder = f"__TAG_{len(protected_tags)}__" + protected_tags[placeholder] = tag_full + return placeholder + return tag_full + + text = re.sub(tag_pattern, protect_tag, text) + text = text.replace("&", "&").replace("<", "<").replace(">", ">") + for placeholder, original_tag in protected_tags.items(): + text = text.replace(placeholder, original_tag) + elif parse_mode == "MarkdownV2": + # Escape all MarkdownV2 special characters (Telegram API requirement) + # See: https://core.telegram.org/bots/api#markdownv2-style + escape_chars = r'_*[]()~`>#+-=|{}.!' + text = "".join(['\\' + char if char in escape_chars else char for char in text]) + return text[:4090] + "[...]" if len(text) > 4096 else text def _worker_loop(self): delay = self.initial_delay - - while True: + while not self._stop_event.is_set(): try: - msg_type, chat_id, content, retry_count = self.msg_queue.get() - - success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content) - + msg_type, chat_id, content, retry_count = self.msg_queue.get(timeout=1) + success, permanent_failure, custom_delay, error_details = self._send_to_telegram(msg_type, chat_id, content) if success: delay = self.initial_delay - - elif permanent_failure: - logger.error("Permanenter Fehler – Nachricht wird verworfen.") - - elif retry_count >= self.max_retries: - logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.") - + elif permanent_failure or retry_count >= self.max_retries: + logger.warning(f"Discarding message for {chat_id}: {error_details}") else: - logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).") - self.msg_queue.put((msg_type, chat_id, content, retry_count + 1)) - - # use the Telegram-provided value (retry_after) if available wait_time = custom_delay if custom_delay is not None else delay time.sleep(wait_time) - - # increase delay for the next attempt (exponential backoff) + self.msg_queue.put((msg_type, chat_id, content, retry_count + 1)) delay = min(delay * 2, self.max_delay) - + except queue.Empty: + continue except Exception as e: - logger.exception(f"Fehler im Telegram-Worker: {e}") + logger.error(f"Error in Telegram worker: {e}") time.sleep(5) def _send_to_telegram(self, msg_type, chat_id, content): + url = f"https://api.telegram.org/bot{self.bot_token}/" + url += "sendMessage" if msg_type == "text" else "sendLocation" + payload = {'chat_id': chat_id} + if msg_type == "text": - url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage" - payload = { - 'chat_id': chat_id, - 'text': content, - } + payload['text'] = content if self.parse_mode: payload['parse_mode'] = self.parse_mode - elif msg_type == "location": - url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation" - payload = { - 'chat_id': chat_id, - **content - } else: - logger.error("Unbekannter Nachrichtentyp.") - return False, True, None # unknown message type = permanent failure + payload.update(content) try: - custom_delay = None # standardvalue for return, except in case of 429 - response = requests.post(url, data=payload, timeout=10) + if response.status_code == 200: + logger.info(f"Successfully sent to Chat-ID {chat_id}") + return True, False, None, None + # Rate limiting if response.status_code == 429: - custom_delay = response.json().get("parameters", {}).get("retry_after", 5) - logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.") - return False, False, custom_delay # Telegram gives exact wait time - - if response.status_code == 400: - logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.") - return False, True, custom_delay # permanent failure - - if response.status_code == 401: - logger.critical("Ungültiger Bot-Token – bitte prüfen!") - return False, True, custom_delay # permanent failure - - response.raise_for_status() - logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}") - return True, False, custom_delay - - except requests.RequestException as e: - logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}") - return False, False, custom_delay + retry_after = response.json().get("parameters", {}).get("retry_after", 5) + logger.warning(f"Rate limited for {chat_id}, retry after {retry_after}s") + return False, False, retry_after, f"Rate Limit (retry after {retry_after}s)" + + return False, response.status_code < 500, None, f"HTTP {response.status_code}" + except Exception as e: + return False, False, None, str(e) + + def shutdown(self): + r"""Graceful shutdown with queue draining""" + logger.info("Shutting down Telegram sender...") + self._stop_event.set() + timeout = time.time() + 5 + while not self.msg_queue.empty() and time.time() < timeout: + time.sleep(0.1) + remaining = self.msg_queue.qsize() + if remaining > 0: + logger.warning(f"{remaining} messages in queue discarded during shutdown") + self._worker.join(timeout=5) # =========================== # BoswatchPlugin-Class # =========================== +logging.debug("- %s loaded", __name__) + class BoswatchPlugin(PluginBase): + r"""!Description of the Plugin""" def __init__(self, config): r"""!Do not change anything here!""" super().__init__(__name__, config) # you can access the config class on 'self.config' + def _ensure_sender(self): + r""" checking with hasattr if self.sender already exists""" + if not hasattr(self, 'sender') or self.sender is None: + token = self.config.get("botToken") + ids = self.config.get("chatIds", default=[]) + + if token and ids: + self.sender = TelegramSender( + bot_token=token, + chat_ids=ids, + max_retries=self.config.get("max_retries"), + initial_delay=self.config.get("initial_delay"), + max_delay=self.config.get("max_delay"), + parse_mode=self.config.get("parse_mode") + ) + logger.debug("TelegramSender initialized via Self-Healing") + else: + missing = [] + if not token: + missing.append("botToken") + if not ids: + missing.append("chatIds") + logger.error(f"Telegram configuration incomplete! Missing: {', '.join(missing)}. Plugin will not send messages.") + + def _has_sender(self): + r"""Check if sender is available and properly initialized""" + return hasattr(self, 'sender') and self.sender is not None + def onLoad(self): r"""!Called by import of the plugin""" - bot_token = self.config.get("botToken") - chat_ids = self.config.get("chatIds", default=[]) - - if not bot_token or not chat_ids: - logger.error("botToken oder chatIds fehlen in der Konfiguration!") - return - - # configurable parameters with fallback defaults - max_retries = self.config.get("max_retries") - initial_delay = self.config.get("initial_delay") - max_delay = self.config.get("max_delay") - parse_mode = self.config.get("parse_mode") - - self.sender = TelegramSender( - bot_token=bot_token, - chat_ids=chat_ids, - max_retries=max_retries, - initial_delay=initial_delay, - max_delay=max_delay, - parse_mode=parse_mode - ) - - startup_message = self.config.get("startup_message") - if startup_message and startup_message.strip(): - self.sender.send_message(startup_message) + self._ensure_sender() + if not self._has_sender(): + logger.warning("Telegram plugin loaded but not configured. Messages will be discarded.") + else: + startup = self.config.get("startup_message") + if startup and startup.strip(): + self.sender.send_message(startup) def setup(self): - r"""!Called before alarm - Remove if not implemented""" - pass + r"""!Called before alarm""" + # ensure sender exists before alarms are processed + self._ensure_sender() def fms(self, bwPacket): - r"""!Called on FMS alarm - @param bwPacket: bwPacket instance""" - msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}")) - self.sender.send_message(msg) + r"""!Called on FMS alarm""" + if self._has_sender(): + msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}")) + self.sender.send_message(msg) def pocsag(self, bwPacket): - r"""!Called on POCSAG alarm - @param bwPacket: bwPacket instance""" - msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}")) - self.sender.send_message(msg) + r"""!Called on POCSAG alarm""" + if self._has_sender(): + msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}")) + self.sender.send_message(msg) - if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None: - lat, lon = bwPacket.get("lat"), bwPacket.get("lon") - logger.debug("Koordinaten gefunden – sende Standort.") - self.sender.send_location(lat, lon) + if self.config.get("coordinates", default=False): + lat = bwPacket.get("lat") + lon = bwPacket.get("lon") + if lat is not None and lon is not None: + self.sender.send_location(lat, lon) def zvei(self, bwPacket): - r"""!Called on ZVEI alarm - @param bwPacket: bwPacket instance""" - msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}")) - self.sender.send_message(msg) + r"""!Called on ZVEI alarm""" + if self._has_sender(): + msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}")) + self.sender.send_message(msg) def msg(self, bwPacket): - r"""!Called on MSG packet - @param bwPacket: bwPacket instance""" - msg = self.parseWildcards(self.config.get("message_msg")) - self.sender.send_message(msg) + r"""!Called on MSG packet""" + if self._has_sender(): + msg = self.parseWildcards(self.config.get("message_msg", default="{MSG}")) + self.sender.send_message(msg) def teardown(self): - r"""!Called after alarm - Remove if not implemented""" + r"""!Called after alarm""" pass def onUnload(self): - r"""!Called by destruction of the plugin - Remove if not implemented""" - pass + r"""!Called by destruction of the plugin""" + if self._has_sender(): + self.sender.shutdown() + logger.info("Telegram plugin unloaded")