You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

407 lines
10 KiB
Python

import os
import sqlite3
import telebot
from telebot import types
from datetime import datetime, timedelta
from dotenv import load_dotenv
import uuid
import textwrap
# Load environment variables from .env file
load_dotenv()
# Telegram bot token
BOT_TOKEN = os.getenv("BOT_TOKEN")
TELEGRAM_ADMIN_ID = os.getenv("TELEGRAM_ADMIN_ID")
#
# Error handling
#
class ExceptionHandler(telebot.ExceptionHandler):
def handle(self, exception):
print(exception)
bot = telebot.TeleBot(BOT_TOKEN, parse_mode="MARKDOWN")
bot.send_message(TELEGRAM_ADMIN_ID, "Il BOT è crashato")
bot.send_message(TELEGRAM_ADMIN_ID, f"`{exception}`")
return True
# Initialize the bot
bot = telebot.TeleBot(
BOT_TOKEN,
parse_mode="MARKDOWN",
exception_handler=ExceptionHandler(),
)
# SQLite database setup
conn = sqlite3.connect("cibo-aula-stud.local.db", check_same_thread=False)
conn.execute("PRAGMA foreign_keys = 1")
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS proposte (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expiration_datetime TIMESTAMP DEFAULT NULL,
FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id)
)"""
)
conn.commit()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS ordinazioni (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL,
proposta_id TEXT NOT NULL,
content TEXT NOT NULL,
FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id),
FOREIGN KEY (proposta_id) REFERENCES proposte (id)
)"""
)
conn.commit()
cursor.execute(
"""
CREATE TRIGGER IF NOT EXISTS cleanup_ordinazioni
AFTER DELETE ON proposte
BEGIN
DELETE FROM ordinazioni WHERE proposta_id = old.id;
END
"""
)
conn.commit()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS utenti (
telegram_id TEXT PRIMARY KEY,
fullname TEXT NOT NULL,
notification BOOLEAN NOT NULL DEFAULT FALSE
)"""
)
conn.commit()
#
# Delete old proposte
#
import threading
def every_hour():
threading.Timer(60 * 60, every_hour).start()
every_hour()
#
# Bot Commands
#
@bot.message_handler(commands=["start"])
def handle_start(message):
bot.send_message(
message.chat.id,
"Benvenuto!",
)
cursor.execute(
"""
INSERT OR IGNORE INTO utenti(telegram_id, fullname) VALUES (?, ?)
""",
(str(message.from_user.id), message.from_user.username),
)
conn.commit()
@bot.message_handler(commands=["status"])
def handle_status(message):
owner_id = str(message.from_user.id)
print(owner_id)
ordinazioni_per_tue_proposte = list(
cursor.execute(
"""
SELECT * FROM proposte p INNER JOIN ordinazioni o ON p.id = o.proposta_id INNER JOIN utenti u ON o.owner_id = u.telegram_id WHERE p.owner_id = ?
""",
(owner_id,),
)
)
nl = "\n"
bot.send_message(message.chat.id, f"*Ordinazioni alle tue proposte*")
if len(ordinazioni_per_tue_proposte) == 0:
bot.send_message(message.chat.id, f"Nessuno ha ancora ordinato qualcosa")
else:
for o in ordinazioni_per_tue_proposte:
(proposta_id, _, name, description, _, _, _, _, _, ordinazione, _, username, _) = o
bot.send_message(message.chat.id, f"- Ordine di @{username} per _{name}_{nl}{ordinazione}")
tue_ordinazione = list(
cursor.execute(
"""
SELECT * FROM ordinazioni o INNER JOIN proposte p ON o.proposta_id = p.id INNER JOIN utenti u ON p.owner_id = u.telegram_id WHERE o.owner_id = ?
""",
(owner_id,),
)
)
nl = "\n"
bot.send_message(message.chat.id, f"*Le tue ordinazioni*")
if len(tue_ordinazione) == 0:
bot.send_message(message.chat.id, f"Non ha ancora ordinato nulla")
else:
for o in tue_ordinazione:
(_, _, _, ordinazione, _, _, name, description, _, _, _, username, _) = o
bot.send_message(message.chat.id, f"- Ordine per _{name}_ di @{username}{nl}{ordinazione}")
def conversazione(func):
"""
Questa annotazione va applicata ad una conversazione a più step e permette di usare
message = yield
per mettere in pausa il codice e riprendere quando l'utente invia un messaggio di risposta.
"""
def inner(message):
conv = func(message)
registro_conversazioni[message.chat.id] = conv
next(conv)
return inner
@bot.message_handler(commands=["stop"])
def handle_stop(message):
if message.chat.id in registro_conversazioni:
del registro_conversazioni[message.chat.id]
else:
bot.send_message(message.chat.id, "Boh in realtà stop non serve, non stavi facendo niente")
@bot.message_handler(commands=["nuova_proposta"])
@conversazione
def handle_nuova_proposta(message):
"""
Chiede all'utente nome e descrizione della proposta e poi la aggiunge al database
"""
bot.send_message(
message.chat.id,
"Inserisci nome e descrizione dell'ordine (sulla prima riga il nome ed il resto dopo un accapo sarà la descrizione, invia /stop per uscire)",
)
message = yield
parts = message.text.split("\n", 1)
proposta_id = str(uuid.uuid4())
owner_id = str(message.from_user.id)
name = parts[0]
description = parts[1] if len(parts) > 1 else ""
testo = ""
testo += f"*Proposta aggiunta con successo*" + "\n"
testo += f"Nome: {name}" + "\n"
testo += f"Descrizione: {description}" + "\n"
bot.send_message(message.chat.id, testo)
cursor.execute(
"""
INSERT INTO proposte(id, owner_id, name, description) VALUES (?, ?, ?, ?)
""",
(proposta_id, owner_id, name, description),
)
conn.commit()
# Notifichiamo della proposta gli utenti che hanno le notifiche attivate
users_to_notify = list(
cursor.execute(
"""
SELECT * FROM utenti WHERE notification = TRUE
""",
)
)
for utente in users_to_notify:
(user_id, _, _) = utente
if user_id == owner_id:
continue
testo = ""
testo += f"*Nuova proposta* da @{message.from_user.username}" + "\n"
testo += f"Nome: {name}" + "\n"
testo += f"Descrizione: {description}" + "\n"
bot.send_message(user_id, testo)
@bot.message_handler(commands=["proposte"])
def handle_proposte(message):
owner_id = str(message.from_user.id)
proposte = list(
cursor.execute(
"""
SELECT * FROM proposte ORDER BY created_at ASC
"""
)
)
bot.send_message(message.chat.id, "*Lista delle tue proposte*")
testo = ""
for i, p in enumerate(proposte):
(uuid, _, name, description, _, expiration_time) = p
testo += f"- *{name}*" + "\n"
if len(description.strip()) > 0:
testo += f"{description}" + "\n"
if testo == "":
testo = "Non hai ancora creato nessuna proposta"
bot.send_message(message.chat.id, testo)
@bot.message_handler(commands=["nuova_ordinazione"])
@conversazione
def handle_nuova_ordinazione(message):
owner_id = str(message.from_user.id)
proposte = list(
cursor.execute(
"""
SELECT * FROM proposte ORDER BY created_at ASC
""",
)
)
indicizzazione = {}
testo_proposte = "Lista delle proposte:\n"
for i, p in enumerate(proposte):
(proposta_id, _, name, description, _, expiration_time) = p
indicizzazione[str(i + 1)] = proposta_id
testo_proposte += f"{i + 1}. *{name}*:" + "\n"
testo_proposte += f"{description}" + "\n"
bot.send_message(message.chat.id, testo_proposte)
bot.send_message(message.chat.id, "Invia il numero della proposta a cui vuoi aggiungere un'ordinazione")
while True:
message = yield
indice = message.text.strip()
if indice in indicizzazione:
break
indici = ", ".join(str(k) for k in indicizzazione.keys())
bot.send_message(message.chat.id, f"Numero non valido, inviane uno tra {indici}")
proposta_id = indicizzazione[indice]
bot.send_message(message.chat.id, f"Ok, ora dì cosa vuoi ordinare")
message = yield
ordinazione_id = str(uuid.uuid4())
content = message.text.strip()
cursor.execute(
"""
INSERT INTO ordinazioni(id, owner_id, proposta_id, content) VALUES (?, ?, ?, ?)
""",
(ordinazione_id, str(message.from_user.id), proposta_id, content),
)
conn.commit()
# Notifica il creatore dell'ordine ed il corrispondente creatore della proposta del nuovo ordine
bot.send_message(message.chat.id, f"Ok, ordinazione aggiunta")
(proposta_owner_id, proposta_name) = cursor.execute(
"""
SELECT owner_id, name from proposte WHERE id = ?
""",
(proposta_id,),
).fetchone()
testo_proposte = ""
testo_proposte = f"@{message.from_user.username} ha aggiunto un'ordinazione a _{proposta_name}_" + "\n"
bot.send_message(proposta_owner_id, testo_proposte)
return
@bot.message_handler(commands=["attiva_notifiche"])
def handle_attiva_notifiche(message):
owner_id = str(message.from_user.id)
cursor.execute(
"""
UPDATE utenti SET notification = TRUE WHERE telegram_id = ?
""",
(owner_id,),
)
conn.commit()
bot.send_message(message.chat.id, "Notifiche attivate")
@bot.message_handler(commands=["disattiva_notifiche"])
def handle_disattiva_notifiche(message):
owner_id = str(message.from_user.id)
cursor.execute(
"""
UPDATE utenti SET notification = FALSE WHERE telegram_id = ?
""",
(owner_id,),
)
conn.commit()
bot.send_message(message.chat.id, "Notifiche disattivate")
# Dizionario da chat a generatore per quella conversazione
registro_conversazioni = dict()
@bot.message_handler()
def handle_conversazione(message):
print(f"handle_conversazione: {message.text}")
if message.chat.id in registro_conversazioni:
conv = registro_conversazioni[message.chat.id]
try:
result = conv.send(message)
print("Generator:", result)
except StopIteration:
del registro_conversazioni[message.chat.id]
return
bot.send_message(message.chat.id, "Non hai cominciato una conversazione! Usa uno dei comandi")
print("Starting the bot...")
# Start the bot
bot.infinity_polling()