You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

378 lines
10 KiB
Python

import os
import sqlite3
import telebot
from telebot import types
from datetime import datetime, timedelta
from dotenv import load_dotenv
import uuid
import textwrap
# Load environment variables from .env file
load_dotenv()
# Telegram bot token
BOT_TOKEN = os.getenv("BOT_TOKEN")
TELEGRAM_ADMIN_ID = os.getenv("TELEGRAM_ADMIN_ID")
#
# Error handling
#
class ExceptionHandler(telebot.ExceptionHandler):
def handle(self, exception):
print(exception)
bot = telebot.TeleBot(BOT_TOKEN, parse_mode="MARKDOWN")
bot.send_message(TELEGRAM_ADMIN_ID, "Il BOT è crashato")
bot.send_message(TELEGRAM_ADMIN_ID, f"`{exception}`")
return True
# Initialize the bot
bot = telebot.TeleBot(
BOT_TOKEN,
parse_mode="MARKDOWN",
exception_handler=ExceptionHandler(),
)
# SQLite database setup
conn = sqlite3.connect("cibo-aula-stud.local.db", check_same_thread=False)
conn.execute("PRAGMA foreign_keys = 1")
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS proposte (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL,
name TEXT NOT NULL,
description TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expiration_datetime TIMESTAMP DEFAULT NULL,
FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id)
ON DELETE CASCADE
)"""
)
conn.commit()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS ordinazioni (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL,
proposta_id TEXT NOT NULL,
content TEXT NOT NULL,
FOREIGN KEY (owner_id) REFERENCES utenti (telegram_id)
ON DELETE CASCADE,
FOREIGN KEY (proposta_id) REFERENCES proposte (id)
ON DELETE CASCADE
)"""
)
conn.commit()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS utenti (
telegram_id TEXT PRIMARY KEY,
fullname TEXT NOT NULL,
notification BOOLEAN NOT NULL DEFAULT FALSE
)"""
)
conn.commit()
#
# Delete old proposte
#
import threading
def every_hour():
threading.Timer(60 * 60, every_hour).start()
every_hour()
#
# Bot Commands
#
@bot.message_handler(commands=["start"])
def handle_start(message):
bot.send_message(
message.chat.id,
"Benvenuto!",
)
cursor.execute(
"""
INSERT OR IGNORE INTO utenti(telegram_id, fullname) VALUES (?, ?)
""",
(str(message.from_user.id), message.from_user.username),
)
conn.commit()
@bot.message_handler(commands=["status"])
def handle_status(message):
owner_id = str(message.from_user.id)
print(owner_id)
ordinazioni_per_tue_proposte = list(
cursor.execute(
"""
SELECT * FROM proposte p INNER JOIN ordinazioni o ON p.id = o.proposta_id INNER JOIN utenti u ON o.owner_id = u.telegram_id WHERE p.owner_id = ?
""",
(owner_id,),
)
)
nl = "\n"
bot.send_message(message.chat.id, f"*Ordinazioni alle tue proposte:*")
for o in ordinazioni_per_tue_proposte:
(proposta_id, _, name, description, _, _, _, _, _, ordinazione, _, username, _) = o
bot.send_message(message.chat.id, f"- _{name}_ per @{username} {nl}{ordinazione}")
tue_ordinazione = list(
cursor.execute(
"""
SELECT * FROM ordinazioni o INNER JOIN proposte p ON o.proposta_id = p.id INNER JOIN utenti u ON p.owner_id = u.telegram_id WHERE o.owner_id = ?
""",
(owner_id,),
)
)
nl = "\n"
bot.send_message(message.chat.id, f"*Le tue ordinazioni:*")
for o in tue_ordinazione:
(_, _, _, ordinazione, _, _, name, description, _, _, _, username, _) = o
bot.send_message(message.chat.id, f"- _{name}_ creata da @{username}{nl}{ordinazione}")
conversazioni_nuova_ordinazione = dict()
conversazioni_proposte = dict()
@bot.message_handler(commands=["nuova_proposta"])
def handle_nuova_proposta(message):
bot.send_message(
message.chat.id,
"Inserisci nome e descrizione dell'ordine (sulla prima riga il nome ed il resto dopo un accapo sarà la descrizione, invia /stop per uscire)",
)
print(f"Inizio conversazione con @{message.chat.username} per aggiungere una proposta")
conversazioni_proposte[message.chat.id] = True
@bot.message_handler(commands=["proposte"])
def handle_proposte(message):
owner_id = str(message.from_user.id)
proposte = list(
cursor.execute(
"""
SELECT * FROM proposte ORDER BY created_at ASC
"""
)
)
bot.send_message(message.chat.id, "*Lista delle tue proposte*")
testo = ""
for i, p in enumerate(proposte):
(uuid, _, name, description, _, expiration_time) = p
testo += f"- *{name}*" + "\n"
if len(description.strip()) > 0:
testo += f"{description}" + "\n"
if testo == "":
testo = "Non hai ancora creato nessuna proposta"
bot.send_message(message.chat.id, testo)
conversazioni_nuova_ordinazione = dict()
@bot.message_handler(commands=["nuova_ordinazione"])
def handle_nuova_ordinazione(message):
owner_id = str(message.from_user.id)
proposte = list(
cursor.execute(
"""
SELECT * FROM proposte ORDER BY created_at ASC
""",
)
)
testo = "Lista delle proposte:\n"
for i, p in enumerate(proposte):
(uuid, _, name, description, _, expiration_time) = p
testo += textwrap.dedent(
f"""
{i + 1}. *{name}*:
{description}
"""
)
indicizzazione = {str(i + 1): p[0] for i, p in enumerate(proposte)}
print(indicizzazione)
conversazioni_nuova_ordinazione[message.chat.id] = {
"indicizzazione": indicizzazione,
"proposta_id": None,
}
bot.send_message(message.chat.id, testo)
bot.send_message(message.chat.id, "Invia il numero della proposta a cui vuoi aggiungere un'ordinazione")
@bot.message_handler(commands=["attiva_notifiche"])
def handle_attiva_notifiche(message):
owner_id = str(message.from_user.id)
cursor.execute(
f"""
UPDATE utenti SET notification = TRUE WHERE telegram_id = ?
""",
(owner_id,),
)
conn.commit()
bot.send_message(message.chat.id, "Notifiche attivate")
@bot.message_handler(commands=["disattiva_notifiche"])
def handle_disattiva_notifiche(message):
owner_id = str(message.from_user.id)
cursor.execute(
f"""
UPDATE utenti SET notification = FALSE WHERE telegram_id = ?
""",
(owner_id,),
)
conn.commit()
bot.send_message(message.chat.id, "Notifiche disattivate")
@bot.message_handler()
def handle_conversazione(message):
print(f"handle_conversazione: {message.text}")
if message.chat.id in conversazioni_proposte:
print(f"Continuo conversazione con @{message.from_user.username} per aggiungere una proposta")
parts = message.text.split("\n", 1)
proposta_id = str(uuid.uuid4())
owner_id = str(message.from_user.id)
name = parts[0]
description = parts[1] if len(parts) > 1 else ""
testo = ""
testo += f"*Proposta aggiunta con successo*" + "\n"
testo += f"Nome: {name}" + "\n"
testo += f"Descrizione: {description}" + "\n"
bot.send_message(message.chat.id, testo)
cursor.execute(
"""
INSERT INTO proposte(id, owner_id, name, description) VALUES (?, ?, ?, ?)
""",
(proposta_id, owner_id, name, description),
)
conn.commit()
del conversazioni_proposte[message.chat.id]
users_to_notify = list(
cursor.execute(
"""
SELECT * FROM utenti WHERE notification = TRUE
""",
)
)
for utente in users_to_notify:
(user_id, _, _) = utente
print(repr(user_id), repr(owner_id))
if user_id == owner_id:
continue
testo = ""
testo += f"*Nuova proposta* da @{message.from_user.username}" + "\n"
testo += f"Nome: {name}" + "\n"
testo += f"Descrizione: {description}" + "\n"
bot.send_message(user_id, testo)
return
if message.chat.id in conversazioni_nuova_ordinazione:
conv = conversazioni_nuova_ordinazione[message.chat.id]
if conv["proposta_id"] is None:
indice = message.text.strip()
if indice not in conv["indicizzazione"]:
indici = ", ".join(str(k) for k in conv["indicizzazione"].keys())
bot.send_message(message.chat.id, f"Numero non valido, inviane uno tra {indici}")
return
proposta_uuid = conv["indicizzazione"][indice]
conv["proposta_id"] = proposta_uuid
bot.send_message(message.chat.id, f"Ok, ora dì cosa vuoi ordinare")
print(conv)
return
else:
ordinazione_id = str(uuid.uuid4())
ordinazione = message.text.strip()
cursor.execute(
f"""
INSERT INTO ordinazioni(id, owner_id, proposta_id, content) VALUES (?, ?, ?, ?)
""",
(ordinazione_id, str(message.from_user.id), conv["proposta_id"], ordinazione),
)
conn.commit()
del conversazioni_nuova_ordinazione[message.chat.id]
# Notifica al creatore della proposta del nuovo ordine
bot.send_message(message.chat.id, f"Ok, ordinazione aggiunta")
(proposta_owner_id, proposta_name) = cursor.execute(
f"""
SELECT owner_id, name from proposte WHERE id = ?
""",
(conv["proposta_id"],),
).fetchone()
testo = ""
testo = f"@{message.from_user.username} ha aggiunto un'ordinazione a _{proposta_name}_" + "\n"
bot.send_message(proposta_owner_id, testo)
return
bot.send_message(message.chat.id, "Non hai cominciato una conversazione! Usa uno dei comandi")
print("Starting the bot...")
# Start the bot
bot.infinity_polling()