31 lines
1 KiB
Python
31 lines
1 KiB
Python
import os
|
|
from asyncio import sleep
|
|
|
|
class TelegramNotificationClient:
    """Best-effort sender of notifications to a Telegram chat via the Bot API.

    Credentials are read from the ``TELEGRAM_BOT_TOKEN`` and
    ``TELEGRAM_CHAT_ID`` environment variables when the client is
    constructed. If either one is missing or empty, sending is a silent
    no-op.
    """

    def __init__(self) -> None:
        """Load the bot token and the target chat id from the environment."""
        self.token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
        self.chat_id = os.environ.get("TELEGRAM_CHAT_ID", "")

    async def send_bot_message(self, message: str) -> None:
        """Post ``message`` to the configured chat using ``sendMessage``.

        The text is sent with ``parse_mode`` set to ``"Markdown"``. This
        method never raises: any failure (missing ``httpx``, a transport
        error, or an HTTP error status in the reply) is printed and
        swallowed so that a notification problem cannot break the caller.

        Args:
            message: Text of the notification to send.
        """
        # Not configured: notifications are optional, so do nothing.
        if not self.token or not self.chat_id:
            return

        try:
            # Imported lazily so a missing httpx is reported by the handler
            # below instead of failing at module import time.
            import httpx

            url = f"https://api.telegram.org/bot{self.token}/sendMessage"
            data = {
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": "Markdown",
            }

            async with httpx.AsyncClient() as client:
                response = await client.post(url, json=data)

                # An HTTP error reply does not raise on its own, so check it
                # explicitly. raise_for_status() is avoided on purpose: its
                # message contains the request URL, which embeds the token.
                if response.status_code >= 400:
                    print(
                        "Failed to send Telegram notification: "
                        f"HTTP {response.status_code}: {response.text}"
                    )
        except Exception as e:
            print(f"Failed to send Telegram notification: {e}")
|
|
|
|
async def tg_bot_polling():
    """Keep the Telegram bot task alive by idling forever.

    This is a placeholder loop: it only sleeps, whereas a real bot would
    check for incoming messages here. It never returns on its own.
    """
    idle_seconds = 60
    while True:
        # Nothing to poll yet; yield to the event loop for a minute.
        await sleep(idle_seconds)
|