"""FastAPI web app for the RIP Help System.

Endpoints:
    GET  /                     - viewer HTML (optional ?prefix= and ?home=)
    GET  /images/{path}        - static serving of images from OUTPUT_DIR/images
    GET  /home-image           - home image, if one is available
    GET  /api/sections         - JSON list of sections (optional ?prefix=)
    POST /api/keywords/{code}  - update a section's keywords in the database
    GET  /healthz              - health check

Configuration (environment variables):
    HELP_DB_CONN - libpq-format connection string for Postgres
    OUTPUT_DIR   - directory holding the section text files and images
                   (default: "data" next to this file's parent directory)
    HOME_IMAGE   - path to the home image (optional)
"""

import json
import logging
import os
import re
import sys
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Optional

import psycopg2
from fastapi import FastAPI, HTTPException, Request, Query
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel

logger = logging.getLogger(__name__)

# ──────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────

# NOTE(review): the fallback below embeds a real-looking host and password in
# source control. It is kept byte-for-byte so existing deployments keep
# working, but the credential should be rotated and the default removed.
CONN_STR = os.getenv(
    "HELP_DB_CONN",
    "host=192.168.88.18 port=5432 dbname=rip_help_system user=sa password=Parola~12345!!!"
)
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", Path(__file__).parent.parent / "data"))
HOME_IMAGE = os.getenv("HOME_IMAGE")  # absolute path, or None when unset

# Matches "[IMG: some/path.png]" and captures the path without surrounding
# whitespace.
_IMG_PLACEHOLDER_RE = re.compile(r"\[IMG:\s*([^\]]+?)\s*\]")

# Line of 60 box-drawing dashes that separates the header from the body in
# the plain-text section files.
_HEADER_SEPARATOR = "─" * 60

# Shared SELECT used by fetch_sections(); the WHERE / ORDER BY tail is
# appended per call.
_SECTIONS_SELECT = """
    SELECT s.prefix, s.code, s.title, s.keywords, s.char_count,
           s.source_file, s.output_path, s.updated_at, s.images, s.html_text,
           f.section_count
    FROM rip_help_sections s
    LEFT JOIN rip_help_files f
           ON f.file_path = s.source_file AND f.prefix = s.prefix
"""

# ──────────────────────────────────────────────
# App
# ──────────────────────────────────────────────
BASE_DIR = Path(__file__).parent
app = FastAPI(title="RIP Help System")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))

# Static mount for the images - only when the directory exists, because
# StaticFiles refuses to mount a missing directory.
_images_dir = OUTPUT_DIR / "images"
if _images_dir.exists():
    app.mount("/images", StaticFiles(directory=str(_images_dir)), name="images")


# ──────────────────────────────────────────────
# DB
# ──────────────────────────────────────────────
def db_conn():
    """Open a new psycopg2 connection using CONN_STR; the caller closes it."""
    return psycopg2.connect(CONN_STR)


def _esc(s: str) -> str:
    """Escape ``&``, ``<``, ``>`` and ``"`` so *s* is safe in HTML text/attributes.

    Falsy input (``None``, ``""``, ``0``) yields an empty string.
    """
    # "&" must be replaced first, otherwise the entities produced by the
    # later replacements would be escaped a second time.
    return (str(s or "")
            .replace("&", "&amp;").replace("<", "&lt;")
            .replace(">", "&gt;").replace('"', "&quot;"))


def _img_tag(rel: str) -> str:
    """Build the ``<img>`` element for the path captured from an [IMG: ...] placeholder.

    Backslashes are normalised to ``/`` and a leading ``images/`` is dropped,
    because the static mount already serves that directory under ``/images``.
    """
    rel = rel.strip().replace("\\", "/")
    fname = rel.removeprefix("images/")
    # NOTE(review): the original markup was lost from this source, so this is
    # a minimal tag - confirm any class/attributes viewer.html expects.
    safe = _esc(fname)
    return f'<img src="/images/{safe}" alt="{safe}" loading="lazy">'


def _text_to_html(text: str) -> str:
    """Convert plain text to HTML.

    Text is HTML-escaped, ``[IMG: images/foo.png]`` placeholders become
    ``<img>`` elements pointing at ``/images/foo.png``, and newlines become
    ``<br>``.
    """
    parts = []
    last = 0
    for m in _IMG_PLACEHOLDER_RE.finditer(text):
        parts.append(_esc(text[last:m.start()]))
        parts.append(_img_tag(m.group(1)))
        last = m.end()
    parts.append(_esc(text[last:]))
    return "".join(parts).replace("\n", "<br>")


def _rich_html_with_images(html: str) -> str:
    """Replace [IMG: ...] placeholders with ``<img>`` elements.

    The surrounding markup is passed through unescaped: *html* is expected to
    already be HTML.
    """
    return _IMG_PLACEHOLDER_RE.sub(lambda m: _img_tag(m.group(1)), html)


def _parse_images(raw: object) -> object:
    """Return the decoded ``images`` column value, or ``[]`` when empty/invalid.

    Accepts either JSON text or an already-decoded list, so the value is not
    discarded when the driver has decoded a json/jsonb column itself.
    """
    if not raw:
        return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return []


def _read_section_body(output_path: Optional[str]) -> str:
    """Best-effort read of a section's plain-text body from OUTPUT_DIR.

    Only the basename of *output_path* is used, so the stored path can never
    point outside OUTPUT_DIR. Returns ``""`` when the file is missing or
    unreadable.
    """
    if not output_path:
        return ""
    # Normalise backslashes first so a Windows-style stored path still yields
    # just the file name, consistent with how image paths are handled.
    txt_path = OUTPUT_DIR / Path(str(output_path).replace("\\", "/")).name
    if not txt_path.exists():
        return ""
    try:
        raw = txt_path.read_text(encoding="utf-8")
    except (OSError, UnicodeError) as exc:
        logger.warning("could not read section text %s: %s", txt_path, exc)
        return ""
    # Everything after the first separator line is the body; a file without a
    # separator is used whole.
    _header, sep, rest = raw.partition(_HEADER_SEPARATOR)
    return rest.strip() if sep else raw


def fetch_sections(prefix: Optional[str] = None) -> list[dict]:
    """Load help sections from the database as a list of dicts.

    Args:
        prefix: When truthy, only sections with this prefix are returned
            (ordered by code). Otherwise all sections are returned, ordered
            by prefix and then code.

    Returns:
        One dict per row, keyed by column name, with these adjustments:
        ``updated_at`` is a string cut to 16 characters (or ``""``),
        ``images`` is the decoded value (or ``[]``), and ``text_html`` holds
        the HTML to display. Rows without ``html_text`` also get ``text``,
        the first 800 characters of the body read from disk.
    """
    if prefix:
        sql = _SECTIONS_SELECT + " WHERE s.prefix = %s ORDER BY s.code"
        params = (prefix,)
    else:
        sql = _SECTIONS_SELECT + " ORDER BY s.prefix, s.code"
        params = None

    # closing() guarantees the connection is released even if the query
    # fails; psycopg2's own "with connection" only ends the transaction.
    with closing(db_conn()) as conn, conn.cursor() as cur:
        cur.execute(sql, params)
        cols = [c[0] for c in cur.description]
        rows = [dict(zip(cols, r)) for r in cur.fetchall()]

    for d in rows:
        d["updated_at"] = str(d["updated_at"])[:16] if d["updated_at"] else ""
        d["images"] = _parse_images(d.get("images"))

        # Render the rich HTML, or fall back to plain text + image placeholders.
        if d.get("html_text"):
            d["text_html"] = _rich_html_with_images(d["html_text"])
        else:
            body = _read_section_body(d.get("output_path"))
            d["text"] = body[:800]
            d["text_html"] = _text_to_html(body[:1200]) if body else ""
    return rows
# ──────────────────────────────────────────────
# Routes
# ──────────────────────────────────────────────


@app.get("/", response_class=HTMLResponse)
def viewer(request: Request,
           prefix: Optional[str] = Query(None),
           home: Optional[str] = Query(None)):
    """Render the viewer page with the sections embedded as JSON.

    Args:
        request: The incoming request, required by the template response.
        prefix: Optional section prefix filter, passed to fetch_sections().
        home: Any non-empty value enables the home-image link even when
            HOME_IMAGE is not set.
    """
    sections = fetch_sections(prefix)
    home_url = "/home-image" if (home or HOME_IMAGE) else None

    sections_json = json.dumps(sections, ensure_ascii=False, default=str)
    # Keywords are user-editable, so neutralise "</" in case the template
    # inlines this JSON in a <script> element; "<\/" is still valid JSON and
    # decodes to the same text.
    sections_json = sections_json.replace("</", "<\\/")

    return templates.TemplateResponse(request, "viewer.html", {
        "sections_json": sections_json,
        "section_count": len(sections),
        "prefix": prefix or "",
        "home_url": home_url,
        "generated": datetime.now().strftime("%d.%m.%Y %H:%M"),
    })


@app.get("/home-image")
def home_image():
    """Serve the home image.

    Uses HOME_IMAGE when set; otherwise falls back to ``Bairaci.png`` in the
    directory above this file's directory.

    Raises:
        HTTPException: 404 when no usable image file is found.
    """
    img_path = HOME_IMAGE
    # If HOME_IMAGE is not set, try Bairaci.png in the project directory.
    if not img_path:
        candidate = BASE_DIR.parent / "Bairaci.png"
        if candidate.exists():
            img_path = str(candidate)
    if not img_path or not Path(img_path).is_file():
        raise HTTPException(404, "home image not configured")
    return FileResponse(img_path)


@app.get("/api/sections")
def api_sections(prefix: Optional[str] = Query(None)):
    """Return the sections (optionally filtered by prefix) as JSON."""
    return JSONResponse(fetch_sections(prefix))


class KeywordsUpdate(BaseModel):
    """Request body for POST /api/keywords/{code}."""

    # New keywords text stored verbatim in rip_help_sections.keywords.
    keywords: str


@app.post("/api/keywords/{code}")
def update_keywords(code: str, body: KeywordsUpdate):
    """Overwrite the keywords of every section whose code equals *code*.

    Returns:
        ``{"ok": True, "code": code}`` on success.

    Raises:
        HTTPException: 404 when no row matches *code*.
    """
    conn = db_conn()
    try:
        with conn.cursor() as cur:
            # NOTE(review): rows are matched on code only, not on prefix -
            # confirm that codes are unique across prefixes.
            cur.execute(
                "UPDATE rip_help_sections SET keywords=%s, updated_at=NOW() WHERE code=%s",
                (body.keywords, code)
            )
            updated = cur.rowcount
        if updated == 0:
            raise HTTPException(404, f"section {code} not found")
        conn.commit()
    finally:
        # Always release the connection, including when execute() raises;
        # closing without a commit discards the open transaction.
        conn.close()
    return {"ok": True, "code": code}


@app.get("/healthz")
def healthz():
    """Report service health by running ``SELECT 1`` against the database.

    Returns a 200 JSON body when the query succeeds, otherwise a 503 JSON
    body carrying the error text.
    """
    try:
        conn = db_conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
        finally:
            conn.close()
    except Exception as e:
        # Top-level boundary: any failure means "unhealthy", never a 500.
        return JSONResponse({"status": "error", "db": str(e)}, status_code=503)
    return {"status": "ok", "db": "ok"}