-
Notifications
You must be signed in to change notification settings - Fork 441
/
main.py
45 lines (36 loc) · 1.55 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import hmac
import os

from dotenv import load_dotenv
from fastapi import FastAPI, Header, HTTPException, Depends
from pydantic import BaseModel
from telegram import Update, Bot
class TelegramUpdate(BaseModel):
    """Request-body schema for the ``/webhook/`` endpoint.

    Only the two fields below are declared; the webhook handler reads
    ``chat.id``, ``text`` and ``message_id`` out of ``message``.

    NOTE(review): both fields are required, so a payload without a
    ``message`` key is rejected by validation before the handler runs.
    Telegram can presumably deliver such updates (e.g. edited messages or
    callback queries) -- confirm whether they need to be accepted.
    """

    # Integer ``update_id`` field of the incoming payload.
    update_id: int
    # Message payload kept as a plain, unvalidated dict.
    message: dict
# Merge settings from a local .env file (when one exists) into the environment.
load_dotenv()

# Both values come from the environment (or .env); each is None when unset.
bot_token = os.environ.get('BOT_TOKEN')
secret_token = os.environ.get("SECRET_TOKEN")

# Web application that serves the webhook route, and the Telegram client
# used to send replies.
app = FastAPI()
bot = Bot(token=bot_token)

# Webhook registration is intentionally left disabled here; the lines below
# are kept as a reference for registering the webhook URL by hand.
# webhook_url = os.getenv('CYCLIC_URL', 'http://localhost:8181') + "/webhook/"
# bot.set_webhook(url=webhook_url)
# webhook_info = bot.get_webhook_info()
# print(webhook_info)
def auth_telegram_token(x_telegram_bot_api_secret_token: str = Header(None)) -> str:
    """Validate the ``X-Telegram-Bot-Api-Secret-Token`` request header.

    Used as a FastAPI dependency: the header value is compared with the
    module-level ``secret_token`` read from the ``SECRET_TOKEN`` environment
    variable.

    Args:
        x_telegram_bot_api_secret_token: Header value supplied by the caller,
            or ``None`` when the header is absent.

    Returns:
        The supplied header value, once it has been verified.

    Raises:
        HTTPException: 403 when the header is missing, when it does not match
            the configured secret, or when no secret is configured at all.
    """
    # For local development only: return early here to skip authentication.
    supplied = x_telegram_bot_api_secret_token

    # Fail closed. Without this guard an unset SECRET_TOKEN (None) would
    # compare equal to a missing header (None) and let every request through.
    if not secret_token or supplied is None:
        raise HTTPException(status_code=403, detail="Not authenticated")

    # Compare as bytes: compare_digest is constant-time, and encoding first
    # avoids the TypeError it raises for non-ASCII str arguments.
    if not hmac.compare_digest(supplied.encode("utf-8"), secret_token.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Not authenticated")

    return supplied
@app.post("/webhook/")
async def handle_webhook(
    update: TelegramUpdate,
    token: str = Depends(auth_telegram_token),
):
    """Handle one Telegram update delivered to the webhook.

    A ``/start`` message is answered with ``hello.gif`` followed by a welcome
    text; every other message gets a "Yo!" reply quoting the original.

    Args:
        update: Parsed request body.
        token: Result of the ``auth_telegram_token`` dependency. It is not
            used in the body; declaring it is what enforces authentication.

    Returns:
        ``{"ok": True}`` once the reply has been sent.
    """
    message = update.message
    chat_id = message["chat"]["id"]
    # Messages without text (photos, stickers, ...) have no "text" key.
    # Indexing directly raised KeyError and turned them into a 500 response;
    # .get() lets them fall through to the default reply instead.
    text = message.get("text")

    if text == "/start":
        # Path is relative to the process working directory.
        with open('hello.gif', 'rb') as photo:
            await bot.send_photo(chat_id=chat_id, photo=photo)
        await bot.send_message(
            chat_id=chat_id,
            text="Welcome to Cyclic Starter Python Telegram Bot!",
        )
    else:
        await bot.send_message(
            chat_id=chat_id,
            reply_to_message_id=message["message_id"],
            text="Yo!",
        )

    return {"ok": True}