import os import sqlite3 import telebot from telebot import types from datetime import datetime, timedelta from dotenv import load_dotenv import uuid import textwrap # Load environment variables from .env file load_dotenv() # Telegram bot token BOT_TOKEN = os.getenv("BOT_TOKEN") TELEGRAM_ADMIN_ID = os.getenv("TELEGRAM_ADMIN_ID") # # Error handling # class ExceptionHandler(telebot.ExceptionHandler): def handle(self, exception): print(exception) bot = telebot.TeleBot(BOT_TOKEN, parse_mode="MARKDOWN") bot.send_message(TELEGRAM_ADMIN_ID, "Il BOT è crashato") bot.send_message(TELEGRAM_ADMIN_ID, f"`{exception}`") return True # Initialize the bot bot = telebot.TeleBot( BOT_TOKEN, parse_mode="MARKDOWN", exception_handler=ExceptionHandler(), ) # SQLite database setup conn = sqlite3.connect("cibo-aula-stud.local.db", check_same_thread=False) conn.execute("PRAGMA foreign_keys = 1") cursor = conn.cursor() cursor.execute( """CREATE TABLE IF NOT EXISTS proposte ( id TEXT PRIMARY KEY, owner_id TEXT NOT NULL, name TEXT NOT NULL, description TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, expiration_datetime TIMESTAMP DEFAULT NULL, FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id) )""" ) conn.commit() cursor.execute( """CREATE TABLE IF NOT EXISTS ordinazioni ( id TEXT PRIMARY KEY, owner_id TEXT NOT NULL, proposta_id TEXT NOT NULL, content TEXT NOT NULL, FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id), FOREIGN KEY (proposta_id) REFERENCES proposte (id) )""" ) conn.commit() cursor.execute( """ CREATE TRIGGER IF NOT EXISTS cleanup_ordinazioni AFTER DELETE ON proposte BEGIN DELETE FROM ordinazioni WHERE proposta_id = old.id; END """ ) conn.commit() cursor.execute( """CREATE TABLE IF NOT EXISTS utenti ( telegram_id TEXT PRIMARY KEY, fullname TEXT NOT NULL, notification BOOLEAN NOT NULL DEFAULT FALSE )""" ) conn.commit() # # Delete old proposte # import threading def every_hour(): threading.Timer(60 * 60, every_hour).start() every_hour() # # Bot Commands # @bot.message_handler(commands=["start"]) def handle_start(message): bot.send_message( message.chat.id, "Benvenuto!", ) cursor.execute( """ INSERT OR IGNORE INTO utenti(telegram_id, fullname) VALUES (?, ?) """, (str(message.from_user.id), message.from_user.username), ) conn.commit() @bot.message_handler(commands=["status"]) def handle_status(message): owner_id = str(message.from_user.id) print(owner_id) ordinazioni_per_tue_proposte = list( cursor.execute( """ SELECT * FROM proposte p INNER JOIN ordinazioni o ON p.id = o.proposta_id INNER JOIN utenti u ON o.owner_id = u.telegram_id WHERE p.owner_id = ? """, (owner_id,), ) ) nl = "\n" bot.send_message(message.chat.id, f"*Ordinazioni alle tue proposte*") if len(ordinazioni_per_tue_proposte) == 0: bot.send_message(message.chat.id, f"Nessuno ha ancora ordinato qualcosa") else: for o in ordinazioni_per_tue_proposte: (proposta_id, _, name, description, _, _, _, _, _, ordinazione, _, username, _) = o bot.send_message(message.chat.id, f"- Ordine di @{username} per _{name}_{nl}{ordinazione}") tue_ordinazione = list( cursor.execute( """ SELECT * FROM ordinazioni o INNER JOIN proposte p ON o.proposta_id = p.id INNER JOIN utenti u ON p.owner_id = u.telegram_id WHERE o.owner_id = ? """, (owner_id,), ) ) nl = "\n" bot.send_message(message.chat.id, f"*Le tue ordinazioni*") if len(tue_ordinazione) == 0: bot.send_message(message.chat.id, f"Non ha ancora ordinato nulla") else: for o in tue_ordinazione: (_, _, _, ordinazione, _, _, name, description, _, _, _, username, _) = o bot.send_message(message.chat.id, f"- Ordine per _{name}_ di @{username}{nl}{ordinazione}") def conversazione(func): """ Questa annotazione va applicata ad una conversazione a più step e permette di usare message = yield per mettere in pausa il codice e riprendere quando l'utente invia un messaggio di risposta. """ def inner(message): conv = func(message) registro_conversazioni[message.chat.id] = conv next(conv) return inner @bot.message_handler(commands=["stop"]) def handle_stop(message): if message.chat.id in registro_conversazioni: del registro_conversazioni[message.chat.id] else: bot.send_message(message.chat.id, "Boh in realtà stop non serve, non stavi facendo niente") @bot.message_handler(commands=["nuova_proposta"]) @conversazione def handle_nuova_proposta(message): """ Chiede all'utente nome e descrizione della proposta e poi la aggiunge al database """ bot.send_message( message.chat.id, "Inserisci nome e descrizione dell'ordine (sulla prima riga il nome ed il resto dopo un accapo sarà la descrizione, invia /stop per uscire)", ) message = yield parts = message.text.split("\n", 1) proposta_id = str(uuid.uuid4()) owner_id = str(message.from_user.id) name = parts[0] description = parts[1] if len(parts) > 1 else "" testo = "" testo += f"*Proposta aggiunta con successo*" + "\n" testo += f"Nome: {name}" + "\n" testo += f"Descrizione: {description}" + "\n" bot.send_message(message.chat.id, testo) cursor.execute( """ INSERT INTO proposte(id, owner_id, name, description) VALUES (?, ?, ?, ?) """, (proposta_id, owner_id, name, description), ) conn.commit() # Notifichiamo della proposta gli utenti che hanno le notifiche attivate users_to_notify = list( cursor.execute( """ SELECT * FROM utenti WHERE notification = TRUE """, ) ) for utente in users_to_notify: (user_id, _, _) = utente if user_id == owner_id: continue testo = "" testo += f"*Nuova proposta* da @{message.from_user.username}" + "\n" testo += f"Nome: {name}" + "\n" testo += f"Descrizione: {description}" + "\n" bot.send_message(user_id, testo) @bot.message_handler(commands=["proposte"]) def handle_proposte(message): owner_id = str(message.from_user.id) proposte = list( cursor.execute( """ SELECT * FROM proposte ORDER BY created_at ASC """ ) ) bot.send_message(message.chat.id, "*Lista delle tue proposte*") testo = "" for i, p in enumerate(proposte): (uuid, _, name, description, _, expiration_time) = p testo += f"- *{name}*" + "\n" if len(description.strip()) > 0: testo += f"{description}" + "\n" if testo == "": testo = "Non hai ancora creato nessuna proposta" bot.send_message(message.chat.id, testo) @bot.message_handler(commands=["nuova_ordinazione"]) @conversazione def handle_nuova_ordinazione(message): owner_id = str(message.from_user.id) proposte = list( cursor.execute( """ SELECT * FROM proposte ORDER BY created_at ASC """, ) ) indicizzazione = {} testo_proposte = "Lista delle proposte:\n" for i, p in enumerate(proposte): (proposta_id, _, name, description, _, expiration_time) = p indicizzazione[str(i + 1)] = proposta_id testo_proposte += f"{i + 1}. *{name}*:" + "\n" testo_proposte += f"{description}" + "\n" bot.send_message(message.chat.id, testo_proposte) bot.send_message(message.chat.id, "Invia il numero della proposta a cui vuoi aggiungere un'ordinazione") while True: message = yield indice = message.text.strip() if indice in indicizzazione: break indici = ", ".join(str(k) for k in indicizzazione.keys()) bot.send_message(message.chat.id, f"Numero non valido, inviane uno tra {indici}") proposta_id = indicizzazione[indice] bot.send_message(message.chat.id, f"Ok, ora dì cosa vuoi ordinare") message = yield ordinazione_id = str(uuid.uuid4()) content = message.text.strip() cursor.execute( """ INSERT INTO ordinazioni(id, owner_id, proposta_id, content) VALUES (?, ?, ?, ?) """, (ordinazione_id, str(message.from_user.id), proposta_id, content), ) conn.commit() # Notifica il creatore dell'ordine ed il corrispondente creatore della proposta del nuovo ordine bot.send_message(message.chat.id, f"Ok, ordinazione aggiunta") (proposta_owner_id, proposta_name) = cursor.execute( """ SELECT owner_id, name from proposte WHERE id = ? """, (proposta_id,), ).fetchone() testo_proposte = "" testo_proposte = f"@{message.from_user.username} ha aggiunto un'ordinazione a _{proposta_name}_" + "\n" bot.send_message(proposta_owner_id, testo_proposte) return @bot.message_handler(commands=["attiva_notifiche"]) def handle_attiva_notifiche(message): owner_id = str(message.from_user.id) cursor.execute( """ UPDATE utenti SET notification = TRUE WHERE telegram_id = ? """, (owner_id,), ) conn.commit() bot.send_message(message.chat.id, "Notifiche attivate") @bot.message_handler(commands=["disattiva_notifiche"]) def handle_disattiva_notifiche(message): owner_id = str(message.from_user.id) cursor.execute( """ UPDATE utenti SET notification = FALSE WHERE telegram_id = ? """, (owner_id,), ) conn.commit() bot.send_message(message.chat.id, "Notifiche disattivate") # Dizionario da chat a generatore per quella conversazione registro_conversazioni = dict() @bot.message_handler() def handle_conversazione(message): print(f"handle_conversazione: {message.text}") if message.chat.id in registro_conversazioni: conv = registro_conversazioni[message.chat.id] try: result = conv.send(message) print("Generator:", result) except StopIteration: del registro_conversazioni[message.chat.id] return bot.send_message(message.chat.id, "Non hai cominciato una conversazione! Usa uno dei comandi") print("Starting the bot...") # Start the bot bot.infinity_polling()