import base64 import hashlib import json import logging import mimetypes import os import re import shutil import subprocess import time import uuid from pathlib import Path from typing import Any, Optional, TypedDict, cast from fastapi import FastAPI, File, Form, Request, UploadFile from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from litellm import completion APP_DIR = Path(__file__).resolve().parent TEMPLATES_DIR = APP_DIR / "templates" RUNS_DIR = APP_DIR / "runs" PUBLIC_DIR = APP_DIR / "public" RATE_LIMIT_PATH = RUNS_DIR / "rate_limits.json" def _normalize_base_path(value: str) -> str: v = (value or "").strip() if not v: return "" if not v.startswith("/"): v = "/" + v v = v.rstrip("/") return v BASE_PATH = _normalize_base_path(os.getenv("BASE_PATH", "")) CONVERT_DAILY_LIMIT = int(os.getenv("CONVERT_DAILY_LIMIT", "5")) EDIT_DAILY_LIMIT = int(os.getenv("EDIT_DAILY_LIMIT", "10")) PDFLATEX_TIMEOUT_SECONDS = int(os.getenv("PDFLATEX_TIMEOUT_SECONDS", "10")) def _parse_model_list(env_value: str | None, default: list[str]) -> list[str]: if not env_value: return default items = [x.strip() for x in env_value.split(",")] return [x for x in items if x] # Default models (Google Gemini via LiteLLM). Order = fallback order. DEFAULT_MODELS = _parse_model_list( os.getenv("LLM_MODELS"), [ "gemini/gemini-3-pro-preview", "gemini/gemini-3-flash-preview", "gemini/gemini-flash-latest", ], ) EDIT_MODELS = _parse_model_list( os.getenv("EDIT_MODELS"), [ "gemini/gemini-3-flash-preview", "gemini/gemini-flash-latest", ], ) class RateLimitEntry(TypedDict, total=False): day: str convert: int edit: int class HistoryEntry(TypedDict, total=False): ts: float ip: str action: str latex: str instructions: str model: str status: str summary: str RUNS_DIR.mkdir(parents=True, exist_ok=True) RATE_LIMIT_PATH.touch(exist_ok=True) def _default_rate_limit_entry(day: str) -> RateLimitEntry: return {"day": day, "convert": 0, "edit": 0} def _entry_count(entry: RateLimitEntry, key: str) -> int: value = entry.get(key) return value if isinstance(value, int) else 0 def _history_path(run_dir: Path) -> Path: return run_dir / "history.jsonl" def _append_history_entry(run_dir: Path, entry: HistoryEntry) -> None: record: HistoryEntry = { "ts": entry.get("ts", time.time()), "ip": entry.get("ip", "unknown"), "action": entry.get("action", "edit"), "latex": entry.get("latex", ""), "instructions": entry.get("instructions", ""), "model": entry.get("model", ""), "status": entry.get("status", ""), "summary": entry.get("summary", ""), } path = _history_path(run_dir) with path.open("a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") def _load_history_for_ip(run_dir: Path, ip: str) -> list[HistoryEntry]: path = _history_path(run_dir) entries: list[HistoryEntry] = [] if not path.exists(): return entries try: with path.open("r", encoding="utf-8", errors="replace") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) except Exception: continue if not isinstance(obj, dict): continue if obj.get("ip") != ip: continue entries.append( { "ts": obj.get("ts", 0.0), "ip": obj.get("ip", ""), "action": obj.get("action", ""), "latex": obj.get("latex", ""), "instructions": obj.get("instructions", ""), "model": obj.get("model", ""), "status": obj.get("status", ""), "summary": obj.get("summary", ""), } ) except Exception: return entries return entries def _setup_logging() -> None: level_name = os.getenv("LOG_LEVEL", "INFO").upper() level = getattr(logging, level_name, logging.INFO) logging.basicConfig( level=level, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) _setup_logging() logger = logging.getLogger("diagram-to-tikz") def _load_rate_limits() -> dict[str, RateLimitEntry]: if not RATE_LIMIT_PATH.exists(): return {} try: data = json.loads(RATE_LIMIT_PATH.read_text(encoding="utf-8", errors="replace")) return cast(dict[str, RateLimitEntry], data) if isinstance(data, dict) else {} except Exception: return {} def _save_rate_limits(data: dict[str, RateLimitEntry]) -> None: RATE_LIMIT_PATH.write_text(json.dumps(data, indent=2, sort_keys=True) + "\n", encoding="utf-8") def _consume_rate_limit(ip: str, *, key: str, limit: int) -> bool: today = time.strftime("%Y-%m-%d") data = _load_rate_limits() raw_entry = data.get(ip) if isinstance(raw_entry, dict): entry: RateLimitEntry = cast(RateLimitEntry, raw_entry) else: entry = _default_rate_limit_entry(today) last_day = entry.get("day") if isinstance(entry.get("day"), str) else "" if last_day != today: entry = _default_rate_limit_entry(today) if key not in ("convert", "edit"): return False count = _entry_count(entry, key) if count >= limit: return False entry[key] = count + 1 if "day" not in entry: entry["day"] = today data[ip] = entry _save_rate_limits(data) return True def _rate_limit_status(ip: str) -> dict[str, int]: today = time.strftime("%Y-%m-%d") data = _load_rate_limits() raw_entry = data.get(ip) if isinstance(raw_entry, dict): entry: RateLimitEntry = cast(RateLimitEntry, raw_entry) else: entry = _default_rate_limit_entry(today) if entry.get("day") != today: entry = _default_rate_limit_entry(today) data[ip] = entry _save_rate_limits(data) convert_used = _entry_count(entry, "convert") edit_used = _entry_count(entry, "edit") return { "convert_limit": CONVERT_DAILY_LIMIT, "convert_used": convert_used, "convert_remaining": max(CONVERT_DAILY_LIMIT - convert_used, 0), "edit_limit": EDIT_DAILY_LIMIT, "edit_used": edit_used, "edit_remaining": max(EDIT_DAILY_LIMIT - edit_used, 0), } class RunLogger: def __init__(self, run_dir: Path, py_logger: logging.Logger): self.run_dir = run_dir self.py = py_logger self._log_path = run_dir / "run.log.txt" def line(self, msg: str) -> None: ts = time.strftime("%Y-%m-%d %H:%M:%S") with self._log_path.open("a", encoding="utf-8") as f: f.write(f"{ts} {msg}\n") def section(self, title: str, body: str) -> None: ts = time.strftime("%Y-%m-%d %H:%M:%S") header = f"\n--- {title} ({ts}) ---\n" text = (body or "").rstrip() + "\n" with self._log_path.open("a", encoding="utf-8") as f: f.write(header) f.write(text) CONVERT_SYSTEM_PROMPT = ( "You convert hand-drawn diagrams into clean, compilable LaTeX/TikZ. " "Return EXACTLY ONE complete LaTeX document (not a fragment). " 'Output must start with \\"\\\\documentclass{standalone}\\" and end with \\"\\\\end{document}\\". ' "Return ONLY raw LaTeX source: no markdown, no code fences, no commentary, no extra text. " "Wrap LaTeX to roughly 80 characters per line, format it cleanly, and include concise LaTeX comments that explain structure where useful." ) CONVERT_PROMPT = ( "Convert the diagram in the image into a standalone LaTeX document that compiles with pdflatex.\n" "Requirements:\n" "- Output must be a complete LaTeX document starting with \\documentclass{standalone}.\n" "- Use TikZ (\\usepackage{tikz}) and draw the diagram.\n" "- The output must compile without external assets.\n" "- Prefer simple primitives (nodes, lines, arrows, rectangles, circles, text).\n" "- Keep it reasonably sized and centered; use consistent spacing and alignment.\n" ) EDIT_SYSTEM_PROMPT = ( "You are an expert LaTeX/TikZ editor. " "Given an existing standalone LaTeX/TikZ document and an edit request, return EXACTLY TWO elements: " "a complete LaTeX document wrapped in ... and a very generic summary (less than 7 words, all lowercase) wrapped in .... " "Return ONLY these tags and their contents: no markdown, no code fences, no commentary, no extra text. " "The LaTeX must compile with pdflatex. " "Wrap LaTeX to roughly 80 characters per line, keep formatting tidy, and add brief LaTeX comments where helpful." ) EDIT_PROMPT_TEMPLATE = ( "Edit the following LaTeX document according to the instructions.\n" "Rules:\n" "- Return a complete standalone LaTeX document (\\documentclass{{standalone}} ... \\end{{document}}).\n" "- Make minimal changes necessary to satisfy the instructions.\n" "- Do not add external files or assets.\n" "INSTRUCTIONS:\n{instructions}\n\n" "LATEX:\n{latex}\n" ) def _guess_mime(filename: str) -> str: mime, _ = mimetypes.guess_type(filename) return mime or "application/octet-stream" def _to_data_url(image_bytes: bytes, mime: str) -> str: b64 = base64.b64encode(image_bytes).decode("ascii") return f"data:{mime};base64,{b64}" def _extract_text_from_litellm(resp: Any) -> str: # LiteLLM tries to be OpenAI-compatible but the return shape can vary. try: choice = resp["choices"][0] message = choice.get("message") or {} content = message.get("content") except Exception: content = None if isinstance(content, str): return content # Sometimes content can be a list of parts. if isinstance(content, list): parts: list[str] = [] for part in content: if isinstance(part, str): parts.append(part) elif isinstance(part, dict): text = part.get("text") if isinstance(text, str): parts.append(text) return "\n".join(parts).strip() # Fallback to stringification. return str(resp).strip() def _parse_latex_and_summary(raw_text: str) -> tuple[str, str]: """Extracts ... and ... from raw_text. Falls back to treating the whole raw_text as LaTeX if tags are missing. """ latex_re = re.compile(r"(.*?)", re.DOTALL | re.IGNORECASE) summary_re = re.compile(r"(.*?)", re.DOTALL | re.IGNORECASE) latex_match = latex_re.search(raw_text or "") summary_match = summary_re.search(raw_text or "") latex = (latex_match.group(1) if latex_match else (raw_text or "")).strip() summary = (summary_match.group(1) if summary_match else "").strip() return latex, summary def _litellm_call(*, model: str, messages: list[dict[str, Any]], **kwargs: Any) -> Any: return completion(model=model, messages=messages, **kwargs) def _litellm_with_retries( *, models: list[str], messages: list[dict[str, Any]], run_log: RunLogger, **kwargs: Any, ) -> tuple[str, str]: last_error: Exception | None = None for model in models: try: run_log.line(f"llm.try model={model}") resp = _litellm_call(model=model, messages=messages, **kwargs) raw_text = _extract_text_from_litellm(resp) text = (raw_text or "").strip() if not text: raise RuntimeError("Empty LLM response") return model, raw_text except Exception as e: last_error = e run_log.line(f"llm.fail model={model} error={e}") # sleep between retries time.sleep(1) continue raise RuntimeError(f"All models failed: {[m for m in models]} ({last_error})") def _run_cmd( cmd: list[str], cwd: Path, timeout_s: int = 120, py_logger: logging.Logger | None = None ) -> tuple[int, str]: started = time.monotonic() cmd_str = " ".join(cmd) log = py_logger or logger log.info("cmd.start cwd=%s timeout=%ss cmd=%s", str(cwd), timeout_s, cmd_str) try: proc = subprocess.run( cmd, cwd=str(cwd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, timeout=timeout_s, ) elapsed_ms = int((time.monotonic() - started) * 1000) log.info( "cmd.end rc=%s elapsed_ms=%s out_chars=%s cmd=%s", proc.returncode, elapsed_ms, len(proc.stdout or ""), cmd_str, ) return proc.returncode, proc.stdout except subprocess.TimeoutExpired as e: elapsed_ms = int((time.monotonic() - started) * 1000) if isinstance(e.stdout, (bytes, bytearray)): out = bytes(e.stdout).decode("utf-8", errors="replace") elif isinstance(e.stdout, str): out = e.stdout else: out = "" log.warning("cmd.timeout elapsed_ms=%s cmd=%s", elapsed_ms, cmd_str) return 124, out + f"\n[timeout after {timeout_s}s]\n" def _compile_pdflatex( run_dir: Path, tex_filename: str, run_logger: RunLogger, section_title: str ) -> Optional[Path]: code, out = _run_cmd( [ "pdflatex", "-interaction=nonstopmode", "-halt-on-error", "-file-line-error", "-output-directory", str(run_dir), str(run_dir / tex_filename), ], cwd=run_dir, timeout_s=PDFLATEX_TIMEOUT_SECONDS, py_logger=run_logger.py, ) if code == 0: run_logger.section(section_title, "Compilation succeeded.") else: run_logger.section(section_title, out) pdf_path = run_dir / Path(tex_filename).with_suffix(".pdf").name if code == 0 and pdf_path.exists(): return pdf_path return None def _render_png_with_magick( run_dir: Path, pdf_path: Path, run_logger: RunLogger, section_title: str, png_name: str = "diagram.png", size: tuple[int, int] | None = None, padding: int = 0, ) -> Optional[Path]: png_path = run_dir / png_name # Base command: use first page, 300 DPI, and flatten transparency cmd = [ "magick", "-density", "300", f"{pdf_path}[0]", "-background", "white", "-alpha", "remove", "-alpha", "off", ] if size: width, height = size # Calculate the target size for the content after subtracting margins inner_width = width - (2 * padding) inner_height = height - (2 * padding) inner_size = f"{inner_width}x{inner_height}" size_str = f"{width}x{height}" cmd.extend( [ "-resize", inner_size, # Resize to fit within margin bounds "-gravity", "center", # Center the image "-extent", size_str, # Pad out to the full requested size ] ) elif padding > 0: # If no size is fixed but margin is requested, use -border cmd.extend(["-bordercolor", "white", "-border", str(padding)]) cmd.append(str(png_path)) code, out = _run_cmd(cmd, cwd=run_dir, timeout_s=5, py_logger=run_logger.py) run_logger.section(section_title, out) if code == 0 and png_path.exists(): return png_path return None COMPILE_COOLDOWN_SECS = 3 COMPILE_STATE_FILE = "compile_state.json" def _load_compile_state(run_dir: Path) -> dict[str, Any]: path = run_dir / COMPILE_STATE_FILE if not path.exists(): return {} try: data = json.loads(path.read_text(encoding="utf-8", errors="replace")) return data if isinstance(data, dict) else {} except Exception: return {} def _save_compile_state(run_dir: Path, state: dict[str, Any]) -> None: path = run_dir / COMPILE_STATE_FILE path.write_text(json.dumps(state, indent=2, sort_keys=True) + "\n", encoding="utf-8") def _tex_digest(text: str) -> str: return hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest() def _should_compile( run_dir: Path, tex_text: str, *, cooldown_s: int = COMPILE_COOLDOWN_SECS ) -> tuple[bool, str, str]: state = _load_compile_state(run_dir) last_ts = float(state.get("last_ts", 0.0) or 0.0) last_hash = state.get("last_hash") if isinstance(state.get("last_hash"), str) else "" now = time.time() digest = _tex_digest(tex_text) if now - last_ts < cooldown_s: return False, "cooldown", digest if last_hash and last_hash == digest: return False, "unchanged", digest return True, "ok", digest def _render_svg_with_pdf2svg( run_dir: Path, pdf_path: Path, run_logger: RunLogger, section_title: str, svg_name: str = "diagram.svg" ) -> Optional[Path]: svg_path = run_dir / svg_name cmd = ["pdf2svg", str(pdf_path), str(svg_path), "1"] code, out = _run_cmd(cmd, cwd=run_dir, timeout_s=5, py_logger=run_logger.py) run_logger.section(section_title, out) if code == 0 and svg_path.exists(): return svg_path return None def _compile_latex_to_png( run_dir: Path, tex_path: Path, run_logger: RunLogger, phase: str ) -> tuple[Optional[Path], Optional[Path]]: """Compile a .tex file with pdflatex and render first page to PNG via ImageMagick. Logs outputs into run.log.txt and returns only the artifact paths. """ pdf_path = _compile_pdflatex(run_dir, tex_path.name, run_logger, section_title=f"{phase}.pdflatex") if pdf_path is None: return None, None png_path = _render_png_with_magick(run_dir, pdf_path, run_logger, section_title=f"{phase}.magick") return pdf_path, png_path def _compile_phase( run_dir: Path, tex_path: Path, run_log: RunLogger, phase: str, start_msg: str = "compile.start" ) -> tuple[Optional[Path], Optional[Path], Optional[Path], int, str]: try: tex_text = tex_path.read_text(encoding="utf-8", errors="replace") except Exception: tex_text = "" allowed, reason, digest = _should_compile(run_dir, tex_text, cooldown_s=COMPILE_COOLDOWN_SECS) pdf_path_existing = run_dir / Path(tex_path.name).with_suffix(".pdf").name png_path_existing = run_dir / "diagram.png" svg_path_existing = run_dir / "diagram.svg" if not allowed: pdf_path = pdf_path_existing if pdf_path_existing.exists() else None png_path = png_path_existing if png_path_existing.exists() else None svg_path = svg_path_existing if svg_path_existing.exists() else None run_log.line( f"{start_msg}.skip reason={reason} has_pdf={pdf_path is not None} has_png={png_path is not None} has_svg={svg_path is not None}" ) return pdf_path, png_path, svg_path, 0, reason run_log.line(start_msg) started = time.monotonic() pdf_path, png_path = _compile_latex_to_png(run_dir, tex_path, run_log, phase=phase) svg_path = ( _render_svg_with_pdf2svg(run_dir, pdf_path, run_log, section_title=f"{phase}.pdf2svg") if pdf_path else None ) # Render preview.png with OG image size (1200x630, center-fit) preview_path = ( _render_png_with_magick( run_dir, pdf_path, run_log, section_title=f"{phase}.magick.preview", png_name="preview.png", size=(1200, 630), padding=20, ) if pdf_path else None ) elapsed_ms = int((time.monotonic() - started) * 1000) _save_compile_state( run_dir, { "last_ts": time.time(), "last_hash": digest, }, ) run_log.line( f"compile.end elapsed_ms={elapsed_ms} ok_pdf={pdf_path is not None} ok_png={png_path is not None} ok_svg={svg_path is not None}" ) return pdf_path, png_path, svg_path, elapsed_ms, "compiled" BASE62_ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" RUN_ID_RE = re.compile(r"(\d{4}-\d{2}-\d{2}-[0-9a-f]{32}|[0-9A-Za-z]{22})") def _uuid_to_base62(u: uuid.UUID) -> str: n: int = int.from_bytes(u.bytes, "big", signed=False) base = len(BASE62_ALPHABET) chars: list[str] = [] while n > 0: n, rem = divmod(n, base) chars.append(BASE62_ALPHABET[rem]) encoded = "".join(reversed(chars or [BASE62_ALPHABET[0]])) return encoded.rjust(22, BASE62_ALPHABET[0]) def _new_run_id() -> str: return _uuid_to_base62(uuid.uuid4()) def _write_run_meta(run_dir: Path, meta: dict[str, Any]) -> None: (run_dir / "meta.json").write_text(json.dumps(meta, indent=2, sort_keys=True) + "\n", encoding="utf-8") def _read_run_meta(run_dir: Path) -> dict[str, Any]: path = run_dir / "meta.json" if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8", errors="replace")) except Exception: return {} def _get_meta_string(meta: dict[str, Any], key: str) -> str: value = meta.get(key) return value if isinstance(value, str) else "" def _write_run_error(run_dir: Path, error_text: str) -> None: (run_dir / "error.txt").write_text((error_text or "").strip() + "\n", encoding="utf-8") def _read_run_error(run_dir: Path) -> str: path = run_dir / "error.txt" if not path.exists(): return "" return path.read_text(encoding="utf-8", errors="replace").strip() def _write_run_error_parts(run_dir: Path, parts: list[str]) -> None: text = "\n\n".join([p for p in parts if p]).strip() _write_run_error(run_dir, text) def _safe_join_run_file(run_id: str, filename: str) -> Optional[Path]: # Prevent path traversal. if not RUN_ID_RE.fullmatch(run_id): return None if "/" in filename or "\\" in filename: return None run_dir = RUNS_DIR / run_id candidate = (run_dir / filename).resolve() try: run_dir_resolved = run_dir.resolve() except FileNotFoundError: return None if run_dir_resolved not in candidate.parents: return None return candidate def _run_dir_for_id(run_id: str) -> Optional[Path]: if not RUN_ID_RE.fullmatch(run_id): return None run_dir = RUNS_DIR / run_id if not run_dir.exists() or not run_dir.is_dir(): return None return run_dir app = FastAPI() app.mount("/public", StaticFiles(directory=str(PUBLIC_DIR)), name="public") templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) templates.env.globals["base_path"] = BASE_PATH @app.get("/", response_class=HTMLResponse) async def index(request: Request, error: str | None = None): client_ip = request.client.host if request.client else "unknown" rate_status = _rate_limit_status(client_ip) return templates.TemplateResponse( "index.html", { "request": request, "error": error, "base_path": BASE_PATH, "convert_remaining": rate_status["convert_remaining"], "convert_limit": rate_status["convert_limit"], }, ) @app.post("/convert", response_class=HTMLResponse) async def convert(request: Request, image: UploadFile = File(...), notes: str = Form("")): client_ip = request.client.host if request.client else "unknown" if not _consume_rate_limit(client_ip, key="convert", limit=5): return RedirectResponse(url=f"{BASE_PATH}/?error=Rate%20limit%20exceeded", status_code=303) models = DEFAULT_MODELS run_id = _new_run_id() run_dir = RUNS_DIR / run_id run_dir.mkdir(parents=True, exist_ok=True) run_log = RunLogger(run_dir, logger) started = time.monotonic() logger.info("run.start run_id=%s", run_id) run_log.line(f"convert.start models={models}") notes = (notes or "").strip() # Save original upload original_bytes = await image.read() if not original_bytes: logger.warning("run.empty_upload run_id=%s", run_id) return RedirectResponse(url=f"{BASE_PATH}/?error=Empty%20upload", status_code=303) original_name = image.filename or "upload" ext = Path(original_name).suffix if not ext: ext = ".png" original_path = run_dir / f"original{ext}" original_path.write_bytes(original_bytes) logger.info( "upload.saved run_id=%s filename=%s content_type=%s bytes=%s path=%s", run_id, original_name, image.content_type, len(original_bytes), str(original_path), ) run_log.line( f"upload.saved filename={original_name} content_type={image.content_type} bytes={len(original_bytes)} path={original_path.name}", ) mime = image.content_type or _guess_mime(original_name) data_url = _to_data_url(original_bytes, mime) # Call LLM (vision) llm_error = "" raw_text = "" tex = "" used_model = "" try: llm_started = time.monotonic() run_log.line("llm.start") if notes: run_log.section("convert.notes", notes) user_text = CONVERT_PROMPT if notes: user_text = f"{user_text}\n\nAdditional notes from user:\n{notes}" used_model, raw_text = _litellm_with_retries( models=models, messages=[ {"role": "system", "content": CONVERT_SYSTEM_PROMPT}, { "role": "user", "content": [ {"type": "text", "text": user_text}, {"type": "image_url", "image_url": {"url": data_url}}, ], }, ], run_log=run_log, ) tex = (raw_text or "").strip() llm_elapsed_ms = int((time.monotonic() - llm_started) * 1000) logger.info( "llm.ok run_id=%s elapsed_ms=%s raw_chars=%s tex_chars=%s", run_id, llm_elapsed_ms, len(raw_text or ""), len(tex or ""), ) run_log.line( f"llm.ok model={used_model} elapsed_ms={llm_elapsed_ms} raw_chars={len(raw_text or '')} tex_chars={len(tex or '')}", ) except Exception as e: llm_error = f"LLM call failed: {e}" logger.exception("llm.error run_id=%s", run_id) run_log.line(f"llm.error {llm_error}") tex_path = run_dir / "diagram.tex" tex_path.write_text(tex or "% LLM failed to produce LaTeX\n", encoding="utf-8") run_log.line(f"tex.written path={tex_path.name} chars={len(tex or '')}") # Compile and render pdf_path: Optional[Path] = None png_path: Optional[Path] = None svg_path: Optional[Path] = None cp_elapsed_ms = 0 cp_reason = "" if tex: pdf_path, png_path, svg_path, cp_elapsed_ms, cp_reason = _compile_phase( run_dir, tex_path, run_log, phase="convert" ) logger.info( "compile.done run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s ok_svg=%s", run_id, cp_elapsed_ms, pdf_path is not None, png_path is not None, svg_path is not None, ) run_log.section("convert.llm_raw", raw_text or "") convert_model = used_model or (models[0] if models else "") _write_run_meta( run_dir, { "convert_model": convert_model, "last_edit_model": "", "created_at": time.time(), "updated_at": time.time(), }, ) _write_run_error_parts( run_dir, [ llm_error, "pdflatex failed. See run.log.txt in the run folder." if (tex and pdf_path is None) else "", "ImageMagick render failed. See run.log.txt in the run folder." if (pdf_path and not png_path) else "", "SVG render failed. See run.log.txt in the run folder." if (pdf_path and not svg_path) else "", ], ) elapsed_ms = int((time.monotonic() - started) * 1000) logger.info( "run.end run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s", run_id, elapsed_ms, pdf_path is not None, png_path is not None, ) run_log.line( f"convert.end elapsed_ms={elapsed_ms} ok_pdf={pdf_path is not None} ok_png={png_path is not None}", ) warn_qs = f"?warn={cp_reason}" if cp_elapsed_ms == 0 and cp_reason else "" return RedirectResponse(url=f"{BASE_PATH}/d/{run_id}{warn_qs}", status_code=303) @app.get("/d/{run_id}", response_class=HTMLResponse) async def get_run_result(request: Request, run_id: str, warn: str | None = None): run_dir = _run_dir_for_id(run_id) if run_dir is None: return HTMLResponse("Not found", status_code=404) client_ip = request.client.host if request.client else "unknown" rate_status = _rate_limit_status(client_ip) tex_path = run_dir / "diagram.tex" pdf_path = run_dir / "diagram.pdf" png_path = run_dir / "diagram.png" svg_path = run_dir / "diagram.svg" tex = tex_path.read_text(encoding="utf-8", errors="replace") if tex_path.exists() else "" png_url = f"{BASE_PATH}/d/{run_id}/files/{png_path.name}" if png_path.exists() else None original_candidates = sorted(run_dir.glob("original.*")) original_path = original_candidates[0] if original_candidates else (run_dir / "original") meta = _read_run_meta(run_dir) convert_model = _get_meta_string(meta, "convert_model") or _get_meta_string(meta, "model") edit_model = _get_meta_string(meta, "last_edit_model") if not edit_model: model_str = _get_meta_string(meta, "model") if model_str.endswith("(edit)"): edit_model = model_str.replace("(edit)", "").strip() if not convert_model: convert_model = DEFAULT_MODELS[0] if DEFAULT_MODELS else "" error_text = _read_run_error(run_dir) or None def file_url(name: str) -> str: return f"{BASE_PATH}/d/{run_id}/files/{name}" download_pdf_url = file_url(pdf_path.name) if pdf_path.exists() else file_url("diagram.pdf") download_svg_url = file_url(svg_path.name) if svg_path.exists() else file_url("diagram.svg") def _warn_text(reason: str | None) -> str: if not reason: return "" if reason == "cooldown": return "Please wait a few seconds before compiling again." if reason == "unchanged": return "Source unchanged; compile skipped." return "Compilation skipped." return templates.TemplateResponse( "edit.html", { "request": request, "run_id": run_id, "convert_model": convert_model, "last_edit_model": edit_model, "tex": tex, "png_url": png_url, "download_original_url": file_url(original_path.name), "download_tex_url": file_url(tex_path.name), "download_pdf_url": download_pdf_url, "download_png_url": file_url("diagram.png"), "download_svg_url": download_svg_url, "error": error_text, "base_path": BASE_PATH, "edit_remaining": rate_status["edit_remaining"], "edit_limit": rate_status["edit_limit"], "compile_warning": _warn_text(warn), }, ) @app.post("/d/{run_id}", response_class=HTMLResponse) async def edit_tex(request: Request, run_id: str, instructions: str = Form(...), latex: str = Form(...)): run_dir = _run_dir_for_id(run_id) if run_dir is None: return RedirectResponse(url=f"{BASE_PATH}/?error=Invalid%20run%20id", status_code=303) client_ip = request.client.host if request.client else "unknown" if not _consume_rate_limit(client_ip, key="edit", limit=10): return HTMLResponse("Rate limit exceeded", status_code=429) instructions = (instructions or "").strip() if not instructions: _write_run_error(run_dir, "Edit instructions are empty.") return RedirectResponse(url=f"{BASE_PATH}/d/{run_id}", status_code=303) tex_path = run_dir / "diagram.tex" current_tex = latex if not isinstance(current_tex, str) or not current_tex.strip(): if tex_path.exists(): current_tex = tex_path.read_text(encoding="utf-8", errors="replace") else: return RedirectResponse(url=f"{BASE_PATH}/?error=Missing%20diagram.tex", status_code=303) meta = _read_run_meta(run_dir) started = time.monotonic() logger.info("edit.start run_id=%s", run_id) run_log = RunLogger(run_dir, logger) run_log.line(f"edit.start models={EDIT_MODELS}") edit_system = EDIT_SYSTEM_PROMPT edit_user = EDIT_PROMPT_TEMPLATE.format(instructions=instructions, latex=current_tex) run_log.section("edit.request", f"instructions=\n{instructions}\n") llm_error = "" raw_text = "" new_tex = "" used_model = "" summary_text = "" try: llm_started = time.monotonic() run_log.line("llm.start") used_model, raw_text = _litellm_with_retries( models=EDIT_MODELS, messages=[ {"role": "system", "content": edit_system}, {"role": "user", "content": edit_user}, ], run_log=run_log, ) new_tex, summary_text = _parse_latex_and_summary(raw_text or "") llm_elapsed_ms = int((time.monotonic() - llm_started) * 1000) logger.info( "edit.llm.ok run_id=%s elapsed_ms=%s raw_chars=%s tex_chars=%s", run_id, llm_elapsed_ms, len(raw_text or ""), len(new_tex or ""), ) run_log.line( f"llm.ok model={used_model} elapsed_ms={llm_elapsed_ms} raw_chars={len(raw_text or '')} tex_chars={len(new_tex or '')}", ) except Exception as e: llm_error = f"LLM call failed: {e}" logger.exception("edit.llm.error run_id=%s", run_id) run_log.line(f"llm.error {llm_error}") if new_tex: tex_path.write_text(new_tex, encoding="utf-8") run_log.line(f"tex.written path={tex_path.name} chars={len(new_tex)}") pdf_path: Optional[Path] = None png_path: Optional[Path] = None svg_path: Optional[Path] = None cp_elapsed_ms = 0 cp_reason = "" if new_tex: pdf_path, png_path, svg_path, cp_elapsed_ms, cp_reason = _compile_phase( run_dir, tex_path, run_log, phase="edit" ) logger.info( "edit.compile.done run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s ok_svg=%s", run_id, cp_elapsed_ms, pdf_path is not None, png_path is not None, svg_path is not None, ) if cp_elapsed_ms > 0: history_status = "ok" if pdf_path else "error" _append_history_entry( run_dir, { "ts": time.time(), "ip": client_ip, "action": "edit", "latex": new_tex, "instructions": instructions, "model": used_model or (EDIT_MODELS[0] if EDIT_MODELS else ""), "status": history_status, "summary": summary_text, }, ) run_log.section("edit.llm_raw", raw_text or "") convert_model = _get_meta_string(meta, "convert_model") or _get_meta_string(meta, "model") last_model_label = _get_meta_string(meta, "model") last_edit_model = _get_meta_string(meta, "last_edit_model") if not convert_model and last_model_label.endswith("(edit)"): convert_model = last_model_label.replace("(edit)", "").strip() default_convert_model = convert_model or last_model_label or (DEFAULT_MODELS[0] if DEFAULT_MODELS else "") if new_tex: edit_model = used_model or (EDIT_MODELS[0] if EDIT_MODELS else "") _write_run_meta( run_dir, { "convert_model": default_convert_model, "last_edit_model": edit_model, "created_at": meta.get("created_at", time.time()), "updated_at": time.time(), }, ) else: _write_run_meta( run_dir, { "convert_model": default_convert_model, "last_edit_model": last_edit_model, "created_at": meta.get("created_at", time.time()), "updated_at": time.time(), }, ) _write_run_error_parts( run_dir, [ llm_error, "pdflatex failed. See run.log.txt in the run folder." if (new_tex and pdf_path is None) else "", "ImageMagick render failed. See run.log.txt in the run folder." if (pdf_path and not png_path) else "", "SVG render failed. See run.log.txt in the run folder." if (pdf_path and not svg_path) else "", ], ) elapsed_ms = int((time.monotonic() - started) * 1000) logger.info( "edit.end run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s", run_id, elapsed_ms, pdf_path is not None, png_path is not None, ) run_log.line( f"edit.end elapsed_ms={elapsed_ms} ok_pdf={pdf_path is not None} ok_png={png_path is not None}", ) warn_qs = f"?warn={cp_reason}" if cp_elapsed_ms == 0 and cp_reason else "" return RedirectResponse(url=f"{BASE_PATH}/d/{run_id}{warn_qs}", status_code=303) @app.post("/d/{run_id}/compile", response_class=HTMLResponse) async def compile_tex(request: Request, run_id: str, latex: str = Form(...)): run_dir = _run_dir_for_id(run_id) if run_dir is None: return RedirectResponse(url=f"{BASE_PATH}/?error=Invalid%20run%20id", status_code=303) client_ip = request.client.host if request.client else "unknown" tex = latex if isinstance(latex, str) else "" tex_path = run_dir / "diagram.tex" tex_path.write_text(tex, encoding="utf-8") started = time.monotonic() logger.info("compile.start run_id=%s", run_id) run_log = RunLogger(run_dir, logger) run_log.line("compile.start") pdf_path: Optional[Path] = None png_path: Optional[Path] = None svg_path: Optional[Path] = None cp_elapsed_ms = 0 cp_reason = "" if tex.strip(): pdf_path, png_path, svg_path, cp_elapsed_ms, cp_reason = _compile_phase( run_dir, tex_path, run_log, phase="compile", start_msg="compile.invoke" ) logger.info( "compile.done run_id=%s elapsed_ms=%s ok_pdf=%s ok_png=%s ok_svg=%s", run_id, cp_elapsed_ms, pdf_path is not None, png_path is not None, svg_path is not None, ) if cp_elapsed_ms > 0: history_status = "ok" if pdf_path else "error" _append_history_entry( run_dir, { "ts": time.time(), "ip": client_ip, "action": "compile", "latex": tex, "instructions": "", "model": "", "status": history_status, }, ) meta = _read_run_meta(run_dir) model_label = _get_meta_string(meta, "model") convert_model = _get_meta_string(meta, "convert_model") or model_label last_edit_model = _get_meta_string(meta, "last_edit_model") if not convert_model and model_label.endswith("(edit)"): convert_model = model_label.replace("(edit)", "").strip() default_convert_model = convert_model or model_label or (DEFAULT_MODELS[0] if DEFAULT_MODELS else "") _write_run_meta( run_dir, { "convert_model": default_convert_model, "last_edit_model": last_edit_model, "created_at": meta.get("created_at", time.time()), "updated_at": time.time(), }, ) _write_run_error_parts( run_dir, [ "LaTeX is empty." if not tex.strip() else "", "pdflatex failed. See run.log.txt in the run folder." if (tex.strip() and pdf_path is None) else "", "ImageMagick render failed. See run.log.txt in the run folder." if (tex.strip() and pdf_path is not None and png_path is None) else "", "SVG render failed. See run.log.txt in the run folder." if (tex.strip() and pdf_path is not None and svg_path is None) else "", ], ) elapsed_ms = int((time.monotonic() - started) * 1000) logger.info("compile.end run_id=%s elapsed_ms=%s", run_id, elapsed_ms) run_log.line(f"compile.finish elapsed_ms={elapsed_ms}") warn_qs = f"?warn={cp_reason}" if cp_elapsed_ms == 0 and cp_reason else "" return RedirectResponse(url=f"{BASE_PATH}/d/{run_id}{warn_qs}", status_code=303) @app.get("/d/{run_id}/history") async def get_history(request: Request, run_id: str): run_dir = _run_dir_for_id(run_id) if run_dir is None: return JSONResponse({"entries": []}, status_code=404) client_ip = request.client.host if request.client else "unknown" entries = _load_history_for_ip(run_dir, client_ip) entries_sorted = sorted(entries, key=lambda e: e.get("ts", 0), reverse=True) safe_entries: list[dict[str, Any]] = [] for entry in entries_sorted: safe_entries.append( { "ts": entry.get("ts", 0.0), "action": entry.get("action", ""), "latex": entry.get("latex", ""), "instructions": entry.get("instructions", ""), "model": entry.get("model", ""), "status": entry.get("status", ""), "summary": entry.get("summary", ""), } ) return JSONResponse({"entries": safe_entries}) @app.post("/d/{run_id}/delete", response_class=HTMLResponse) async def delete_run(request: Request, run_id: str): run_dir = _run_dir_for_id(run_id) if run_dir is None: return RedirectResponse(url=f"{BASE_PATH}/?error=Invalid%20run%20id", status_code=303) try: resolved_runs_dir = RUNS_DIR.resolve() resolved_run_dir = run_dir.resolve() if resolved_runs_dir not in resolved_run_dir.parents: logger.warning("delete.invalid_path run_id=%s path=%s", run_id, str(run_dir)) return HTMLResponse("Invalid run directory", status_code=400) shutil.rmtree(resolved_run_dir) logger.info("delete.ok run_id=%s", run_id) except Exception: logger.exception("delete.error run_id=%s", run_id) return RedirectResponse(url=f"{BASE_PATH}/d/{run_id}?error=Delete%20failed", status_code=303) return RedirectResponse(url=f"{BASE_PATH}/", status_code=303) @app.get("/d/{run_id}/files/{filename}") async def get_run_file(run_id: str, filename: str): path = _safe_join_run_file(run_id, filename) if path is None or not path.exists() or not path.is_file(): logger.info("download.not_found run_id=%s filename=%s", run_id, filename) return HTMLResponse("Not found", status_code=404) logger.info("download.ok run_id=%s filename=%s", run_id, filename) return FileResponse( path, headers={ "Cache-Control": "no-store, no-cache, must-revalidate, max-age=0", "Pragma": "no-cache", "Expires": "0", }, ) if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host=os.getenv("HOST", "127.0.0.1"), port=int(os.getenv("PORT", "8000")), reload=bool(os.getenv("RELOAD", "1") == "1"), )