diff --git a/docu/docs/plugin/telegram.md b/docu/docs/plugin/telegram.md
index 45e10c52..84f42fb7 100644
--- a/docu/docs/plugin/telegram.md
+++ b/docu/docs/plugin/telegram.md
@@ -2,12 +2,12 @@
---
## Beschreibung
-Dieses Plugin ermöglicht das Versenden von Telegram-Nachrichten für verschiedene Alarmierungsarten.
-Wenn im eingehenden Paket die Felder `lat` und `lon` vorhanden sind (z. B. durch das [Geocoding](../modul/geocoding.md) Modul), wird zusätzlich automatisch der Standort als Telegram-Location gesendet.
-Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und exponentiellem Backoff, um die Vorgaben der Telegram API einzuhalten und Nachrichtenverluste zu verhindern. Die Retry-Parameter (max_retries, initial_delay, max_delay) können in der Konfiguration angepasst werden.
+Dieses Plugin ermöglicht den Versand von Telegram-Nachrichten für verschiedene Alarmierungsarten. Um eine hohe Stabilität im BOS-Betrieb zu gewährleisten, erfolgt der Versand asynchron über eine interne Warteschlange (Queue) mit Überlastschutz.
-## Unterstütze Alarmtypen
+Das Plugin hält die Vorgaben der Telegram API automatisch ein: Eine integrierte Retry-Logik mit exponentiellem Backoff verhindert Nachrichtenverluste bei temporären Netzwerkproblemen. Zudem werden Nachrichten, die das Telegram-Limit von 4.096 Zeichen überschreiten, automatisch gekürzt. Wenn Standortdaten (lat/lon) vorhanden sind, kann das Plugin diese als native Karte senden (erfordert [Geocoding-Modul](../modul/geocoding.md) und Aktivierung via coordinates).
+
+## Unterstützte Alarmtypen
- FMS
- POCSAG
- ZVEI
@@ -26,11 +26,12 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und
|message_fms|Formatvorlage für FMS-Alarm|`{FMS}`|
|message_pocsag|Formatvorlage für POCSAG|`{RIC}({SRIC})\n{MSG}`|
|message_zvei|Formatvorlage für ZVEI|`{TONE}`|
-|message_msg|Formatvorlage für MSG-Nachricht|-|
+|message_msg|Formatvorlage für MSG-Nachricht|`{MSG}`|
|max_retries|Anzahl Wiederholungsversuche bei Fehlern|5|
|initial_delay|Initiale Wartezeit bei Wiederholungsversuchen|2 [Sek.]|
|max_delay|Maximale Retry-Verzögerung|300 [Sek.]|
-|parse_mode|Formatierung ("HTML" oder "MarkdownV2"), Case-sensitive!|leer|
+|parse_mode|Formatierung ("HTML" oder "MarkdownV2"), !Case-sensitive! Empfehlung: HTML|leer|
+|coordinates|Aktiviert die Verarbeitung von Standortdaten|false|
**Beispiel:**
```yaml
@@ -38,6 +39,7 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und
name: Telegram Plugin
res: telegram
config:
+ coordinates: true
message_pocsag: |
POCSAG Alarm:
RIC: {RIC} ({SRIC})
@@ -49,21 +51,44 @@ Das Senden der Nachrichten erfolgt über eine interne Queue mit Retry-Logik und
- "CHAT_ID"
```
-Hinweis:
+### parse_mode
Über parse_mode kannst du Telegram-Formatierungen verwenden:
- HTML: `fett`, `kursiv`, `unterstrichen`, `durchgestrichen`, ...
-- MarkdownV2: `**fett**`, `__unterstrichen__`, `_italic \*text_` usw. (Escape-Regeln beachten)
+- MarkdownV2: `**fett**`, `__unterstrichen__`, `_italic \*text_` usw.
+
+**Wichtig**: Bei MarkdownV2 werden alle Sonderzeichen innerhalb der Wildcards (wie {MSG}) automatisch escaped. Das verhindert zwar API-Fehler, macht aber eine bewusste Formatierung innerhalb des Funktextes unmöglich.
+
+**Nutze HTML**, wenn du fettgedruckte oder kursive Elemente in deinem Template verwenden möchtest, ohne dass der Inhalt der Nachricht verändert wird.
+```yaml
+# EMPFOHLEN: HTML für Formatierung
+parse_mode: "HTML"
+message_pocsag: |
+ POCSAG Alarm:
+ RIC: {RIC}
+ {MSG}
+
+# NICHT EMPFOHLEN: MarkdownV2
+# (alle Sonderzeichen in {MSG} werden escaped)
+parse_mode: "MarkdownV2"
+message_pocsag: "*Alarm*\nRIC: {RIC}"
+```
Block-Strings (|) eignen sich perfekt für mehrzeilige Nachrichten und vermeiden Escape-Zeichen wie \n
+### coordinates
+Der Versand von Standorten ist standardmäßig deaktiviert (`coordinates: false`), um unnötige Warnmeldungen im Log zu vermeiden, wenn keine Koordinaten im Datenpaket enthalten sind. Setze diesen Wert nur auf `true`, wenn du sicherstellst, dass die Alarmierung Koordinaten liefert (z.B. durch einen vorgeschalteten Geocoder).
+
+**Verhalten beim Standortversand:**
+Bei aktivierten Koordinaten sendet das Plugin zusätzlich zum Alarmtext eine native Telegram-Karte als separate Nachricht. Es sind keine Wildcards im Nachrichtentext erforderlich; die Karte wird automatisch unter dem Text gepostet.
+
---
## Modul Abhängigkeiten
OPTIONAL, nur für POCSAG-Locationversand: Aus dem Modul [Geocoding](../modul/geocoding.md):
- `lat`
- `lon`
-
+
---
## Externe Abhängigkeiten
-keine
+requests
diff --git a/plugin/telegram.py b/plugin/telegram.py
index 18029a9b..9b30f7f0 100644
--- a/plugin/telegram.py
+++ b/plugin/telegram.py
@@ -10,9 +10,9 @@
by Bastian Schroll
@file: telegram.py
-@date: 17.11.2025
-@author: Claus Schichl nach der Idee von Jan Speller
-@description: Telegram-Plugin mit Retry-Logik ohne externe Telegram-Abhängigkeiten
+@date: 17.01.2026
+@author: Claus Schichl
+@description: Telegram-Plugin
"""
import logging
@@ -20,13 +20,10 @@
import threading
import queue
import requests
+import re
from plugin.pluginBase import PluginBase
-# Setup Logging
-logging.basicConfig(
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- level=logging.INFO
-)
+# Setup logging
logger = logging.getLogger(__name__)
@@ -36,181 +33,229 @@
class TelegramSender:
def __init__(self, bot_token, chat_ids, max_retries=None, initial_delay=None, max_delay=None, parse_mode=None):
+ self._stop_event = threading.Event()
self.bot_token = bot_token
self.chat_ids = chat_ids
self.max_retries = max_retries if max_retries is not None else 5
self.initial_delay = initial_delay if initial_delay is not None else 2
self.max_delay = max_delay if max_delay is not None else 300
+ self.msg_queue = queue.Queue(maxsize=100) # max 100 messages
self.parse_mode = parse_mode
- self.msg_queue = queue.Queue()
+
+ # Start Worker Thread
self._worker = threading.Thread(target=self._worker_loop, daemon=True)
self._worker.start()
def send_message(self, text):
+ clean_text = self._escape_text(text, self.parse_mode)
for chat_id in self.chat_ids:
- self.msg_queue.put(("text", chat_id, text, 0)) # retry_count = 0
+ try:
+ self.msg_queue.put(("text", chat_id, clean_text, 0), block=False)
+ except queue.Full:
+ logger.error(f"Message queue full for chat_id {chat_id}, message discarded: {clean_text[:50]}...")
def send_location(self, latitude, longitude):
+ try:
+ # Use validated float values, not original strings
+ lat = float(latitude)
+ lon = float(longitude)
+ # check Telegram API Limits
+ if not (-90 <= lat <= 90) or not (-180 <= lon <= 180):
+ logger.error(f"Invalid coordinates: lat={lat}, lon={lon} (out of range)")
+ return
+ except (TypeError, ValueError) as e:
+ logger.error(f"Invalid coordinate format: {e}, location skipped")
+ return
+
for chat_id in self.chat_ids:
- self.msg_queue.put(("location", chat_id, {"latitude": latitude, "longitude": longitude}, 0))
+ try:
+ self.msg_queue.put(("location", chat_id, {"latitude": lat, "longitude": lon}, 0), block=False)
+ except queue.Full:
+ logger.error(f"Location queue full for chat_id {chat_id}, location discarded: {lat}, {lon}")
+
+ @staticmethod
+ def _escape_text(text, parse_mode):
+ if not text:
+ return ""
+ if parse_mode == "HTML":
+ protected_tags = {}
+ tag_pattern = r'<(/?)(\w+)>'
+ allowed = ["b", "strong", "i", "em", "code", "pre", "u", "s"]
+
+ def protect_tag(match):
+ tag_full = match.group(0)
+ tag_name = match.group(2)
+ if tag_name in allowed:
+ placeholder = f"__TAG_{len(protected_tags)}__"
+ protected_tags[placeholder] = tag_full
+ return placeholder
+ return tag_full
+
+ text = re.sub(tag_pattern, protect_tag, text)
+ text = text.replace("&", "&").replace("<", "<").replace(">", ">")
+ for placeholder, original_tag in protected_tags.items():
+ text = text.replace(placeholder, original_tag)
+ elif parse_mode == "MarkdownV2":
+ # Escape all MarkdownV2 special characters (Telegram API requirement)
+ # See: https://core.telegram.org/bots/api#markdownv2-style
+ escape_chars = r'_*[]()~`>#+-=|{}.!'
+ text = "".join(['\\' + char if char in escape_chars else char for char in text])
+ return text[:4090] + "[...]" if len(text) > 4096 else text
def _worker_loop(self):
delay = self.initial_delay
-
- while True:
+ while not self._stop_event.is_set():
try:
- msg_type, chat_id, content, retry_count = self.msg_queue.get()
-
- success, permanent_failure, custom_delay = self._send_to_telegram(msg_type, chat_id, content)
-
+ msg_type, chat_id, content, retry_count = self.msg_queue.get(timeout=1)
+ success, permanent_failure, custom_delay, error_details = self._send_to_telegram(msg_type, chat_id, content)
if success:
delay = self.initial_delay
-
- elif permanent_failure:
- logger.error("Permanenter Fehler – Nachricht wird verworfen.")
-
- elif retry_count >= self.max_retries:
- logger.error("Maximale Wiederholungsanzahl erreicht – Nachricht wird verworfen.")
-
+ elif permanent_failure or retry_count >= self.max_retries:
+ logger.warning(f"Discarding message for {chat_id}: {error_details}")
else:
- logger.warning(f"Erneutes Einreihen der Nachricht (Versuch {retry_count + 1}).")
- self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
-
- # use the Telegram-provided value (retry_after) if available
wait_time = custom_delay if custom_delay is not None else delay
time.sleep(wait_time)
-
- # increase delay for the next attempt (exponential backoff)
+ self.msg_queue.put((msg_type, chat_id, content, retry_count + 1))
delay = min(delay * 2, self.max_delay)
-
+ except queue.Empty:
+ continue
except Exception as e:
- logger.exception(f"Fehler im Telegram-Worker: {e}")
+ logger.error(f"Error in Telegram worker: {e}")
time.sleep(5)
def _send_to_telegram(self, msg_type, chat_id, content):
+ url = f"https://api.telegram.org/bot{self.bot_token}/"
+ url += "sendMessage" if msg_type == "text" else "sendLocation"
+ payload = {'chat_id': chat_id}
+
if msg_type == "text":
- url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
- payload = {
- 'chat_id': chat_id,
- 'text': content,
- }
+ payload['text'] = content
if self.parse_mode:
payload['parse_mode'] = self.parse_mode
- elif msg_type == "location":
- url = f"https://api.telegram.org/bot{self.bot_token}/sendLocation"
- payload = {
- 'chat_id': chat_id,
- **content
- }
else:
- logger.error("Unbekannter Nachrichtentyp.")
- return False, True, None # unknown message type = permanent failure
+ payload.update(content)
try:
- custom_delay = None # standardvalue for return, except in case of 429
-
response = requests.post(url, data=payload, timeout=10)
+ if response.status_code == 200:
+ logger.info(f"Successfully sent to Chat-ID {chat_id}")
+ return True, False, None, None
+ # Rate limiting
if response.status_code == 429:
- custom_delay = response.json().get("parameters", {}).get("retry_after", 5)
- logger.warning(f"Rate Limit erreicht – warte {custom_delay} Sekunden.")
- return False, False, custom_delay # Telegram gives exact wait time
-
- if response.status_code == 400:
- logger.error("Ungültige Parameter – Nachricht wird nicht erneut gesendet.")
- return False, True, custom_delay # permanent failure
-
- if response.status_code == 401:
- logger.critical("Ungültiger Bot-Token – bitte prüfen!")
- return False, True, custom_delay # permanent failure
-
- response.raise_for_status()
- logger.info(f"Erfolgreich gesendet an Chat-ID {chat_id}")
- return True, False, custom_delay
-
- except requests.RequestException as e:
- logger.warning(f"Fehler beim Senden an Telegram (Chat-ID {chat_id}): {e}")
- return False, False, custom_delay
+ retry_after = response.json().get("parameters", {}).get("retry_after", 5)
+ logger.warning(f"Rate limited for {chat_id}, retry after {retry_after}s")
+ return False, False, retry_after, f"Rate Limit (retry after {retry_after}s)"
+
+ return False, response.status_code < 500, None, f"HTTP {response.status_code}"
+ except Exception as e:
+ return False, False, None, str(e)
+
+ def shutdown(self):
+ r"""Graceful shutdown with queue draining"""
+ logger.info("Shutting down Telegram sender...")
+ self._stop_event.set()
+ timeout = time.time() + 5
+ while not self.msg_queue.empty() and time.time() < timeout:
+ time.sleep(0.1)
+ remaining = self.msg_queue.qsize()
+ if remaining > 0:
+ logger.warning(f"{remaining} messages in queue discarded during shutdown")
+ self._worker.join(timeout=5)
# ===========================
# BoswatchPlugin-Class
# ===========================
+logging.debug("- %s loaded", __name__)
+
class BoswatchPlugin(PluginBase):
+ r"""!Description of the Plugin"""
def __init__(self, config):
r"""!Do not change anything here!"""
super().__init__(__name__, config) # you can access the config class on 'self.config'
+ def _ensure_sender(self):
+ r""" checking with hasattr if self.sender already exists"""
+ if not hasattr(self, 'sender') or self.sender is None:
+ token = self.config.get("botToken")
+ ids = self.config.get("chatIds", default=[])
+
+ if token and ids:
+ self.sender = TelegramSender(
+ bot_token=token,
+ chat_ids=ids,
+ max_retries=self.config.get("max_retries"),
+ initial_delay=self.config.get("initial_delay"),
+ max_delay=self.config.get("max_delay"),
+ parse_mode=self.config.get("parse_mode")
+ )
+ logger.debug("TelegramSender initialized via Self-Healing")
+ else:
+ missing = []
+ if not token:
+ missing.append("botToken")
+ if not ids:
+ missing.append("chatIds")
+ logger.error(f"Telegram configuration incomplete! Missing: {', '.join(missing)}. Plugin will not send messages.")
+
+ def _has_sender(self):
+ r"""Check if sender is available and properly initialized"""
+ return hasattr(self, 'sender') and self.sender is not None
+
def onLoad(self):
r"""!Called by import of the plugin"""
- bot_token = self.config.get("botToken")
- chat_ids = self.config.get("chatIds", default=[])
-
- if not bot_token or not chat_ids:
- logger.error("botToken oder chatIds fehlen in der Konfiguration!")
- return
-
- # configurable parameters with fallback defaults
- max_retries = self.config.get("max_retries")
- initial_delay = self.config.get("initial_delay")
- max_delay = self.config.get("max_delay")
- parse_mode = self.config.get("parse_mode")
-
- self.sender = TelegramSender(
- bot_token=bot_token,
- chat_ids=chat_ids,
- max_retries=max_retries,
- initial_delay=initial_delay,
- max_delay=max_delay,
- parse_mode=parse_mode
- )
-
- startup_message = self.config.get("startup_message")
- if startup_message and startup_message.strip():
- self.sender.send_message(startup_message)
+ self._ensure_sender()
+ if not self._has_sender():
+ logger.warning("Telegram plugin loaded but not configured. Messages will be discarded.")
+ else:
+ startup = self.config.get("startup_message")
+ if startup and startup.strip():
+ self.sender.send_message(startup)
def setup(self):
- r"""!Called before alarm
- Remove if not implemented"""
- pass
+ r"""!Called before alarm"""
+ # ensure sender exists before alarms are processed
+ self._ensure_sender()
def fms(self, bwPacket):
- r"""!Called on FMS alarm
- @param bwPacket: bwPacket instance"""
- msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
- self.sender.send_message(msg)
+ r"""!Called on FMS alarm"""
+ if self._has_sender():
+ msg = self.parseWildcards(self.config.get("message_fms", default="{FMS}"))
+ self.sender.send_message(msg)
def pocsag(self, bwPacket):
- r"""!Called on POCSAG alarm
- @param bwPacket: bwPacket instance"""
- msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
- self.sender.send_message(msg)
+ r"""!Called on POCSAG alarm"""
+ if self._has_sender():
+ msg = self.parseWildcards(self.config.get("message_pocsag", default="{RIC}({SRIC})\n{MSG}"))
+ self.sender.send_message(msg)
- if bwPacket.get("lat") is not None and bwPacket.get("lon") is not None:
- lat, lon = bwPacket.get("lat"), bwPacket.get("lon")
- logger.debug("Koordinaten gefunden – sende Standort.")
- self.sender.send_location(lat, lon)
+ if self.config.get("coordinates", default=False):
+ lat = bwPacket.get("lat")
+ lon = bwPacket.get("lon")
+ if lat is not None and lon is not None:
+ self.sender.send_location(lat, lon)
def zvei(self, bwPacket):
- r"""!Called on ZVEI alarm
- @param bwPacket: bwPacket instance"""
- msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
- self.sender.send_message(msg)
+ r"""!Called on ZVEI alarm"""
+ if self._has_sender():
+ msg = self.parseWildcards(self.config.get("message_zvei", default="{TONE}"))
+ self.sender.send_message(msg)
def msg(self, bwPacket):
- r"""!Called on MSG packet
- @param bwPacket: bwPacket instance"""
- msg = self.parseWildcards(self.config.get("message_msg"))
- self.sender.send_message(msg)
+ r"""!Called on MSG packet"""
+ if self._has_sender():
+ msg = self.parseWildcards(self.config.get("message_msg", default="{MSG}"))
+ self.sender.send_message(msg)
def teardown(self):
- r"""!Called after alarm
- Remove if not implemented"""
+ r"""!Called after alarm"""
pass
def onUnload(self):
- r"""!Called by destruction of the plugin
- Remove if not implemented"""
- pass
+ r"""!Called by destruction of the plugin"""
+ if self._has_sender():
+ self.sender.shutdown()
+ logger.info("Telegram plugin unloaded")