Backend migration: - Replace pyodbc/SQL Server with psycopg2/PostgreSQL throughout - Rewrite Database class with portable SQL: SERIAL, ON CONFLICT, NOW() - Lowercase table names (rip_help_files, rip_help_sections) - Postgres convention - libpq connection string format in HELP_DB_CONN Webapp (webapp/): - FastAPI app: GET /, GET /images/<f>, GET /home-image, GET /api/sections, POST /api/keywords/<code>, GET /healthz - Jinja2 template extracted from generate_html.py with HTTP image URLs - Direct keyword save to DB (no JSON download detour) - Same prefix scoping as CLI tools (?prefix=RIP) Deployment: - Dockerfile (python:3.12-slim + uvicorn) - docker-compose.yml for local dev - requirements-webapp.txt (minimal, no Windows-only deps) - .dockerignore excludes pipeline scripts and BAT files - README updated with webapp section and Coolify deploy guide Also: switch AI model to claude-haiku-4-5 (~3x cheaper, same quality for this task) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
230 lines
8.9 KiB
Python
230 lines
8.9 KiB
Python
"""
|
||
FastAPI web app for the RIP Help System.

Endpoints:
    GET  /                     - viewer HTML (optional ?prefix=&home=)
    GET  /images/{path}        - static serving of images from OUTPUT_DIR/images
    GET  /home-image           - home image (HOME_IMAGE, else Bairaci.png one
                                 level above this module's directory, if present)
    GET  /api/sections         - JSON list of sections (?prefix=)
    POST /api/keywords/{code}  - update keywords in the database
    GET  /healthz              - health check

Configuration (env vars):
    HELP_DB_CONN - libpq-format connection string for Postgres
    OUTPUT_DIR   - directory with the sections and images
                   (default: data/ one level above this module's directory)
    HOME_IMAGE   - path to the home image (optional)
|
||
"""
|
||
|
||
import os, sys, json, re
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from typing import Optional
|
||
|
||
import psycopg2
|
||
from fastapi import FastAPI, HTTPException, Request, Query
|
||
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.templating import Jinja2Templates
|
||
from pydantic import BaseModel
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Configuration
|
||
# ──────────────────────────────────────────────
|
||
|
||
# libpq keyword/value connection string; override with the HELP_DB_CONN env var.
# NOTE(review): the fallback below embeds a host address and a password in
# source control - consider requiring HELP_DB_CONN in deployed environments.
_DEFAULT_CONN_STR = (
    "host=192.168.88.18 port=5432 dbname=rip_help_system user=sa password=Parola~12345!!!"
)
CONN_STR = os.getenv("HELP_DB_CONN", _DEFAULT_CONN_STR)

# Directory holding the exported section files and the images/ sub-directory.
# Defaults to "data" one level above this module's directory.
OUTPUT_DIR = Path(
    os.getenv("OUTPUT_DIR", Path(__file__).parent.parent / "data")
)

# Path to the home image, or None when the variable is not set.
HOME_IMAGE = os.getenv("HOME_IMAGE")

# Matches "[IMG: <path>]" placeholders; group 1 is the path with the
# surrounding whitespace excluded.
_IMG_PLACEHOLDER_RE = re.compile(r"\[IMG:\s*([^\]]+?)\s*\]")
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# App
|
||
# ──────────────────────────────────────────────
|
||
|
||
# Directory that contains this module; the Jinja2 templates live in
# its "templates" sub-directory.
BASE_DIR = Path(__file__).parent
_templates_dir = BASE_DIR / "templates"

app = FastAPI(title="RIP Help System")
templates = Jinja2Templates(directory=str(_templates_dir))

# Static mount for the section images - registered only when the directory
# already exists at import time; otherwise /images is not served at all.
_images_dir = OUTPUT_DIR / "images"
if _images_dir.exists():
    _images_app = StaticFiles(directory=str(_images_dir))
    app.mount("/images", _images_app, name="images")
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# DB
|
||
# ──────────────────────────────────────────────
|
||
|
||
def db_conn():
    """Open and return a new psycopg2 connection built from ``CONN_STR``.

    Every call creates a fresh connection; the caller is responsible for
    closing it.
    """
    connection = psycopg2.connect(CONN_STR)
    return connection
|
||
|
||
|
||
def _esc(s: str) -> str:
    """Return *s* escaped for use in HTML text and quoted attribute values.

    Falsy input (``None``, ``""``, ``0``) yields an empty string; any other
    value is converted with ``str()`` first. The ampersand, both angle
    brackets and both quote characters are replaced by character references,
    so the result can be placed inside element content or inside a quoted
    attribute without terminating it.
    """
    # Local import: the module header does not import ``html``.
    from html import escape

    return escape(str(s or ""), quote=True)
|
||
|
||
|
||
def _text_to_html(text: str) -> str:
    """Render plain section text as HTML.

    Text outside ``[IMG: ...]`` placeholders is passed through ``_esc``.
    Each placeholder such as ``[IMG: images/foo.png]`` becomes an
    inline-styled ``<img src="/images/foo.png">``. Finally every newline in
    the assembled markup is replaced with ``<br>``.
    """
    img_style = (
        "max-width:100%;max-height:240px;display:block;margin:8px 0;"
        "border:1px solid #d8dce3;border-radius:6px"
    )
    # The pattern has exactly one capture group, so split() alternates:
    # even indexes are literal text, odd indexes are the captured image paths.
    chunks = _IMG_PLACEHOLDER_RE.split(text)
    rendered = []
    for index, chunk in enumerate(chunks):
        if index % 2 == 0:
            rendered.append(_esc(chunk))
            continue
        rel_path = chunk.strip().replace("\\", "/")
        # A path like "images/<filename>" is expected; drop that leading
        # folder so the URL becomes /images/<filename>.
        if rel_path.startswith("images/"):
            file_name = rel_path.split("/", 1)[1]
        else:
            file_name = rel_path
        rendered.append(
            f'<img src="/images/{_esc(file_name)}" alt="" style="{img_style}">'
        )
    markup = "".join(rendered)
    return markup.replace("\n", "<br>")
|
||
|
||
|
||
def _rich_html_with_images(html: str) -> str:
    """Expand ``[IMG: ...]`` placeholders inside already-rendered HTML.

    The surrounding markup is returned untouched (it is NOT escaped); only
    the file name placed in the ``src`` attribute goes through ``_esc``.
    """
    def _as_img_tag(match):
        # Normalise Windows separators, then drop a leading "images/" folder
        # because the URL prefix /images/ already provides it.
        rel_path = match.group(1).strip().replace("\\", "/")
        file_name = rel_path.removeprefix("images/")
        tag_open = f'<img src="/images/{_esc(file_name)}" alt="" '
        tag_style = (
            'style="max-width:100%;max-height:240px;display:block;margin:8px 0;'
            'border:1px solid #d8dce3;border-radius:6px">'
        )
        return tag_open + tag_style

    return _IMG_PLACEHOLDER_RE.sub(_as_img_tag, html)
|
||
|
||
|
||
# Column list and join shared by the filtered and unfiltered section queries.
_SECTIONS_SELECT = """
    SELECT s.prefix, s.code, s.title, s.keywords, s.char_count,
           s.source_file, s.output_path, s.updated_at,
           s.images, s.html_text, f.section_count
    FROM rip_help_sections s
    LEFT JOIN rip_help_files f
        ON f.file_path = s.source_file AND f.prefix = s.prefix
"""


def _read_plain_body(output_path) -> str:
    """Return the body of a section's exported text file, or "" if unavailable.

    Only the basename of *output_path* is used; the file is looked up
    directly inside ``OUTPUT_DIR``. When the file contains a line of 60
    box-drawing dashes, everything after the first such separator (stripped)
    is the body; otherwise the whole file content is returned.
    """
    if not output_path:
        return ""
    txt_path = OUTPUT_DIR / Path(output_path).name
    if not txt_path.exists():
        return ""
    try:
        raw = txt_path.read_text(encoding="utf-8")
    except (OSError, UnicodeError):
        # Best effort: an unreadable or badly encoded file simply yields no
        # body instead of failing the whole listing.
        return ""
    _header, separator, remainder = raw.partition("─" * 60)
    return remainder.strip() if separator else raw


def fetch_sections(prefix: Optional[str] = None) -> list[dict]:
    """Load help sections from the database as a list of dicts.

    Args:
        prefix: When truthy, only sections with this prefix are returned
            (ordered by code). Otherwise all sections are returned, ordered
            by prefix and then code.

    Returns:
        One dict per row keyed by the selected column names, with these
        adjustments:

        * ``updated_at`` is the first 16 characters of its string form,
          or ``""`` when empty.
        * ``images`` is the decoded JSON value, or ``[]`` when the column is
          empty or cannot be decoded.
        * ``text_html`` is added: the stored ``html_text`` with image
          placeholders expanded, or - when there is no ``html_text`` - the
          first 1200 characters of the exported text file rendered as HTML.
        * ``text`` (first 800 characters of the plain body) is added only in
          the plain-text fallback case.

    Raises:
        Whatever ``db_conn`` / psycopg2 raises; the connection is always
        closed before the exception propagates.
    """
    if prefix:
        query = _SECTIONS_SELECT + " WHERE s.prefix = %s ORDER BY s.code"
        params = (prefix,)
    else:
        query = _SECTIONS_SELECT + " ORDER BY s.prefix, s.code"
        params = None

    # Fetch everything first and release the connection before doing any
    # file-system work; try/finally guarantees the close on errors too.
    conn = db_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
            cols = [c[0] for c in cur.description]
            records = cur.fetchall()
    finally:
        conn.close()

    rows = []
    for record in records:
        d = dict(zip(cols, record))
        d["updated_at"] = str(d["updated_at"])[:16] if d["updated_at"] else ""
        try:
            d["images"] = json.loads(d["images"]) if d.get("images") else []
        except (TypeError, ValueError):
            # Not a string, or not valid JSON: treat as "no images".
            d["images"] = []

        # Prefer the stored rich HTML; otherwise fall back to the plain text
        # export with image placeholders.
        if d.get("html_text"):
            d["text_html"] = _rich_html_with_images(d["html_text"])
        else:
            body = _read_plain_body(d.get("output_path"))
            d["text"] = body[:800]
            d["text_html"] = _text_to_html(body[:1200]) if body else ""
        rows.append(d)
    return rows
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Routes
|
||
# ──────────────────────────────────────────────
|
||
|
||
@app.get("/", response_class=HTMLResponse)
def viewer(request: Request,
           prefix: Optional[str] = Query(None),
           home: Optional[str] = Query(None)):
    """Render the viewer page from the ``viewer.html`` template.

    ``prefix`` limits the listed sections. The page gets a ``/home-image``
    URL when either the ``home`` query parameter or ``HOME_IMAGE`` is truthy;
    the value of ``home`` itself is not used beyond that check.
    """
    sections = fetch_sections(prefix)
    wants_home = home or HOME_IMAGE
    context = {
        # default=str stringifies any value json cannot encode natively.
        "sections_json": json.dumps(sections, ensure_ascii=False, default=str),
        "section_count": len(sections),
        "prefix": prefix or "",
        "home_url": "/home-image" if wants_home else None,
        "generated": datetime.now().strftime("%d.%m.%Y %H:%M"),
    }
    return templates.TemplateResponse(request, "viewer.html", context)
|
||
|
||
|
||
@app.get("/home-image")
def home_image():
    """Serve the home image file.

    Uses ``HOME_IMAGE`` when set; otherwise tries ``Bairaci.png`` one level
    above this module's directory. Responds with 404 when no usable file is
    found.
    """
    img_path = HOME_IMAGE
    if not img_path:
        # HOME_IMAGE is not set - try the Bairaci.png fallback.
        fallback = BASE_DIR.parent / "Bairaci.png"
        if fallback.exists():
            img_path = str(fallback)

    if img_path and Path(img_path).is_file():
        return FileResponse(img_path)
    raise HTTPException(404, "home image not configured")
|
||
|
||
|
||
@app.get("/api/sections")
def api_sections(prefix: Optional[str] = Query(None)):
    """Return the sections as a JSON array, optionally limited to one prefix."""
    payload = fetch_sections(prefix)
    return JSONResponse(payload)
|
||
|
||
|
||
class KeywordsUpdate(BaseModel):
    """Request body for POST /api/keywords/{code}."""

    # New keywords value; written to the ``keywords`` column as-is.
    keywords: str
|
||
|
||
|
||
@app.post("/api/keywords/{code}")
def update_keywords(code: str, body: KeywordsUpdate):
    """Replace the keywords of the section(s) with the given code.

    Args:
        code: Section code taken from the URL path.
        body: Payload carrying the new ``keywords`` string.

    Returns:
        ``{"ok": True, "code": code}`` after the update is committed.

    Raises:
        HTTPException: 404 when no row matches *code*.
    """
    # NOTE(review): the UPDATE matches on ``code`` only, so rows sharing a code
    # under different prefixes would all be changed - confirm whether codes
    # are unique across prefixes.
    conn = db_conn()
    try:
        with conn.cursor() as cur:
            cur.execute(
                "UPDATE rip_help_sections SET keywords=%s, updated_at=NOW() WHERE code=%s",
                (body.keywords, code),
            )
            affected = cur.rowcount
        if affected == 0:
            raise HTTPException(404, f"section {code} not found")
        conn.commit()
    finally:
        # Always release the connection. Closing without a commit discards
        # pending changes, so the 404 and error paths leave nothing behind.
        conn.close()
    return {"ok": True, "code": code}
|
||
|
||
|
||
@app.get("/healthz")
def healthz():
    """Health check: verify that the database answers a trivial query.

    Returns:
        ``{"status": "ok", "db": "ok"}`` on success, or a 503 JSON response
        with the error text under ``db`` when connecting or querying fails.
    """
    try:
        conn = db_conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
                cur.fetchone()
        finally:
            # Close even when the probe query fails, so repeated failing
            # health checks do not accumulate open connections.
            conn.close()
    except Exception as e:
        # Top-level boundary: any failure means "unhealthy", never a 500.
        # NOTE(review): the raw driver message is returned to the caller -
        # confirm that exposing it is acceptable for this endpoint.
        return JSONResponse({"status": "error", "db": str(e)}, status_code=503)
    return {"status": "ok", "db": "ok"}
|