Tria.ge-Crawler-WIP/client/telegram_notification.py
2025-04-12 01:40:13 -05:00

31 lines
1 KiB
Python

import os
from asyncio import sleep
class TelegramNotificationClient:
    """Best-effort sender of Telegram bot messages.

    Credentials are read from the ``TELEGRAM_BOT_TOKEN`` and
    ``TELEGRAM_CHAT_ID`` environment variables when the client is created.
    If either one is missing or empty, sending is a silent no-op.
    """

    def __init__(self):
        """Load the bot token and target chat id from the environment."""
        self.token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
        self.chat_id = os.environ.get("TELEGRAM_CHAT_ID", "")

    async def send_bot_message(self, message: str) -> None:
        """Post *message* to the configured chat via the ``sendMessage`` endpoint.

        The message is sent with ``parse_mode`` set to ``"Markdown"``.
        Failures never propagate to the caller: a missing ``httpx`` package,
        a transport error, or an HTTP error status is reported on stdout and
        the coroutine returns normally.

        Args:
            message: Text of the notification to send.
        """
        # Notifications are optional: without credentials there is nothing to do.
        if not self.token or not self.chat_id:
            return
        try:
            # Imported lazily so the module stays importable without httpx.
            import httpx

            url = f"https://api.telegram.org/bot{self.token}/sendMessage"
            data = {
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": "Markdown",
            }
            async with httpx.AsyncClient() as client:
                response = await client.post(url, json=data)
                # An HTTP error status does not raise on its own, so a
                # rejected message would otherwise be dropped unnoticed.
                if response.status_code >= 400:
                    print(
                        "Failed to send Telegram notification: "
                        f"HTTP {response.status_code}: {response.text}"
                    )
        except Exception as e:
            # The request URL embeds the bot token; keep it out of the output
            # in case the exception text quotes the URL.
            detail = str(e).replace(self.token, "<redacted>")
            print(f"Failed to send Telegram notification: {detail}")
async def tg_bot_polling():
    """Keep the Telegram bot task alive by sleeping forever.

    This is a placeholder: it never inspects incoming messages and never
    returns on its own; it only yields to the event loop between sleeps.
    """
    idle_seconds = 60  # pause between iterations; a real bot would poll for updates here
    while True:
        await sleep(idle_seconds)