primo prototipo di server e bot insieme

sclero
Antonio De Lucreziis 10 months ago
parent f97307527b
commit 25300f4421

4
.gitignore vendored

@ -4,6 +4,8 @@
# Python
env/
venv/
__pycache__/
# Editors
.vscode/
.vscode/

@ -1,54 +1,72 @@
import os
import asyncio
from dotenv import load_dotenv
from telebot import types
from telebot.async_telebot import AsyncTeleBot
load_dotenv()
BOT_TOKEN = os.getenv("BOT_TOKEN")
import config
bot = AsyncTeleBot(config.BOT_TOKEN)
bot = AsyncTeleBot(BOT_TOKEN)
def is_mathematician(user_id) -> bool:
return True
# Handle '/start' and '/help'
@bot.message_handler(commands=['help', 'start'])
@bot.message_handler(commands=["help", "start"])
async def send_welcome(message):
markup = types.InlineKeyboardMarkup()
btn = types.InlineKeyboardButton('Autenticati', callback_data='authenticate')
btn = types.InlineKeyboardButton("Autenticati", callback_data="authenticate")
markup.add(btn)
await bot.reply_to(message, """\
await bot.reply_to(
message,
"""\
Benvenuto nel bot di ingresso per i gruppi telegram del dipartimento di matematica dell'università di Pisa! Per poter accedere, fai l'autenticazione con il tuo account di ateneo tramite Google. Clicca il pulsante qui sotto!\
""", reply_markup=markup)
buttons = { # {name of the button: courses}
'Primo Anno 📚': 'Analisi 1 🧮: http://example.com/matematica\n Fisica 🧲: http://example.com/fisica\n Informatica 💻: http://example.com/informatica',
'Secondo Anno 📚': 'Algebra 1 : http://example.com/algebra\n Analisi 2 🧮: http://example.com/analisi2\n Programmazione 💻: http://example.com/programmazione',
'Terzo Anno 📚': 'Meccanica Razionale 🧲: http://example.com/meccraz\n Calcolo Scientifico 🧮: http://example.com/cs\n LPL 💻: http://example.com/lpl',
'Magistrale 🎓': 'Intelligenza Artificiale 🤖: http://example.com/intelligenzaartificiale\n Data Science 📊: http://example.com/datascience\n Cyber Security 🔒: http://example.com/cybersecurity',
'Miscellanea 📝': 'Spam 📬: http://example.com/spam\n Eventi 🎉: http://example.com/eventi\n• PHC 🏥: http://example.com/phc',
""",
reply_markup=markup,
)
buttons = { # {name of the button: courses}
"Primo Anno 📚": "Analisi 1 🧮: http://example.com/matematica\n Fisica 🧲: http://example.com/fisica\n Informatica 💻: http://example.com/informatica",
"Secondo Anno 📚": "Algebra 1 : http://example.com/algebra\n Analisi 2 🧮: http://example.com/analisi2\n Programmazione 💻: http://example.com/programmazione",
"Terzo Anno 📚": "Meccanica Razionale 🧲: http://example.com/meccraz\n Calcolo Scientifico 🧮: http://example.com/cs\n LPL 💻: http://example.com/lpl",
"Magistrale 🎓": "Intelligenza Artificiale 🤖: http://example.com/intelligenzaartificiale\n Data Science 📊: http://example.com/datascience\n Cyber Security 🔒: http://example.com/cybersecurity",
"Miscellanea 📝": "Spam 📬: http://example.com/spam\n Eventi 🎉: http://example.com/eventi\n• PHC 🏥: http://example.com/phc",
}
# Handle button click
@bot.callback_query_handler(func=lambda call: call.data == 'authenticate')
@bot.callback_query_handler(func=lambda call: call.data == "authenticate")
async def handle_authenticate(call):
await bot.answer_callback_query(call.id, text="Autenticazione completata") # Avoids to leave the button pressed and sends a message on screen (not in chat)
await bot.answer_callback_query(
call.id, text="Autenticazione completata"
) # Avoids to leave the button pressed and sends a message on screen (not in chat)
if is_mathematician(call.from_user.id):
markup = types.InlineKeyboardMarkup()
for button in buttons:
markup.add(types.InlineKeyboardButton(button, callback_data=button))
await bot.send_message(call.message.chat.id, 'Autenticazione riuscita! Adesso puoi accedere a tutti i gruppi disponibili presenti', reply_markup=markup)
await bot.send_message(
call.message.chat.id,
"Autenticazione riuscita! Adesso puoi accedere a tutti i gruppi disponibili presenti",
reply_markup=markup,
)
else:
await bot.send_message(call.message.chat.id, 'Sembra che tu non sia iscritto al corso di Laurea in matematica. Se pensi che sia un errore o desideri essere aggiunto, manda una mail a macchinisti@phc.dm.unipi.it. Altrimenti puoi chiedere di farti aggiungere manualmente ai gruppi a cui sei interessato da altri studenti di matematica che sono già iscritti.')
await bot.send_message(
call.message.chat.id,
"Sembra che tu non sia iscritto al corso di Laurea in matematica. Se pensi che sia un errore o desideri essere aggiunto, manda una mail a macchinisti@phc.dm.unipi.it. Altrimenti puoi chiedere di farti aggiungere manualmente ai gruppi a cui sei interessato da altri studenti di matematica che sono già iscritti.",
)
# Handle button click
@bot.callback_query_handler(func=lambda call: call.data in buttons)
async def handle_buttons(call):
await bot.answer_callback_query(call.id) # Avoids to leave the button pressed, no message is sent on screen
await bot.send_message(call.message.chat.id, buttons[call.data])
asyncio.run(bot.polling())
async def polling():
print("Starting telegram bot polling...")
return await bot.polling()

@ -0,0 +1,19 @@
from dotenv import load_dotenv
import os
load_dotenv()
BOT_TOKEN = os.getenv("BOT_TOKEN")
OAUTH_CLIENT_ID = os.getenv("OAUTH_CLIENT_ID")
OAUTH_CLIENT_SECRET = os.getenv("OAUTH_CLIENT_SECRET")
OAUTH_AUTH_URL = os.getenv("OAUTH_AUTH_URL")
OAUTH_TOKEN_HOST = os.getenv("OAUTH_TOKEN_HOST")
OAUTH_TOKEN_PATH = os.getenv("OAUTH_TOKEN_PATH")
OAUTH_REDIRECT_URL = os.getenv("OAUTH_REDIRECT_URL")
OAUTH_USER_INFO_URL = os.getenv("OAUTH_USER_INFO_URL")
OAUTH_SCOPES = os.getenv("OAUTH_SCOPES")
DATABASE_URL = os.getenv("DATABASE_URL")

Binary file not shown.

@ -0,0 +1,76 @@
import secrets
import sqlite3
conn = sqlite3.connect("database.db")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
telegram_id INTEGER PRIMARY KEY,
oauth_id TEXT,
UNIQUE (telegram_id, oauth_id)
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS tokens (
telegram_id INTEGER,
token TEXT,
FOREIGN KEY (telegram_id) REFERENCES users (telegram_id),
UNIQUE (telegram_id, token)
)
"""
)
conn.commit()
def create_user(telegram_id, oauth_id):
# insert the user into the database
conn.execute(
"""
INSERT INTO users (telegram_id, oauth_id) VALUES (?, ?)
""",
(telegram_id, oauth_id),
)
conn.commit()
def create_token(telegram_id):
# delete any existing tokens for this user
conn.execute(
"""
DELETE FROM tokens WHERE telegram_id = ?
""",
(telegram_id,),
)
# generate a new random token using crypt
token = secrets.token_hex(16)
# insert the token into the database
conn.execute(
"""
INSERT INTO tokens (telegram_id, token) VALUES (?, ?)
""",
(telegram_id, token),
)
conn.commit()
return token
def get_telegram_id(token):
# get the telegram id from the database
cursor = conn.execute(
"""
SELECT telegram_id FROM tokens WHERE token = ?
""",
(token,),
)
# fetch the result
result = cursor.fetchone()
# return the telegram id
return result[0] if result else None

@ -0,0 +1,19 @@
import asyncio
import config
import bot
import server
import signal
import sys
async def main():
await asyncio.gather(
bot.polling(),
server.start(),
)
if __name__ == "__main__":
asyncio.run(main())

@ -0,0 +1,2 @@
[tool.black]
line-length = 100

@ -1,2 +1,34 @@
aiohttp==3.9.1
aiosignal==1.3.1
annotated-types==0.6.0
anyio==4.2.0
attrs==23.2.0
black==23.12.1
certifi==2023.11.17
charset-normalizer==3.3.2
click==8.1.7
fastapi==0.109.0
frozenlist==1.4.1
h11==0.14.0
httptools==0.6.1
idna==3.6
multidict==6.0.4
mypy-extensions==1.0.0
packaging==23.2
pathspec==0.12.1
platformdirs==4.1.0
pydantic==2.5.3
pydantic_core==2.14.6
pyTelegramBotAPI==4.15.2
python-dotenv==1.0.0
PyYAML==6.0.1
requests==2.31.0
sniffio==1.3.0
starlette==0.35.1
typing_extensions==4.9.0
urllib3==2.1.0
uvicorn==0.26.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0
yarl==1.9.4

@ -0,0 +1,66 @@
from typing import Union
import uvicorn
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
import config
import database
app = FastAPI()
@app.get("/login")
async def login(token: str):
return RedirectResponse(
f"{config.OAUTH_AUTH_URL}?client_id={config.OAUTH_CLIENT_ID}&redirect_uri={config.OAUTH_REDIRECT_URL}&response_type=code&scope=email&state={token}"
)
@app.get("/callback")
async def oauth_callback(code: str, state: str):
# get the telegram id from the database using the state token
telegram_id = database.get_telegram_id(state)
# do a user info request to get the email address
import requests
response = requests.post(
config.OAUTH_TOKEN_HOST + config.OAUTH_TOKEN_PATH,
data={
"code": code,
"client_id": config.OAUTH_CLIENT_ID,
"client_secret": config.OAUTH_CLIENT_SECRET,
"redirect_uri": config.OAUTH_REDIRECT_URL,
"grant_type": "authorization_code",
},
)
print(response.json())
access_token = response.json()["access_token"]
response = requests.get(
config.OAUTH_USER_INFO_URL,
headers={"Authorization": f"Bearer {access_token}"},
)
print(response.json())
email = response.json()["email"]
# store the username in the database, split at "@" and take the first part
username = email.split("@")[0]
telegram_id = database.get_telegram_id(state)
database.create_user(telegram_id, username)
# redirect to the bot
return RedirectResponse(f"https://t.me/MyBot", status_code=303)
async def start():
config = uvicorn.Config(app=app, port=8000, log_level="info", loop="asyncio")
server = uvicorn.Server(config)
await server.serve()
Loading…
Cancel
Save