@ -1,4 +1,5 @@
import base64
import hashlib
import json
import logging
import mimetypes
@ -78,6 +79,7 @@ class HistoryEntry(TypedDict, total=False):
instructions : str
model : str
status : str
summary : str
RUNS_DIR . mkdir ( parents = True , exist_ok = True )
@ -106,6 +108,7 @@ def _append_history_entry(run_dir: Path, entry: HistoryEntry) -> None:
" instructions " : entry . get ( " instructions " , " " ) ,
" model " : entry . get ( " model " , " " ) ,
" status " : entry . get ( " status " , " " ) ,
" summary " : entry . get ( " summary " , " " ) ,
}
path = _history_path ( run_dir )
with path . open ( " a " , encoding = " utf-8 " ) as f :
@ -140,6 +143,7 @@ def _load_history_for_ip(run_dir: Path, ip: str) -> list[HistoryEntry]:
" instructions " : obj . get ( " instructions " , " " ) ,
" model " : obj . get ( " model " , " " ) ,
" status " : obj . get ( " status " , " " ) ,
" summary " : obj . get ( " summary " , " " ) ,
}
)
except Exception :
@ -271,10 +275,11 @@ CONVERT_PROMPT = (
EDIT_SYSTEM_PROMPT = (
" You are an expert LaTeX/TikZ editor. "
" Given an existing standalone LaTeX/TikZ document and an edit request, return EXACTLY ONE updated complete LaTeX document. "
" Return ONLY raw LaTeX source: no markdown, no code fences, no commentary, no extra text. "
" The output must compile with pdflatex. "
" Wrap LaTeX to roughly 80 characters per line, keep formatting tidy, and add brief LaTeX comments to clarify sections when helpful. "
" Given an existing standalone LaTeX/TikZ document and an edit request, return EXACTLY TWO elements: "
" a complete LaTeX document wrapped in <latex>...</latex> and a very generic summary (less than 7 words, all lowercase) wrapped in <summary>...</summary>. "
" Return ONLY these tags and their contents: no markdown, no code fences, no commentary, no extra text. "
" The LaTeX must compile with pdflatex. "
" Wrap LaTeX to roughly 80 characters per line, keep formatting tidy, and add brief LaTeX comments where helpful. "
)
EDIT_PROMPT_TEMPLATE = (
@ -282,7 +287,7 @@ EDIT_PROMPT_TEMPLATE = (
" Rules: \n "
" - Return a complete standalone LaTeX document ( \\ documentclass {{ standalone}} ... \\ end {{ document}}). \n "
" - Make minimal changes necessary to satisfy the instructions. \n "
" - Do not add external files or assets. \n \n "
" - Do not add external files or assets. \n "
" INSTRUCTIONS: \n {instructions} \n \n "
" LATEX: \n {latex} \n "
)
@ -326,6 +331,22 @@ def _extract_text_from_litellm(resp: Any) -> str:
return str ( resp ) . strip ( )
def _parse_latex_and_summary ( raw_text : str ) - > tuple [ str , str ] :
""" Extracts <latex>...</latex> and <summary>...</summary> from raw_text.
Falls back to treating the whole raw_text as LaTeX if tags are missing .
"""
latex_re = re . compile ( r " <latex>(.*?)</latex> " , re . DOTALL | re . IGNORECASE )
summary_re = re . compile ( r " <summary>(.*?)</summary> " , re . DOTALL | re . IGNORECASE )
latex_match = latex_re . search ( raw_text or " " )
summary_match = summary_re . search ( raw_text or " " )
latex = ( latex_match . group ( 1 ) if latex_match else ( raw_text or " " ) ) . strip ( )
summary = ( summary_match . group ( 1 ) if summary_match else " " ) . strip ( )
return latex , summary
def _litellm_call ( * , model : str , messages : list [ dict [ str , Any ] ] , * * kwargs : Any ) - > Any :
return completion ( model = model , messages = messages , * * kwargs )
@ -439,6 +460,46 @@ def _render_png_with_magick(
return None
COMPILE_COOLDOWN_SECS = 3
COMPILE_STATE_FILE = " compile_state.json "
def _load_compile_state ( run_dir : Path ) - > dict [ str , Any ] :
path = run_dir / COMPILE_STATE_FILE
if not path . exists ( ) :
return { }
try :
data = json . loads ( path . read_text ( encoding = " utf-8 " , errors = " replace " ) )
return data if isinstance ( data , dict ) else { }
except Exception :
return { }
def _save_compile_state ( run_dir : Path , state : dict [ str , Any ] ) - > None :
path = run_dir / COMPILE_STATE_FILE
path . write_text ( json . dumps ( state , indent = 2 , sort_keys = True ) + " \n " , encoding = " utf-8 " )
def _tex_digest ( text : str ) - > str :
return hashlib . sha256 ( text . encode ( " utf-8 " , errors = " replace " ) ) . hexdigest ( )
def _should_compile (
run_dir : Path , tex_text : str , * , cooldown_s : int = COMPILE_COOLDOWN_SECS
) - > tuple [ bool , str , str ] :
state = _load_compile_state ( run_dir )
last_ts = float ( state . get ( " last_ts " , 0.0 ) or 0.0 )
last_hash = state . get ( " last_hash " ) if isinstance ( state . get ( " last_hash " ) , str ) else " "
now = time . time ( )
digest = _tex_digest ( tex_text )
if now - last_ts < cooldown_s :
return False , " cooldown " , digest
if last_hash and last_hash == digest :
return False , " unchanged " , digest
return True , " ok " , digest
def _render_svg_with_pdf2svg (
run_dir : Path , pdf_path : Path , run_logger : RunLogger , section_title : str , svg_name : str = " diagram.svg "
) - > Optional [ Path ] :
@ -468,7 +529,27 @@ def _compile_latex_to_png(
def _compile_phase (
run_dir : Path , tex_path : Path , run_log : RunLogger , phase : str , start_msg : str = " compile.start "
) - > tuple [ Optional [ Path ] , Optional [ Path ] , Optional [ Path ] , int ] :
) - > tuple [ Optional [ Path ] , Optional [ Path ] , Optional [ Path ] , int , str ] :
try :
tex_text = tex_path . read_text ( encoding = " utf-8 " , errors = " replace " )
except Exception :
tex_text = " "
allowed , reason , digest = _should_compile ( run_dir , tex_text , cooldown_s = COMPILE_COOLDOWN_SECS )
pdf_path_existing = run_dir / Path ( tex_path . name ) . with_suffix ( " .pdf " ) . name
png_path_existing = run_dir / " preview.png "
svg_path_existing = run_dir / " diagram.svg "
if not allowed :
pdf_path = pdf_path_existing if pdf_path_existing . exists ( ) else None
png_path = png_path_existing if png_path_existing . exists ( ) else None
svg_path = svg_path_existing if svg_path_existing . exists ( ) else None
run_log . line (
f " { start_msg } .skip reason= { reason } has_pdf= { pdf_path is not None } has_png= { png_path is not None } has_svg= { svg_path is not None } "
)
return pdf_path , png_path , svg_path , 0 , reason
run_log . line ( start_msg )
started = time . monotonic ( )
pdf_path , png_path = _compile_latex_to_png ( run_dir , tex_path , run_log , phase = phase )
@ -478,17 +559,38 @@ def _compile_phase(
else None
)
elapsed_ms = int ( ( time . monotonic ( ) - started ) * 1000 )
_save_compile_state (
run_dir ,
{
" last_ts " : time . time ( ) ,
" last_hash " : digest ,
} ,
)
run_log . line (
f " compile.end elapsed_ms= { elapsed_ms } ok_pdf= { pdf_path is not None } ok_png= { png_path is not None } ok_svg= { svg_path is not None } "
)
return pdf_path , png_path , svg_path , elapsed_ms
return pdf_path , png_path , svg_path , elapsed_ms , " compiled "
BASE62_ALPHABET = " 0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz "
RUN_ID_RE = re . compile ( r " ( \ d {4} - \ d {2} - \ d {2} -[0-9a-f] {32} |[0-9A-Za-z] {22} ) " )
RUN_ID_RE = re . compile ( r " \ d {4} - \ d {2} - \ d {2} -[0-9a-f] {32} " )
def _uuid_to_base62 ( u : uuid . UUID ) - > str :
n : int = int . from_bytes ( u . bytes , " big " , signed = False )
base = len ( BASE62_ALPHABET )
chars : list [ str ] = [ ]
while n > 0 :
n , rem = divmod ( n , base )
chars . append ( BASE62_ALPHABET [ rem ] )
encoded = " " . join ( reversed ( chars or [ BASE62_ALPHABET [ 0 ] ] ) )
return encoded . rjust ( 22 , BASE62_ALPHABET [ 0 ] )
def _new_run_id ( ) - > str :
return f " { time . strftime ( ' % Y- % m- %d ' ) } - { uuid . uuid4 ( ) . hex } "
return _uuid_to_base62 ( uuid . uuid4 ( ) )
def _write_run_meta ( run_dir : Path , meta : dict [ str , Any ] ) - > None :
@ -674,8 +776,11 @@ async def convert(request: Request, image: UploadFile = File(...), notes: str =
png_path : Optional [ Path ] = None
svg_path : Optional [ Path ] = None
cp_elapsed_ms = 0
cp_reason = " "
if tex :
pdf_path , png_path , svg_path , cp_elapsed_ms = _compile_phase (
pdf_path , png_path , svg_path , cp_elapsed_ms , cp_reason = _compile_phase (
run_dir , tex_path , run_log , phase = " convert "
)
logger . info (
@ -724,11 +829,12 @@ async def convert(request: Request, image: UploadFile = File(...), notes: str =
f " convert.end elapsed_ms= { elapsed_ms } ok_pdf= { pdf_path is not None } ok_png= { png_path is not None } " ,
)
return RedirectResponse ( url = f " { BASE_PATH } / { run_id } " , status_code = 303 )
warn_qs = f " ?warn= { cp_reason } " if cp_elapsed_ms == 0 and cp_reason else " "
return RedirectResponse ( url = f " { BASE_PATH } /d/ { run_id } { warn_qs } " , status_code = 303 )
@app.get ( " / {run_id} " , response_class = HTMLResponse )
async def get_run_result ( request : Request , run_id : str ):
@app.get ( " / d/ {run_id} " , response_class = HTMLResponse )
async def get_run_result ( request : Request , run_id : str , warn : str | None = None ):
run_dir = _run_dir_for_id ( run_id )
if run_dir is None :
return HTMLResponse ( " Not found " , status_code = 404 )
@ -765,6 +871,15 @@ async def get_run_result(request: Request, run_id: str):
download_pdf_url = file_url ( pdf_path . name ) if pdf_path . exists ( ) else file_url ( " diagram.pdf " )
download_svg_url = file_url ( svg_path . name ) if svg_path . exists ( ) else file_url ( " diagram.svg " )
def _warn_text ( reason : str | None ) - > str :
if not reason :
return " "
if reason == " cooldown " :
return " Please wait a few seconds before compiling again. "
if reason == " unchanged " :
return " Source unchanged; compile skipped. "
return " Compilation skipped. "
return templates . TemplateResponse (
" edit.html " ,
{
@ -783,11 +898,12 @@ async def get_run_result(request: Request, run_id: str):
" base_path " : BASE_PATH ,
" edit_remaining " : rate_status [ " edit_remaining " ] ,
" edit_limit " : rate_status [ " edit_limit " ] ,
" compile_warning " : _warn_text ( warn ) ,
} ,
)
@app.post ( " / {run_id} " , response_class = HTMLResponse )
@app.post ( " / d/ {run_id} " , response_class = HTMLResponse )
async def edit_tex ( request : Request , run_id : str , instructions : str = Form ( . . . ) , latex : str = Form ( . . . ) ) :
run_dir = _run_dir_for_id ( run_id )
if run_dir is None :
@ -800,7 +916,7 @@ async def edit_tex(request: Request, run_id: str, instructions: str = Form(...),
instructions = ( instructions or " " ) . strip ( )
if not instructions :
_write_run_error ( run_dir , " Edit instructions are empty. " )
return RedirectResponse ( url = f " { BASE_PATH } / { run_id } " , status_code = 303 )
return RedirectResponse ( url = f " { BASE_PATH } / d/ { run_id } " , status_code = 303 )
tex_path = run_dir / " diagram.tex "
current_tex = latex
@ -825,6 +941,7 @@ async def edit_tex(request: Request, run_id: str, instructions: str = Form(...),
raw_text = " "
new_tex = " "
used_model = " "
summary_text = " "
try :
llm_started = time . monotonic ( )
run_log . line ( " llm.start " )
@ -836,7 +953,7 @@ async def edit_tex(request: Request, run_id: str, instructions: str = Form(...),
] ,
run_log = run_log ,
)
new_tex = ( raw_text or " " ) . strip ( )
new_tex , summary_text = _parse_latex_and_summary ( raw_text or " " )
llm_elapsed_ms = int ( ( time . monotonic ( ) - llm_started ) * 1000 )
logger . info (
" edit.llm.ok run_id= %s elapsed_ms= %s raw_chars= %s tex_chars= %s " ,
@ -861,8 +978,13 @@ async def edit_tex(request: Request, run_id: str, instructions: str = Form(...),
png_path : Optional [ Path ] = None
svg_path : Optional [ Path ] = None
cp_elapsed_ms = 0
cp_reason = " "
if new_tex :
pdf_path , png_path , svg_path , cp_elapsed_ms = _compile_phase ( run_dir , tex_path , run_log , phase = " edit " )
pdf_path , png_path , svg_path , cp_elapsed_ms , cp_reason = _compile_phase (
run_dir , tex_path , run_log , phase = " edit "
)
logger . info (
" edit.compile.done run_id= %s elapsed_ms= %s ok_pdf= %s ok_png= %s ok_svg= %s " ,
run_id ,
@ -872,19 +994,21 @@ async def edit_tex(request: Request, run_id: str, instructions: str = Form(...),
svg_path is not None ,
)
history_status = " ok " if pdf_path else " error "
_append_history_entry (
run_dir ,
{
" ts " : time . time ( ) ,
" ip " : client_ip ,
" action " : " edit " ,
" latex " : new_tex ,
" instructions " : instructions ,
" model " : used_model or ( EDIT_MODELS [ 0 ] if EDIT_MODELS else " " ) ,
" status " : history_status ,
} ,
)
if cp_elapsed_ms > 0 :
history_status = " ok " if pdf_path else " error "
_append_history_entry (
run_dir ,
{
" ts " : time . time ( ) ,
" ip " : client_ip ,
" action " : " edit " ,
" latex " : new_tex ,
" instructions " : instructions ,
" model " : used_model or ( EDIT_MODELS [ 0 ] if EDIT_MODELS else " " ) ,
" status " : history_status ,
" summary " : summary_text ,
} ,
)
run_log . section ( " edit.llm_raw " , raw_text or " " )
@ -941,10 +1065,11 @@ async def edit_tex(request: Request, run_id: str, instructions: str = Form(...),
f " edit.end elapsed_ms= { elapsed_ms } ok_pdf= { pdf_path is not None } ok_png= { png_path is not None } " ,
)
return RedirectResponse ( url = f " { BASE_PATH } / { run_id } " , status_code = 303 )
warn_qs = f " ?warn= { cp_reason } " if cp_elapsed_ms == 0 and cp_reason else " "
return RedirectResponse ( url = f " { BASE_PATH } /d/ { run_id } { warn_qs } " , status_code = 303 )
@app.post ( " / {run_id} /compile " , response_class = HTMLResponse )
@app.post ( " / d/ {run_id} /compile " , response_class = HTMLResponse )
async def compile_tex ( request : Request , run_id : str , latex : str = Form ( . . . ) ) :
run_dir = _run_dir_for_id ( run_id )
if run_dir is None :
@ -964,8 +1089,11 @@ async def compile_tex(request: Request, run_id: str, latex: str = Form(...)):
png_path : Optional [ Path ] = None
svg_path : Optional [ Path ] = None
cp_elapsed_ms = 0
cp_reason = " "
if tex . strip ( ) :
pdf_path , png_path , svg_path , cp_elapsed_ms = _compile_phase (
pdf_path , png_path , svg_path , cp_elapsed_ms , cp_reason = _compile_phase (
run_dir , tex_path , run_log , phase = " compile " , start_msg = " compile.invoke "
)
logger . info (
@ -977,19 +1105,20 @@ async def compile_tex(request: Request, run_id: str, latex: str = Form(...)):
svg_path is not None ,
)
history_status = " ok " if pdf_path else " error "
_append_history_entry (
run_dir ,
{
" ts " : time . time ( ) ,
" ip " : client_ip ,
" action " : " compile " ,
" latex " : tex ,
" instructions " : " " ,
" model " : " " ,
" status " : history_status ,
} ,
)
if cp_elapsed_ms > 0 :
history_status = " ok " if pdf_path else " error "
_append_history_entry (
run_dir ,
{
" ts " : time . time ( ) ,
" ip " : client_ip ,
" action " : " compile " ,
" latex " : tex ,
" instructions " : " " ,
" model " : " " ,
" status " : history_status ,
} ,
)
meta = _read_run_meta ( run_dir )
model_label = _get_meta_string ( meta , " model " )
@ -1028,10 +1157,11 @@ async def compile_tex(request: Request, run_id: str, latex: str = Form(...)):
logger . info ( " compile.end run_id= %s elapsed_ms= %s " , run_id , elapsed_ms )
run_log . line ( f " compile.finish elapsed_ms= { elapsed_ms } " )
return RedirectResponse ( url = f " { BASE_PATH } / { run_id } " , status_code = 303 )
warn_qs = f " ?warn= { cp_reason } " if cp_elapsed_ms == 0 and cp_reason else " "
return RedirectResponse ( url = f " { BASE_PATH } /d/ { run_id } { warn_qs } " , status_code = 303 )
@app.get ( " / {run_id} /history " )
@app.get ( " / d/ {run_id} /history " )
async def get_history ( request : Request , run_id : str ) :
run_dir = _run_dir_for_id ( run_id )
if run_dir is None :
@ -1051,6 +1181,7 @@ async def get_history(request: Request, run_id: str):
" instructions " : entry . get ( " instructions " , " " ) ,
" model " : entry . get ( " model " , " " ) ,
" status " : entry . get ( " status " , " " ) ,
" summary " : entry . get ( " summary " , " " ) ,
}
)