You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

404 lines
10 KiB
Python

1 year ago
import os
import sqlite3
import telebot
from telebot import types
from datetime import datetime, timedelta
from dotenv import load_dotenv
import uuid
import textwrap
# Load environment variables from .env file
load_dotenv()
# Telegram bot token
BOT_TOKEN = os.getenv("BOT_TOKEN")
TELEGRAM_ADMIN_ID = os.getenv("TELEGRAM_ADMIN_ID")
#
# Error handling
#
class ExceptionHandler(telebot.ExceptionHandler):
def handle(self, exception):
print(exception)
bot = telebot.TeleBot(BOT_TOKEN, parse_mode="MARKDOWN")
bot.send_message(TELEGRAM_ADMIN_ID, "Il BOT è crashato")
bot.send_message(TELEGRAM_ADMIN_ID, f"`{exception}`")
return True
# Initialize the bot
bot = telebot.TeleBot(
BOT_TOKEN,
parse_mode="MARKDOWN",
exception_handler=ExceptionHandler(),
)
# SQLite database setup
conn = sqlite3.connect("cibo-aula-stud.local.db", check_same_thread=False)
conn.execute("PRAGMA foreign_keys = 1")
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS proposte (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expiration_datetime TIMESTAMP DEFAULT NULL,
FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id)
)"""
)
conn.commit()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS ordinazioni (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL,
proposta_id TEXT NOT NULL,
content TEXT NOT NULL,
FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id),
1 year ago
FOREIGN KEY (proposta_id) REFERENCES proposte (id)
)"""
)
conn.commit()
cursor.execute(
"""
CREATE TRIGGER IF NOT EXISTS cleanup_ordinazioni
AFTER DELETE ON proposte
BEGIN
DELETE FROM ordinazioni WHERE proposta_id = old.id;
END
"""
)
conn.commit()
1 year ago
cursor.execute(
"""CREATE TABLE IF NOT EXISTS utenti (
telegram_id TEXT PRIMARY KEY,
fullname TEXT NOT NULL,
notification BOOLEAN NOT NULL DEFAULT FALSE
)"""
)
conn.commit()
#
# Delete old proposte
#
import threading
def every_hour():
threading.Timer(60 * 60, every_hour).start()
every_hour()
#
# Bot Commands
#
@bot.message_handler(commands=["start"])
def handle_start(message):
bot.send_message(
message.chat.id,
"Benvenuto!",
)
cursor.execute(
"""
INSERT OR IGNORE INTO utenti(telegram_id, fullname) VALUES (?, ?)
""",
(str(message.from_user.id), message.from_user.username),
)
conn.commit()
@bot.message_handler(commands=["status"])
def handle_status(message):
owner_id = str(message.from_user.id)
print(owner_id)
ordinazioni_per_tue_proposte = list(
cursor.execute(
"""
SELECT * FROM proposte p INNER JOIN ordinazioni o ON p.id = o.proposta_id INNER JOIN utenti u ON o.owner_id = u.telegram_id WHERE p.owner_id = ?
""",
(owner_id,),
)
)
nl = "\n"
bot.send_message(message.chat.id, f"*Ordinazioni alle tue proposte*")
if len(ordinazioni_per_tue_proposte) == 0:
bot.send_message(message.chat.id, f"Nessuno ha ancora ordinato qualcosa")
else:
for o in ordinazioni_per_tue_proposte:
(proposta_id, _, name, description, _, _, _, _, _, ordinazione, _, username, _) = o
bot.send_message(message.chat.id, f"- Ordine di @{username} per _{name}_{nl}{ordinazione}")
1 year ago
tue_ordinazione = list(
cursor.execute(
"""
SELECT * FROM ordinazioni o INNER JOIN proposte p ON o.proposta_id = p.id INNER JOIN utenti u ON p.owner_id = u.telegram_id WHERE o.owner_id = ?
""",
(owner_id,),
)
)
nl = "\n"
bot.send_message(message.chat.id, f"*Le tue ordinazioni*")
if len(tue_ordinazione) == 0:
bot.send_message(message.chat.id, f"Non ha ancora ordinato nulla")
else:
for o in tue_ordinazione:
(_, _, _, ordinazione, _, _, name, description, _, _, _, username, _) = o
bot.send_message(message.chat.id, f"- Ordine per _{name}_ di @{username}{nl}{ordinazione}")
def conversazione(func):
"""
Questa annotazione va applicata ad una conversazione a più step e permette di usare
1 year ago
message = yield
1 year ago
per mettere in pausa il codice e riprendere quando l'utente invia un messaggio di risposta.
"""
def inner(message):
conv = func(message)
registro_conversazioni[message.chat.id] = conv
next(conv)
return inner
@bot.message_handler(commands=["stop"])
def handle_stop(message):
del registro_conversazioni[message.chat.id]
1 year ago
@bot.message_handler(commands=["nuova_proposta"])
@conversazione
1 year ago
def handle_nuova_proposta(message):
"""
Chiede all'utente nome e descrizione della proposta e poi la aggiunge al database
"""
1 year ago
bot.send_message(
message.chat.id,
"Inserisci nome e descrizione dell'ordine (sulla prima riga il nome ed il resto dopo un accapo sarà la descrizione, invia /stop per uscire)",
)
message = yield
1 year ago
parts = message.text.split("\n", 1)
proposta_id = str(uuid.uuid4())
owner_id = str(message.from_user.id)
name = parts[0]
description = parts[1] if len(parts) > 1 else ""
testo = ""
testo += f"*Proposta aggiunta con successo*" + "\n"
testo += f"Nome: {name}" + "\n"
testo += f"Descrizione: {description}" + "\n"
bot.send_message(message.chat.id, testo)
cursor.execute(
"""
INSERT INTO proposte(id, owner_id, name, description) VALUES (?, ?, ?, ?)
""",
(proposta_id, owner_id, name, description),
)
conn.commit()
# Notifichiamo della proposta gli utenti che hanno le notifiche attivate
users_to_notify = list(
cursor.execute(
"""
SELECT * FROM utenti WHERE notification = TRUE
""",
)
)
for utente in users_to_notify:
(user_id, _, _) = utente
if user_id == owner_id:
continue
testo = ""
testo += f"*Nuova proposta* da @{message.from_user.username}" + "\n"
testo += f"Nome: {name}" + "\n"
testo += f"Descrizione: {description}" + "\n"
bot.send_message(user_id, testo)
1 year ago
@bot.message_handler(commands=["proposte"])
def handle_proposte(message):
owner_id = str(message.from_user.id)
proposte = list(
cursor.execute(
"""
SELECT * FROM proposte ORDER BY created_at ASC
"""
)
)
bot.send_message(message.chat.id, "*Lista delle tue proposte*")
testo = ""
for i, p in enumerate(proposte):
(uuid, _, name, description, _, expiration_time) = p
testo += f"- *{name}*" + "\n"
if len(description.strip()) > 0:
testo += f"{description}" + "\n"
if testo == "":
testo = "Non hai ancora creato nessuna proposta"
bot.send_message(message.chat.id, testo)
@bot.message_handler(commands=["nuova_ordinazione"])
@conversazione
1 year ago
def handle_nuova_ordinazione(message):
owner_id = str(message.from_user.id)
proposte = list(
cursor.execute(
"""
SELECT * FROM proposte ORDER BY created_at ASC
""",
)
)
indicizzazione = {}
testo_proposte = "Lista delle proposte:\n"
1 year ago
for i, p in enumerate(proposte):
(proposta_id, _, name, description, _, expiration_time) = p
1 year ago
indicizzazione[str(i + 1)] = proposta_id
1 year ago
testo_proposte += f"{i + 1}. *{name}*:" + "\n"
testo_proposte += f"{description}" + "\n"
1 year ago
bot.send_message(message.chat.id, testo_proposte)
1 year ago
bot.send_message(message.chat.id, "Invia il numero della proposta a cui vuoi aggiungere un'ordinazione")
while True:
message = yield
indice = message.text.strip()
if indice in indicizzazione:
break
indici = ", ".join(str(k) for k in indicizzazione.keys())
bot.send_message(message.chat.id, f"Numero non valido, inviane uno tra {indici}")
proposta_id = indicizzazione[indice]
bot.send_message(message.chat.id, f"Ok, ora dì cosa vuoi ordinare")
message = yield
ordinazione_id = str(uuid.uuid4())
content = message.text.strip()
cursor.execute(
"""
INSERT INTO ordinazioni(id, owner_id, proposta_id, content) VALUES (?, ?, ?, ?)
""",
(ordinazione_id, str(message.from_user.id), proposta_id, content),
)
conn.commit()
# Notifica il creatore dell'ordine ed il corrispondente creatore della proposta del nuovo ordine
bot.send_message(message.chat.id, f"Ok, ordinazione aggiunta")
(proposta_owner_id, proposta_name) = cursor.execute(
"""
SELECT owner_id, name from proposte WHERE id = ?
""",
(proposta_id,),
).fetchone()
testo_proposte = ""
testo_proposte = f"@{message.from_user.username} ha aggiunto un'ordinazione a _{proposta_name}_" + "\n"
bot.send_message(proposta_owner_id, testo_proposte)
return
1 year ago
@bot.message_handler(commands=["attiva_notifiche"])
def handle_attiva_notifiche(message):
owner_id = str(message.from_user.id)
cursor.execute(
"""
1 year ago
UPDATE utenti SET notification = TRUE WHERE telegram_id = ?
""",
(owner_id,),
)
conn.commit()
bot.send_message(message.chat.id, "Notifiche attivate")
@bot.message_handler(commands=["disattiva_notifiche"])
def handle_disattiva_notifiche(message):
owner_id = str(message.from_user.id)
cursor.execute(
"""
1 year ago
UPDATE utenti SET notification = FALSE WHERE telegram_id = ?
""",
(owner_id,),
)
conn.commit()
bot.send_message(message.chat.id, "Notifiche disattivate")
# Dizionario da chat a generatore per quella conversazione
registro_conversazioni = dict()
1 year ago
@bot.message_handler()
def handle_conversazione(message):
print(f"handle_conversazione: {message.text}")
if message.chat.id in registro_conversazioni:
conv = registro_conversazioni[message.chat.id]
try:
result = conv.send(message)
print("Generator:", result)
except StopIteration:
del registro_conversazioni[message.chat.id]
1 year ago
return
bot.send_message(message.chat.id, "Non hai cominciato una conversazione! Usa uno dei comandi")
print("Starting the bot...")
# Start the bot
bot.infinity_polling()