You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1079 lines
36 KiB
Python

import base64
import json
import logging
import mimetypes
import os
import re
import subprocess
import time
import uuid
from pathlib import Path
from typing import Any, Optional, TypedDict, cast
from fastapi import FastAPI, File, Form, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from litellm import completion
# Filesystem layout; every path is derived from the directory holding this file.
APP_DIR = Path(__file__).resolve().parent
# Directory handed to Jinja2Templates for HTML templates.
TEMPLATES_DIR = APP_DIR / "templates"
# Parent directory of the per-run working directories.
RUNS_DIR = APP_DIR / "runs"
# Directory mounted as static files under /public.
PUBLIC_DIR = APP_DIR / "public"
# JSON file storing the per-IP daily usage counters.
RATE_LIMIT_PATH = RUNS_DIR / "rate_limits.json"
def _normalize_base_path(value: str) -> str:
v = (value or "").strip()
if not v:
return ""
if not v.startswith("/"):
v = "/" + v
v = v.rstrip("/")
return v
# URL prefix prepended to generated links and redirects; taken from the
# BASE_PATH environment variable and empty when that is unset.
BASE_PATH = _normalize_base_path(os.getenv("BASE_PATH", ""))
def _parse_model_list(env_value: str | None, default: list[str]) -> list[str]:
if not env_value:
return default
items = [x.strip() for x in env_value.split(",")]
return [x for x in items if x]
# Default models (Google Gemini via LiteLLM). Order = fallback order.
# Overridable through the comma-separated LLM_MODELS environment variable.
DEFAULT_MODELS = _parse_model_list(
os.getenv("LLM_MODELS"),
[
"gemini/gemini-3-pro-preview",
"gemini/gemini-3-flash-preview",
"gemini/gemini-flash-latest",
],
)
# Models tried, in order, for edit requests. Overridable through the
# comma-separated EDIT_MODELS environment variable.
EDIT_MODELS = _parse_model_list(
os.getenv("EDIT_MODELS"),
[
"gemini/gemini-3-flash-preview",
"gemini/gemini-flash-latest",
],
)
# Daily per-IP quotas as reported by _rate_limit_status.
CONVERT_DAILY_LIMIT = 5
EDIT_DAILY_LIMIT = 10
class RateLimitEntry(TypedDict, total=False):
    """Usage counters kept for one client on one calendar day."""

    # Day the counters belong to, formatted as "%Y-%m-%d".
    day: str
    # Convert requests counted for that day.
    convert: int
    # Edit requests counted for that day.
    edit: int
class HistoryEntry(TypedDict, total=False):
    """One record of a run's history file (stored as a JSON line)."""

    # Time of the action as a ``time.time()`` value.
    ts: float
    # Client address the action came from.
    ip: str
    # Kind of action, e.g. "edit" or "compile".
    action: str
    # LaTeX source associated with the action.
    latex: str
    # Edit instructions given by the user (empty for plain compiles).
    instructions: str
    # Model that produced the LaTeX, if any.
    model: str
    # Outcome label, "ok" or "error".
    status: str
# Import-time side effects: make sure the runs directory and the rate-limit
# file exist before any request is handled.
RUNS_DIR.mkdir(parents=True, exist_ok=True)
# touch() leaves a newly created file empty (not valid JSON).
RATE_LIMIT_PATH.touch(exist_ok=True)
def _default_rate_limit_entry(day: str) -> RateLimitEntry:
    """Build a fresh counter record for ``day`` with both counters at zero."""
    fresh: RateLimitEntry = {"day": day, "convert": 0, "edit": 0}
    return fresh
def _entry_count(entry: RateLimitEntry, key: str) -> int:
    """Return the integer stored under ``key``, or 0 when it is absent or not an int."""
    stored = entry.get(key)
    if isinstance(stored, int):
        return stored
    return 0
def _history_path(run_dir: Path) -> Path:
return run_dir / "history.jsonl"
def _append_history_entry(run_dir: Path, entry: HistoryEntry) -> None:
    """Append ``entry`` to the run's history file as one JSON line.

    Only the known history fields are written; fields missing from ``entry``
    receive a default (current time for ``ts``, "unknown" for ``ip``, "edit"
    for ``action``, empty string otherwise).
    """
    fallbacks = (
        ("ts", time.time()),
        ("ip", "unknown"),
        ("action", "edit"),
        ("latex", ""),
        ("instructions", ""),
        ("model", ""),
        ("status", ""),
    )
    record = {field: entry.get(field, fallback) for field, fallback in fallbacks}
    with _history_path(run_dir).open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, ensure_ascii=False) + "\n")
def _load_history_for_ip(run_dir: Path, ip: str) -> list[HistoryEntry]:
    """Return the history records of ``run_dir`` whose ``ip`` equals ``ip``.

    Blank lines, lines that are not valid JSON and JSON values that are not
    objects are skipped. If reading fails part-way, the records gathered so
    far are returned. A missing history file yields an empty list.
    """
    history_file = _history_path(run_dir)
    matches: list[HistoryEntry] = []
    if not history_file.exists():
        return matches
    try:
        with history_file.open("r", encoding="utf-8", errors="replace") as fh:
            for raw_line in fh:
                stripped = raw_line.strip()
                if not stripped:
                    continue
                try:
                    parsed = json.loads(stripped)
                except Exception:
                    continue
                if not (isinstance(parsed, dict) and parsed.get("ip") == ip):
                    continue
                matches.append(
                    {
                        "ts": parsed.get("ts", 0.0),
                        "ip": parsed.get("ip", ""),
                        "action": parsed.get("action", ""),
                        "latex": parsed.get("latex", ""),
                        "instructions": parsed.get("instructions", ""),
                        "model": parsed.get("model", ""),
                        "status": parsed.get("status", ""),
                    }
                )
    except Exception:
        # Best effort: fall through and return whatever was collected.
        pass
    return matches
def _setup_logging() -> None:
level_name = os.getenv("LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)
logging.basicConfig(
level=level,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# Configure logging at import time, before the module logger is created.
_setup_logging()
# Module-wide logger; also handed to RunLogger instances by the handlers.
logger = logging.getLogger("latex-diagram-to-tikz")
def _load_rate_limits() -> dict[str, RateLimitEntry]:
    """Read the per-IP counter table from ``RATE_LIMIT_PATH``.

    Returns an empty dict when the file is missing, cannot be read or parsed,
    or does not contain a JSON object.
    """
    if not RATE_LIMIT_PATH.exists():
        return {}
    try:
        raw = RATE_LIMIT_PATH.read_text(encoding="utf-8", errors="replace")
        parsed = json.loads(raw)
    except Exception:
        return {}
    if isinstance(parsed, dict):
        return cast(dict[str, RateLimitEntry], parsed)
    return {}
def _save_rate_limits(data: dict[str, RateLimitEntry]) -> None:
    """Persist the per-IP counter table to ``RATE_LIMIT_PATH`` atomically.

    The JSON is written to a temporary sibling file and then moved over the
    real file with ``os.replace``. Writing in place could leave a truncated
    file behind (crash, or a reader in another process), which the loader
    treats as "no data" and which would therefore reset every counter.
    """
    payload = json.dumps(data, indent=2, sort_keys=True) + "\n"
    tmp_path = RATE_LIMIT_PATH.with_name(f"{RATE_LIMIT_PATH.name}.{uuid.uuid4().hex}.tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        os.replace(tmp_path, RATE_LIMIT_PATH)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when writing or replacing failed.
        tmp_path.unlink(missing_ok=True)
def _consume_rate_limit(ip: str, *, key: str, limit: int) -> bool:
    """Count one use of ``key`` for ``ip`` today if the quota allows it.

    Args:
        ip: Client identifier used as the table key.
        key: Counter name; only "convert" and "edit" are accepted.
        limit: Maximum number of uses allowed per day.

    Returns:
        True when the use was counted and saved; False when ``key`` is not a
        known counter or the counter has already reached ``limit``.
    """
    if key not in ("convert", "edit"):
        return False

    today = time.strftime("%Y-%m-%d")
    table = _load_rate_limits()
    stored = table.get(ip)
    if isinstance(stored, dict):
        entry: RateLimitEntry = cast(RateLimitEntry, stored)
    else:
        entry = _default_rate_limit_entry(today)

    # Counters from another day (or with an unusable day value) start over.
    stored_day = entry.get("day")
    if not isinstance(stored_day, str) or stored_day != today:
        entry = _default_rate_limit_entry(today)

    used = _entry_count(entry, key)
    if used >= limit:
        return False

    entry[key] = used + 1
    entry.setdefault("day", today)
    table[ip] = entry
    _save_rate_limits(table)
    return True
def _rate_limit_status(ip: str) -> dict[str, int]:
    """Report today's quota usage for ``ip``.

    A stored entry from a different day is replaced by a zeroed entry and the
    table is saved. The result holds, for both "convert" and "edit", the
    limit, the number used and the number remaining (never negative).
    """
    today = time.strftime("%Y-%m-%d")
    table = _load_rate_limits()
    stored = table.get(ip)
    if isinstance(stored, dict):
        entry: RateLimitEntry = cast(RateLimitEntry, stored)
    else:
        entry = _default_rate_limit_entry(today)

    if entry.get("day") != today:
        entry = _default_rate_limit_entry(today)
        table[ip] = entry
        _save_rate_limits(table)

    used_convert = _entry_count(entry, "convert")
    used_edit = _entry_count(entry, "edit")
    status: dict[str, int] = {}
    status["convert_limit"] = CONVERT_DAILY_LIMIT
    status["convert_used"] = used_convert
    status["convert_remaining"] = max(CONVERT_DAILY_LIMIT - used_convert, 0)
    status["edit_limit"] = EDIT_DAILY_LIMIT
    status["edit_used"] = used_edit
    status["edit_remaining"] = max(EDIT_DAILY_LIMIT - used_edit, 0)
    return status
class RunLogger:
    """Appends timestamped text to ``run.log.txt`` inside a run directory.

    The standard-library logger passed in is only stored (as ``py``) so that
    helpers receiving a ``RunLogger`` can also log through it.
    """

    def __init__(self, run_dir: Path, py_logger: logging.Logger):
        self.run_dir = run_dir
        self.py = py_logger
        self._log_path = run_dir / "run.log.txt"

    @staticmethod
    def _now() -> str:
        """Return the current local time formatted for log output."""
        return time.strftime("%Y-%m-%d %H:%M:%S")

    def _append(self, text: str) -> None:
        """Append ``text`` verbatim to the run log file."""
        with self._log_path.open("a", encoding="utf-8") as fh:
            fh.write(text)

    def line(self, msg: str) -> None:
        """Write a single ``<timestamp> <msg>`` line."""
        self._append(f"{self._now()} {msg}\n")

    def section(self, title: str, body: str) -> None:
        """Write a titled block: a blank line, a header line, then ``body``.

        Trailing whitespace of ``body`` is removed and exactly one newline is
        appended; a falsy ``body`` produces an empty line.
        """
        heading = f"\n--- {title} ({self._now()}) ---\n"
        content = (body or "").rstrip() + "\n"
        self._append(heading + content)
# System prompt for image-to-TikZ conversion requests.
# The quoted markers are written with a single level of escaping so the model
# reads "\documentclass{standalone}" and "\end{document}" literally.
CONVERT_SYSTEM_PROMPT = (
    "You convert hand-drawn diagrams into clean, compilable LaTeX/TikZ. "
    "Return EXACTLY ONE complete LaTeX document (not a fragment). "
    'Output must start with "\\documentclass{standalone}" and end with "\\end{document}". '
    "Return ONLY raw LaTeX source: no markdown, no code fences, no commentary, no extra text. "
    "Wrap LaTeX to roughly 80 characters per line, format it cleanly, and include concise LaTeX comments that explain structure where useful."
)
# User-message text sent together with the uploaded image on conversion.
CONVERT_PROMPT = (
"Convert the diagram in the image into a standalone LaTeX document that compiles with pdflatex.\n"
"Requirements:\n"
"- Output must be a complete LaTeX document starting with \\documentclass{standalone}.\n"
"- Use TikZ (\\usepackage{tikz}) and draw the diagram.\n"
"- The output must compile without external assets.\n"
"- Prefer simple primitives (nodes, lines, arrows, rectangles, circles, text).\n"
"- Keep it reasonably sized and centered; use consistent spacing and alignment.\n"
)
# System prompt for edit requests.
EDIT_SYSTEM_PROMPT = (
"You are an expert LaTeX/TikZ editor. "
"Given an existing standalone LaTeX/TikZ document and an edit request, return EXACTLY ONE updated complete LaTeX document. "
"Return ONLY raw LaTeX source: no markdown, no code fences, no commentary, no extra text. "
"The output must compile with pdflatex. "
"Wrap LaTeX to roughly 80 characters per line, keep formatting tidy, and add brief LaTeX comments to clarify sections when helpful."
)
# User-message template for edit requests, filled in with str.format using the
# "instructions" and "latex" fields. Literal braces are doubled so that
# format() emits them unchanged.
EDIT_PROMPT_TEMPLATE = (
"Edit the following LaTeX document according to the instructions.\n"
"Rules:\n"
"- Return a complete standalone LaTeX document (\\documentclass{{standalone}} ... \\end{{document}}).\n"
"- Make minimal changes necessary to satisfy the instructions.\n"
"- Do not add external files or assets.\n\n"
"INSTRUCTIONS:\n{instructions}\n\n"
"LATEX:\n{latex}\n"
)
def _guess_mime(filename: str) -> str:
mime, _ = mimetypes.guess_type(filename)
return mime or "application/octet-stream"
def _to_data_url(image_bytes: bytes, mime: str) -> str:
b64 = base64.b64encode(image_bytes).decode("ascii")
return f"data:{mime};base64,{b64}"
def _extract_text_from_litellm(resp: Any) -> str:
# LiteLLM tries to be OpenAI-compatible but the return shape can vary.
try:
choice = resp["choices"][0]
message = choice.get("message") or {}
content = message.get("content")
except Exception:
content = None
if isinstance(content, str):
return content
# Sometimes content can be a list of parts.
if isinstance(content, list):
parts: list[str] = []
for part in content:
if isinstance(part, str):
parts.append(part)
elif isinstance(part, dict):
text = part.get("text")
if isinstance(text, str):
parts.append(text)
return "\n".join(parts).strip()
# Fallback to stringification.
return str(resp).strip()
def _litellm_call(*, model: str, messages: list[dict[str, Any]], **kwargs: Any) -> Any:
    """Forward a chat request to LiteLLM's ``completion`` and return its response unchanged."""
    response = completion(model=model, messages=messages, **kwargs)
    return response
def _litellm_with_retries(
    *,
    models: list[str],
    messages: list[dict[str, Any]],
    run_log: RunLogger,
    **kwargs: Any,
) -> tuple[str, str]:
    """Ask each model in ``models`` in turn until one returns non-empty text.

    Args:
        models: Model names in fallback order.
        messages: Chat messages passed to every attempt.
        run_log: Run log that receives one line per attempt and per failure.
        **kwargs: Extra arguments forwarded to the completion call.

    Returns:
        ``(model, raw_text)`` for the first model whose answer is not blank;
        ``raw_text`` is returned unstripped.

    Raises:
        RuntimeError: When every model failed or returned blank text (also
            when ``models`` is empty). The last failure is attached as the
            cause.
    """
    last_error: Exception | None = None
    last_index = len(models) - 1
    for index, model in enumerate(models):
        try:
            run_log.line(f"llm.try model={model}")
            resp = _litellm_call(model=model, messages=messages, **kwargs)
            raw_text = _extract_text_from_litellm(resp)
            if not (raw_text or "").strip():
                raise RuntimeError("Empty LLM response")
            return model, raw_text
        except Exception as e:
            last_error = e
            run_log.line(f"llm.fail model={model} error={e}")
            # Pause between attempts only; sleeping after the final failure
            # would just delay the error.
            if index < last_index:
                time.sleep(1)
    raise RuntimeError(f"All models failed: {list(models)} ({last_error})") from last_error
def _run_cmd(
cmd: list[str], cwd: Path, timeout_s: int = 120, py_logger: logging.Logger | None = None
) -> tuple[int, str]:
started = time.monotonic()
cmd_str = " ".join(cmd)
log = py_logger or logger
log.info("cmd.start cwd=%s timeout=%ss cmd=%s", str(cwd), timeout_s, cmd_str)
try:
proc = subprocess.run(
cmd,
cwd=str(cwd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
timeout=timeout_s,
)
elapsed_ms = int((time.monotonic() - started) * 1000)
log.info(
"cmd.end rc=%s elapsed_ms=%s out_chars=%s cmd=%s",
proc.returncode,
elapsed_ms,
len(proc.stdout or ""),
cmd_str,
)
return proc.returncode, proc.stdout
except subprocess.TimeoutExpired as e:
elapsed_ms = int((time.monotonic() - started) * 1000)
if isinstance(e.stdout, (bytes, bytearray)):
out = bytes(e.stdout).decode("utf-8", errors="replace")
elif isinstance(e.stdout, str):
out = e.stdout
else:
out = ""
log.warning("cmd.timeout elapsed_ms=%s cmd=%s", elapsed_ms, cmd_str)
return 124, out + f"\n[timeout after {timeout_s}s]\n"
def _compile_pdflatex(
    run_dir: Path, tex_filename: str, run_logger: RunLogger, section_title: str
) -> Optional[Path]:
    """Compile ``run_dir / tex_filename`` with pdflatex.

    The compiler output is stored in the run log under ``section_title``.

    Returns:
        Path of the produced PDF (same stem as the source, inside
        ``run_dir``), or None when pdflatex exits non-zero or no PDF exists.
    """
    cmd = [
        "pdflatex",
        "-interaction=nonstopmode",
        "-halt-on-error",
        "-file-line-error",
        # The source comes from an LLM or straight from a web form, so never
        # let it run external programs via \write18.
        "-no-shell-escape",
        "-output-directory",
        str(run_dir),
        str(run_dir / tex_filename),
    ]
    code, out = _run_cmd(cmd, cwd=run_dir, timeout_s=180, py_logger=run_logger.py)
    run_logger.section(section_title, out)
    pdf_path = run_dir / Path(tex_filename).with_suffix(".pdf").name
    if code == 0 and pdf_path.exists():
        return pdf_path
    return None
def _render_png_with_magick(
    run_dir: Path, pdf_path: Path, run_logger: RunLogger, section_title: str, png_name: str = "preview.png"
) -> Optional[Path]:
    """Rasterize the first page of ``pdf_path`` to ``run_dir / png_name`` with ``magick``.

    The tool output is stored in the run log under ``section_title``. Returns
    the PNG path, or None when the command fails or no PNG was produced.
    """
    target = run_dir / png_name
    # "[0]" selects the first page only.
    first_page = f"{pdf_path}[0]"
    # Flatten onto white and drop alpha to avoid transparency surprises.
    flatten_args = ["-background", "white", "-alpha", "remove", "-alpha", "off"]
    cmd = ["magick", "-density", "300", first_page, *flatten_args, str(target)]
    status, output = _run_cmd(cmd, cwd=run_dir, timeout_s=180, py_logger=run_logger.py)
    run_logger.section(section_title, output)
    if status != 0 or not target.exists():
        return None
    return target
def _render_svg_with_pdf2svg(
    run_dir: Path, pdf_path: Path, run_logger: RunLogger, section_title: str, svg_name: str = "diagram.svg"
) -> Optional[Path]:
    """Convert page 1 of ``pdf_path`` to ``run_dir / svg_name`` with ``pdf2svg``.

    The tool output is stored in the run log under ``section_title``. Returns
    the SVG path, or None when the command fails or no SVG was produced.
    """
    target = run_dir / svg_name
    status, output = _run_cmd(
        ["pdf2svg", str(pdf_path), str(target), "1"],
        cwd=run_dir,
        timeout_s=180,
        py_logger=run_logger.py,
    )
    run_logger.section(section_title, output)
    if status != 0 or not target.exists():
        return None
    return target
def _compile_latex_to_png(
    run_dir: Path, tex_path: Path, run_logger: RunLogger, phase: str
) -> tuple[Optional[Path], Optional[Path]]:
    """Compile ``tex_path`` to PDF and render its first page to PNG.

    Tool output goes to the run log in sections named ``<phase>.pdflatex`` and
    ``<phase>.magick``.

    Returns:
        ``(pdf_path, png_path)``; both are None when compilation fails, and
        only ``png_path`` is None when rendering fails.
    """
    compiled = _compile_pdflatex(run_dir, tex_path.name, run_logger, section_title=f"{phase}.pdflatex")
    if compiled is None:
        return None, None
    preview = _render_png_with_magick(run_dir, compiled, run_logger, section_title=f"{phase}.magick")
    return compiled, preview
def _compile_phase(
    run_dir: Path, tex_path: Path, run_log: RunLogger, phase: str, start_msg: str = "compile.start"
) -> tuple[Optional[Path], Optional[Path], Optional[Path], int]:
    """Run the full build for one phase: PDF, PNG preview and SVG.

    ``start_msg`` is written to the run log first and a ``compile.end``
    summary line last. The SVG step is skipped when no PDF was produced.

    Returns:
        ``(pdf_path, png_path, svg_path, elapsed_ms)``; any path is None when
        its step failed or was skipped.
    """
    run_log.line(start_msg)
    t0 = time.monotonic()
    pdf_path, png_path = _compile_latex_to_png(run_dir, tex_path, run_log, phase=phase)
    svg_path: Optional[Path] = None
    if pdf_path:
        svg_path = _render_svg_with_pdf2svg(run_dir, pdf_path, run_log, section_title=f"{phase}.pdf2svg")
    elapsed_ms = int((time.monotonic() - t0) * 1000)
    run_log.line(
        f"compile.end elapsed_ms={elapsed_ms} ok_pdf={pdf_path is not None} ok_png={png_path is not None} ok_svg={svg_path is not None}"
    )
    return pdf_path, png_path, svg_path, elapsed_ms
# Shape of a run id: a YYYY-MM-DD date, a dash, then 32 lowercase hex digits.
# Callers apply it with fullmatch() before using an id as a directory name.
RUN_ID_RE = re.compile(r"\d{4}-\d{2}-\d{2}-[0-9a-f]{32}")
def _new_run_id() -> str:
return f"{time.strftime('%Y-%m-%d')}-{uuid.uuid4().hex}"
def _write_run_meta(run_dir: Path, meta: dict[str, Any]) -> None:
(run_dir / "meta.json").write_text(json.dumps(meta, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def _read_run_meta(run_dir: Path) -> dict[str, Any]:
path = run_dir / "meta.json"
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8", errors="replace"))
except Exception:
return {}
def _get_meta_string(meta: dict[str, Any], key: str) -> str:
value = meta.get(key)
return value if isinstance(value, str) else ""
def _write_run_error(run_dir: Path, error_text: str) -> None:
(run_dir / "error.txt").write_text((error_text or "").strip() + "\n", encoding="utf-8")
def _read_run_error(run_dir: Path) -> str:
path = run_dir / "error.txt"
if not path.exists():
return ""
return path.read_text(encoding="utf-8", errors="replace").strip()
def _write_run_error_parts(run_dir: Path, parts: list[str]) -> None:
    """Join the non-empty ``parts`` with blank lines and store them as the run error."""
    non_empty = filter(None, parts)
    combined = "\n\n".join(non_empty).strip()
    _write_run_error(run_dir, combined)
def _safe_join_run_file(run_id: str, filename: str) -> Optional[Path]:
    """Resolve ``filename`` inside the directory of run ``run_id``.

    Guards against path traversal: the run id must match ``RUN_ID_RE``, the
    filename must not contain a slash or backslash, and the resolved path
    must lie strictly below the resolved run directory.

    Returns:
        The resolved path (which may not exist), or None when a check fails.
    """
    if RUN_ID_RE.fullmatch(run_id) is None:
        return None
    if any(separator in filename for separator in ("/", "\\")):
        return None
    run_dir = RUNS_DIR / run_id
    candidate = (run_dir / filename).resolve()
    try:
        root = run_dir.resolve()
    except FileNotFoundError:
        return None
    return candidate if root in candidate.parents else None
def _run_dir_for_id(run_id: str) -> Optional[Path]:
    """Return the directory of run ``run_id``.

    None is returned when the id does not match ``RUN_ID_RE`` or no such
    directory exists.
    """
    if RUN_ID_RE.fullmatch(run_id) is None:
        return None
    candidate = RUNS_DIR / run_id
    if candidate.exists() and candidate.is_dir():
        return candidate
    return None
# ASGI application; the route handlers below register themselves on it.
app = FastAPI()
# Serve the files in PUBLIC_DIR under the /public URL prefix.
app.mount("/public", StaticFiles(directory=str(PUBLIC_DIR)), name="public")
# Jinja2 environment used by the HTML-rendering handlers.
templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
# Expose the URL prefix to every template as the global "base_path".
templates.env.globals["base_path"] = BASE_PATH
@app.get("/", response_class=HTMLResponse)
async def index(request: Request, error: str | None = None):
    """Render the upload page with the caller's remaining conversion quota.

    ``error`` is an optional message taken from the query string and passed
    to the template.
    """
    client = request.client
    client_ip = client.host if client else "unknown"
    quota = _rate_limit_status(client_ip)
    context = {
        "request": request,
        "error": error,
        "base_path": BASE_PATH,
        "convert_remaining": quota["convert_remaining"],
        "convert_limit": quota["convert_limit"],
    }
    return templates.TemplateResponse("index.html", context)
@app.post("/convert", response_class=HTMLResponse)
async def convert(request: Request, image: UploadFile = File(...), notes: str = Form("")):
    """Convert an uploaded diagram image to LaTeX/TikZ and redirect to the run page.

    The upload is stored in a new run directory and sent, with the optional
    ``notes``, to the models in ``DEFAULT_MODELS``. The returned LaTeX is
    written to ``diagram.tex`` and built into PDF, PNG and SVG. An LLM failure
    and failed build steps are recorded in the run's ``error.txt``, and the
    client is redirected to the run page either way.

    An empty upload or an exhausted daily quota redirects to the index page
    with an ``error`` query parameter. The upload is checked first, so an
    empty upload neither uses up quota nor creates a run directory.
    """
    client_ip = request.client.host if request.client else "unknown"
    run_id = _new_run_id()

    original_bytes = await image.read()
    if not original_bytes:
        logger.warning("run.empty_upload run_id=%s", run_id)
        return RedirectResponse(url=f"{BASE_PATH}/?error=Empty%20upload", status_code=303)

    # Same constant that _rate_limit_status reports to the user.
    if not _consume_rate_limit(client_ip, key="convert", limit=CONVERT_DAILY_LIMIT):
        return RedirectResponse(url=f"{BASE_PATH}/?error=Rate%20limit%20exceeded", status_code=303)

    models = DEFAULT_MODELS
    run_dir = RUNS_DIR / run_id
    run_dir.mkdir(parents=True, exist_ok=True)
    run_log = RunLogger(run_dir, logger)
    started = time.monotonic()
    logger.info("run.start run_id=%s", run_id)
    run_log.line(f"convert.start models={models}")
    notes = (notes or "").strip()

    # Save original upload
    original_name = image.filename or "upload"
    ext = Path(original_name).suffix
    if not ext:
        ext = ".png"
    original_path = run_dir / f"original{ext}"
    original_path.write_bytes(original_bytes)
    logger.info(
        "upload.saved run_id=%s filename=%s content_type=%s bytes=%s path=%s",
        run_id,
        original_name,
        image.content_type,
        len(original_bytes),
        str(original_path),
    )
    run_log.line(
        f"upload.saved filename={original_name} content_type={image.content_type} bytes={len(original_bytes)} path={original_path.name}",
    )
    mime = image.content_type or _guess_mime(original_name)
    data_url = _to_data_url(original_bytes, mime)

    # Call LLM (vision)
    # NOTE(review): the LLM call and the build below are synchronous and run
    # inside this async handler; consider moving them to a worker thread.
    llm_error = ""
    raw_text = ""
    tex = ""
    used_model = ""
    try:
        llm_started = time.monotonic()
        run_log.line("llm.start")
        if notes:
            run_log.section("convert.notes", notes)
        user_text = CONVERT_PROMPT
        if notes:
            user_text = f"{user_text}\n\nAdditional notes from user:\n{notes}"
        used_model, raw_text = _litellm_with_retries(
            models=models,
            messages=[
                {"role": "system", "content": CONVERT_SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": user_text},
                        {"type": "image_url", "image_url": {"url": data_url}},
                    ],
                },
            ],
            run_log=run_log,
        )
        tex = (raw_text or "").strip()
        llm_elapsed_ms = int((time.monotonic() - llm_started) * 1000)
        logger.info(
            "llm.ok run_id=%s elapsed_ms=%s raw_chars=%s tex_chars=%s",
            run_id,
            llm_elapsed_ms,
            len(raw_text or ""),
            len(tex or ""),
        )
        run_log.line(
            f"llm.ok model={used_model} elapsed_ms={llm_elapsed_ms} raw_chars={len(raw_text or '')} tex_chars={len(tex or '')}",
        )
    except Exception as e:
        llm_error = f"LLM call failed: {e}"
        logger.exception("llm.error run_id=%s", run_id)
        run_log.line(f"llm.error {llm_error}")

    # A placeholder comment is written when no LaTeX was produced, so the
    # file always exists.
    tex_path = run_dir / "diagram.tex"
    tex_path.write_text(tex or "% LLM failed to produce LaTeX\n", encoding="utf-8")
    run_log.line(f"tex.written path={tex_path.name} chars={len(tex or '')}")

    # Compile and render
    pdf_path: Optional[Path] = None
    png_path: Optional[Path] = None
    svg_path: Optional[Path] = None
    if tex:
        pdf_path, png_path, svg_path, cp_elapsed_ms = _compile_phase(
            run_dir, tex_path, run_log, phase="convert"
        )
        logger.info(
            "compile.done run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s ok_svg=%s",
            run_id,
            cp_elapsed_ms,
            pdf_path is not None,
            png_path is not None,
            svg_path is not None,
        )
    run_log.section("convert.llm_raw", raw_text or "")

    convert_model = used_model or (models[0] if models else "")
    _write_run_meta(
        run_dir,
        {
            "convert_model": convert_model,
            "last_edit_model": "",
            "created_at": time.time(),
            "updated_at": time.time(),
        },
    )
    _write_run_error_parts(
        run_dir,
        [
            llm_error,
            "pdflatex failed. See run.log.txt in the run folder." if (tex and pdf_path is None) else "",
            "ImageMagick render failed. See run.log.txt in the run folder."
            if (pdf_path and not png_path)
            else "",
            "SVG render failed. See run.log.txt in the run folder." if (pdf_path and not svg_path) else "",
        ],
    )

    elapsed_ms = int((time.monotonic() - started) * 1000)
    logger.info(
        "run.end run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s",
        run_id,
        elapsed_ms,
        pdf_path is not None,
        png_path is not None,
    )
    run_log.line(
        f"convert.end elapsed_ms={elapsed_ms} ok_pdf={pdf_path is not None} ok_png={png_path is not None}",
    )
    return RedirectResponse(url=f"{BASE_PATH}/{run_id}", status_code=303)
@app.get("/{run_id}", response_class=HTMLResponse)
async def get_run_result(request: Request, run_id: str):
    """Render the result/edit page of an existing run, or answer 404 for an unknown id.

    The page receives the current LaTeX source, download URLs for the run's
    files, the model names stored in the run metadata, the stored error text
    (if any) and the caller's remaining edit quota.
    """
    run_dir = _run_dir_for_id(run_id)
    if run_dir is None:
        return HTMLResponse("Not found", status_code=404)

    client = request.client
    quota = _rate_limit_status(client.host if client else "unknown")

    def file_url(name: str) -> str:
        return f"{BASE_PATH}/runs/{run_id}/{name}"

    tex_file = run_dir / "diagram.tex"
    preview_file = run_dir / "preview.png"
    tex = ""
    if tex_file.exists():
        tex = tex_file.read_text(encoding="utf-8", errors="replace")
    png_url = file_url(preview_file.name) if preview_file.exists() else None

    # The upload keeps its original extension; take the first match by name.
    uploads = sorted(run_dir.glob("original.*"))
    original_name = uploads[0].name if uploads else "original"

    meta = _read_run_meta(run_dir)
    # A single "model" key is also honoured; a trailing "(edit)" in it marks
    # an edit model.
    legacy_label = _get_meta_string(meta, "model")
    convert_model = _get_meta_string(meta, "convert_model") or legacy_label
    edit_model = _get_meta_string(meta, "last_edit_model")
    if not edit_model and legacy_label.endswith("(edit)"):
        edit_model = legacy_label.replace("(edit)", "").strip()
    if not convert_model:
        convert_model = DEFAULT_MODELS[0] if DEFAULT_MODELS else ""

    context = {
        "request": request,
        "run_id": run_id,
        "convert_model": convert_model,
        "last_edit_model": edit_model,
        "tex": tex,
        "png_url": png_url,
        "download_original_url": file_url(original_name),
        "download_tex_url": file_url(tex_file.name),
        "download_pdf_url": file_url("diagram.pdf"),
        "download_png_url": file_url("preview.png"),
        "download_svg_url": file_url("diagram.svg"),
        "error": _read_run_error(run_dir) or None,
        "base_path": BASE_PATH,
        "edit_remaining": quota["edit_remaining"],
        "edit_limit": quota["edit_limit"],
    }
    return templates.TemplateResponse("edit.html", context)
@app.post("/{run_id}", response_class=HTMLResponse)
async def edit_tex(request: Request, run_id: str, instructions: str = Form(...), latex: str = Form(...)):
    """Apply an LLM edit to a run's LaTeX, rebuild it and redirect to the run page.

    The submitted ``latex`` (or the stored ``diagram.tex`` when it is blank)
    is sent with ``instructions`` to the models in ``EDIT_MODELS``. A
    non-empty answer replaces ``diagram.tex`` and is built into PDF, PNG and
    SVG. The attempt is appended to the run history, and failures are stored
    in the run's ``error.txt``.

    The inputs are validated before the daily edit quota is touched, so a
    request with empty instructions or without any LaTeX does not use up
    quota. An exhausted quota is answered with HTTP 429.
    """
    run_dir = _run_dir_for_id(run_id)
    if run_dir is None:
        return RedirectResponse(url=f"{BASE_PATH}/?error=Invalid%20run%20id", status_code=303)
    client_ip = request.client.host if request.client else "unknown"

    instructions = (instructions or "").strip()
    if not instructions:
        _write_run_error(run_dir, "Edit instructions are empty.")
        return RedirectResponse(url=f"{BASE_PATH}/{run_id}", status_code=303)

    tex_path = run_dir / "diagram.tex"
    current_tex = latex
    if not isinstance(current_tex, str) or not current_tex.strip():
        if tex_path.exists():
            current_tex = tex_path.read_text(encoding="utf-8", errors="replace")
        else:
            return RedirectResponse(url=f"{BASE_PATH}/?error=Missing%20diagram.tex", status_code=303)

    # Same constant that _rate_limit_status reports to the user.
    if not _consume_rate_limit(client_ip, key="edit", limit=EDIT_DAILY_LIMIT):
        return HTMLResponse("Rate limit exceeded", status_code=429)

    meta = _read_run_meta(run_dir)
    started = time.monotonic()
    logger.info("edit.start run_id=%s", run_id)
    run_log = RunLogger(run_dir, logger)
    run_log.line(f"edit.start models={EDIT_MODELS}")
    edit_system = EDIT_SYSTEM_PROMPT
    edit_user = EDIT_PROMPT_TEMPLATE.format(instructions=instructions, latex=current_tex)
    run_log.section("edit.request", f"instructions=\n{instructions}\n")

    # NOTE(review): the LLM call and the build below are synchronous and run
    # inside this async handler; consider moving them to a worker thread.
    llm_error = ""
    raw_text = ""
    new_tex = ""
    used_model = ""
    try:
        llm_started = time.monotonic()
        run_log.line("llm.start")
        used_model, raw_text = _litellm_with_retries(
            models=EDIT_MODELS,
            messages=[
                {"role": "system", "content": edit_system},
                {"role": "user", "content": edit_user},
            ],
            run_log=run_log,
        )
        new_tex = (raw_text or "").strip()
        llm_elapsed_ms = int((time.monotonic() - llm_started) * 1000)
        logger.info(
            "edit.llm.ok run_id=%s elapsed_ms=%s raw_chars=%s tex_chars=%s",
            run_id,
            llm_elapsed_ms,
            len(raw_text or ""),
            len(new_tex or ""),
        )
        run_log.line(
            f"llm.ok model={used_model} elapsed_ms={llm_elapsed_ms} raw_chars={len(raw_text or '')} tex_chars={len(new_tex or '')}",
        )
    except Exception as e:
        llm_error = f"LLM call failed: {e}"
        logger.exception("edit.llm.error run_id=%s", run_id)
        run_log.line(f"llm.error {llm_error}")

    # The stored document is only replaced when the model produced something.
    pdf_path: Optional[Path] = None
    png_path: Optional[Path] = None
    svg_path: Optional[Path] = None
    if new_tex:
        tex_path.write_text(new_tex, encoding="utf-8")
        run_log.line(f"tex.written path={tex_path.name} chars={len(new_tex)}")
        pdf_path, png_path, svg_path, cp_elapsed_ms = _compile_phase(run_dir, tex_path, run_log, phase="edit")
        logger.info(
            "edit.compile.done run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s ok_svg=%s",
            run_id,
            cp_elapsed_ms,
            pdf_path is not None,
            png_path is not None,
            svg_path is not None,
        )

    history_status = "ok" if pdf_path else "error"
    _append_history_entry(
        run_dir,
        {
            "ts": time.time(),
            "ip": client_ip,
            "action": "edit",
            "latex": new_tex,
            "instructions": instructions,
            "model": used_model or (EDIT_MODELS[0] if EDIT_MODELS else ""),
            "status": history_status,
        },
    )
    run_log.section("edit.llm_raw", raw_text or "")

    # A single "model" key is also honoured for the convert model.
    convert_model = _get_meta_string(meta, "convert_model") or _get_meta_string(meta, "model")
    last_model_label = _get_meta_string(meta, "model")
    last_edit_model = _get_meta_string(meta, "last_edit_model")
    if not convert_model and last_model_label.endswith("(edit)"):
        convert_model = last_model_label.replace("(edit)", "").strip()
    default_convert_model = convert_model or last_model_label or (DEFAULT_MODELS[0] if DEFAULT_MODELS else "")
    # Keep the previous edit model unless this edit produced new LaTeX.
    if new_tex:
        last_edit_model = used_model or (EDIT_MODELS[0] if EDIT_MODELS else "")
    _write_run_meta(
        run_dir,
        {
            "convert_model": default_convert_model,
            "last_edit_model": last_edit_model,
            "created_at": meta.get("created_at", time.time()),
            "updated_at": time.time(),
        },
    )
    _write_run_error_parts(
        run_dir,
        [
            llm_error,
            "pdflatex failed. See run.log.txt in the run folder." if (new_tex and pdf_path is None) else "",
            "ImageMagick render failed. See run.log.txt in the run folder."
            if (pdf_path and not png_path)
            else "",
            "SVG render failed. See run.log.txt in the run folder." if (pdf_path and not svg_path) else "",
        ],
    )

    elapsed_ms = int((time.monotonic() - started) * 1000)
    logger.info(
        "edit.end run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s",
        run_id,
        elapsed_ms,
        pdf_path is not None,
        png_path is not None,
    )
    run_log.line(
        f"edit.end elapsed_ms={elapsed_ms} ok_pdf={pdf_path is not None} ok_png={png_path is not None}",
    )
    return RedirectResponse(url=f"{BASE_PATH}/{run_id}", status_code=303)
@app.post("/{run_id}/compile", response_class=HTMLResponse)
async def compile_tex(request: Request, run_id: str, latex: str = Form(...)):
    """Store the submitted LaTeX as the run's ``diagram.tex``, rebuild it and redirect.

    Blank LaTeX is stored too but not compiled. The attempt is appended to
    the run history, the run metadata is refreshed, and ``error.txt`` is
    rewritten with the messages for the steps that failed (empty when
    everything succeeded).
    """
    run_dir = _run_dir_for_id(run_id)
    if run_dir is None:
        return RedirectResponse(url=f"{BASE_PATH}/?error=Invalid%20run%20id", status_code=303)

    client_ip = request.client.host if request.client else "unknown"
    tex = latex if isinstance(latex, str) else ""
    has_tex = bool(tex.strip())
    tex_path = run_dir / "diagram.tex"
    tex_path.write_text(tex, encoding="utf-8")

    started = time.monotonic()
    logger.info("compile.start run_id=%s", run_id)
    run_log = RunLogger(run_dir, logger)
    run_log.line("compile.start")

    pdf_path: Optional[Path] = None
    png_path: Optional[Path] = None
    svg_path: Optional[Path] = None
    if has_tex:
        pdf_path, png_path, svg_path, cp_elapsed_ms = _compile_phase(
            run_dir, tex_path, run_log, phase="compile", start_msg="compile.invoke"
        )
        logger.info(
            "compile.done run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s ok_svg=%s",
            run_id,
            cp_elapsed_ms,
            pdf_path is not None,
            png_path is not None,
            svg_path is not None,
        )

    _append_history_entry(
        run_dir,
        {
            "ts": time.time(),
            "ip": client_ip,
            "action": "compile",
            "latex": tex,
            "instructions": "",
            "model": "",
            "status": "ok" if pdf_path else "error",
        },
    )

    # Carry the model information over unchanged; only the timestamp moves.
    meta = _read_run_meta(run_dir)
    model_label = _get_meta_string(meta, "model")
    convert_model = _get_meta_string(meta, "convert_model") or model_label
    last_edit_model = _get_meta_string(meta, "last_edit_model")
    if not convert_model and model_label.endswith("(edit)"):
        convert_model = model_label.replace("(edit)", "").strip()
    fallback_model = DEFAULT_MODELS[0] if DEFAULT_MODELS else ""
    refreshed_meta = {
        "convert_model": convert_model or model_label or fallback_model,
        "last_edit_model": last_edit_model,
        "created_at": meta.get("created_at", time.time()),
        "updated_at": time.time(),
    }
    _write_run_meta(run_dir, refreshed_meta)

    # Collect one message per failed step, in pipeline order.
    problems: list[str] = []
    if not has_tex:
        problems.append("LaTeX is empty.")
    elif pdf_path is None:
        problems.append("pdflatex failed. See run.log.txt in the run folder.")
    else:
        if png_path is None:
            problems.append("ImageMagick render failed. See run.log.txt in the run folder.")
        if svg_path is None:
            problems.append("SVG render failed. See run.log.txt in the run folder.")
    _write_run_error_parts(run_dir, problems)

    elapsed_ms = int((time.monotonic() - started) * 1000)
    logger.info("compile.end run_id=%s elapsed_ms=%s", run_id, elapsed_ms)
    run_log.line(f"compile.finish elapsed_ms={elapsed_ms}")
    return RedirectResponse(url=f"{BASE_PATH}/{run_id}", status_code=303)
@app.get("/{run_id}/history")
async def get_history(request: Request, run_id: str):
    """Return the caller's own history for a run as JSON, newest first.

    Only records stored for the caller's address are included, and the
    address itself is left out of the response. An unknown run id yields an
    empty list with status 404.
    """
    run_dir = _run_dir_for_id(run_id)
    if run_dir is None:
        return JSONResponse({"entries": []}, status_code=404)

    client = request.client
    own_entries = _load_history_for_ip(run_dir, client.host if client else "unknown")
    own_entries.sort(key=lambda item: item.get("ts", 0), reverse=True)

    # Fields exposed to the client, with their defaults ("ip" is omitted).
    public_fields = (
        ("ts", 0.0),
        ("action", ""),
        ("latex", ""),
        ("instructions", ""),
        ("model", ""),
        ("status", ""),
    )
    payload: list[dict[str, Any]] = [
        {name: item.get(name, fallback) for name, fallback in public_fields}
        for item in own_entries
    ]
    return JSONResponse({"entries": payload})
@app.get("/runs/{run_id}/{filename}")
async def get_run_file(run_id: str, filename: str):
    """Serve one file from a run directory, or answer 404.

    The path is validated by ``_safe_join_run_file``. The run's history file
    is never served from here: it holds the addresses, LaTeX and instructions
    of every client that touched the run, and is only exposed per caller
    through the history endpoint.
    """
    path = _safe_join_run_file(run_id, filename)
    if path is not None:
        # Compare case-insensitively so case-insensitive filesystems cannot
        # be used to dodge the check.
        history_name = _history_path(path.parent).name
        if path.name.casefold() == history_name.casefold():
            path = None
    if path is None or not path.exists() or not path.is_file():
        logger.info("download.not_found run_id=%s filename=%s", run_id, filename)
        return HTMLResponse("Not found", status_code=404)
    logger.info("download.ok run_id=%s filename=%s", run_id, filename)
    return FileResponse(path)
if __name__ == "__main__":
    # Development entry point: serve the app with uvicorn. HOST, PORT and
    # RELOAD are read from the environment; reload is on unless RELOAD is set
    # to something other than "1".
    import uvicorn

    host = os.getenv("HOST", "127.0.0.1")
    port = int(os.getenv("PORT", "8000"))
    reload_enabled = os.getenv("RELOAD", "1") == "1"
    uvicorn.run("main:app", host=host, port=port, reload=reload_enabled)