Files
rip-help-system/help_processor.py
Sabo Sabev 9613420d1d Migrate to PostgreSQL + add FastAPI webapp for Coolify deploy
Backend migration:
- Replace pyodbc/SQL Server with psycopg2/PostgreSQL throughout
- Rewrite Database class with portable SQL: SERIAL, ON CONFLICT, NOW()
- Lowercase table names (rip_help_files, rip_help_sections) - Postgres convention
- libpq connection string format in HELP_DB_CONN

Webapp (webapp/):
- FastAPI app: GET /, GET /images/<f>, GET /home-image, GET /api/sections,
  POST /api/keywords/<code>, GET /healthz
- Jinja2 template extracted from generate_html.py with HTTP image URLs
- Direct keyword save to DB (no JSON download detour)
- Same prefix scoping as CLI tools (?prefix=RIP)

Deployment:
- Dockerfile (python:3.12-slim + uvicorn)
- docker-compose.yml for local dev
- requirements-webapp.txt (minimal, no Windows-only deps)
- .dockerignore excludes pipeline scripts and BAT files
- README updated with webapp section and Coolify deploy guide

Also: switch AI model to claude-haiku-4-5 (~3x cheaper, same quality for this task)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 17:00:44 +03:00

1094 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
help_processor.py
=================
Processes help files (.doc, .docx, .html, .htm, .txt, .pdf),
splits them into meaningful sections, extracts keywords through the Anthropic API
and stores the results in PostgreSQL + an output directory.
Supports incremental processing: files whose hash has not changed
are skipped on subsequent runs.
Requirements (pip install):
pip install anthropic psycopg2-binary python-docx beautifulsoup4 lxml
pip install pdfplumber striprtf chardet
pip install pillow # optional: lets tiny icon/bullet images be filtered out
pip install pywin32 # for the MS Word fallback on Windows
For .doc (legacy format) one of the following is required:
- LibreOffice (soffice in PATH) — cross-platform
- MS Word — Windows, via pywin32 COM (automatic fallback)
- antiword — Linux (apt install antiword)
"""
import os
import re
import sys
import json
import hashlib
import logging
import argparse
import subprocess
import tempfile
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, field
from typing import Optional
import psycopg2
import anthropic
from docx import Document
from bs4 import BeautifulSoup
try:
import pdfplumber
HAS_PDF = True
except ImportError:
HAS_PDF = False
try:
from PIL import Image
HAS_PIL = True
except ImportError:
HAS_PIL = False
# ──────────────────────────────────────────────
# Конфигурация
# ──────────────────────────────────────────────
# The Windows console is often cp1251, so stdout/stderr are re-configured to UTF-8.
try:
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except AttributeError:
    # Raised when a stream has no reconfigure(); the stream is left unchanged.
    pass
# Log both to stdout and to a UTF-8 file in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("help_processor.log", encoding="utf-8"),
    ],
)
log = logging.getLogger(__name__)
MIN_SECTION_TOKENS = 60 # sections with fewer words than this are merged into the previous one
MAX_AI_CHARS = 4000 # maximum amount of text sent to Claude for classification
AI_MODEL = "claude-haiku-4-5"
MIN_IMAGE_PX = 50 # images smaller than NxN px are skipped (icons/bullets)
# ──────────────────────────────────────────────
# Изображения — помощни
# ──────────────────────────────────────────────
@dataclass
class ImageRef:
    """An image extracted from a source document, together with its text placeholder."""

    # Internal ID used inside the section text, e.g. "img_01".
    placeholder: str
    # Raw encoded image bytes.
    data: bytes
    # File extension without the dot: "png", "jpg", "gif", ...
    ext: str
def _img_dimensions(data: bytes) -> Optional[tuple[int, int]]:
    """Return ``(width, height)`` of the encoded image in *data*.

    Returns None when Pillow is not available (``HAS_PIL`` is false) or when
    Pillow cannot open the bytes.
    """
    if HAS_PIL:
        try:
            from io import BytesIO

            with Image.open(BytesIO(data)) as picture:
                return picture.size
        except Exception:
            # Unreadable or unsupported image data: the size is simply unknown.
            pass
    return None
def _should_keep_image(data: bytes) -> bool:
    """Return False for empty data and for images smaller than MIN_IMAGE_PX on either side.

    When the dimensions cannot be determined the image is kept.
    """
    if not data:
        return False
    size = _img_dimensions(data)
    if size is None:
        # Cannot judge the size, so keep the image by default.
        return True
    width, height = size
    too_small = width < MIN_IMAGE_PX or height < MIN_IMAGE_PX
    return not too_small
def _ext_from_content_type(ct: str) -> str:
    """Map a MIME content type to a file extension, defaulting to ``"png"``.

    Matching is by case-insensitive substring, checked in the order listed below.
    """
    lowered = (ct or "").lower()
    # (substrings to look for, resulting extension); order matters.
    table = (
        (("png",), "png"),
        (("jpeg", "jpg"), "jpg"),
        (("gif",), "gif"),
        (("bmp",), "bmp"),
        (("svg",), "svg"),
        (("webp",), "webp"),
    )
    for needles, extension in table:
        if any(needle in lowered for needle in needles):
            return extension
    return "png"
# Matches an image marker such as "[IMG: img_01]" or "[IMG: images/x.png]";
# group 1 is the placeholder ID or the relative path.
_IMG_PLACEHOLDER_RE = re.compile(r"\[IMG:\s*([A-Za-z0-9_./\\-]+)\s*\]")
# ──────────────────────────────────────────────
# Структури
# ──────────────────────────────────────────────
@dataclass
class Section:
    """One raw section produced by a parser, before AI classification."""

    # Heading text; empty when the section has no heading.
    title: str
    # Plain text of the section; may contain "[IMG: ...]" placeholders.
    text: str
    # 1=H1, 2=H2, 3=H3, 0=no heading.
    level: int = 1
    # list[ImageRef] with the images that belong to this section.
    images: list = field(default_factory=list)
    # Rich HTML with "[IMG: ...]" placeholders, or None when not available.
    html_text: Optional[str] = None
@dataclass
class ProcessedSection:
    """A classified section, ready to be written to disk and to the database."""

    # Section code as produced by make_code().
    code: str
    source_file: str
    title: str
    # Comma-separated keywords, e.g. "kw1, kw2, kw3".
    keywords: str
    text: str
    # JSON array with relative image paths.
    images_json: str = "[]"
    # Rich HTML (only for HTML source files).
    html_text: str = ""
    # Always recomputed from ``text`` after initialisation.
    char_count: int = 0

    def __post_init__(self):
        """Derive ``char_count`` from the text, overriding any value passed in."""
        self.char_count = len(self.text)
# ──────────────────────────────────────────────
# База данни
# ──────────────────────────────────────────────
class Database:
    """PostgreSQL backend (psycopg2).

    The connection string uses the libpq format:
    ``'host=... port=... dbname=... user=... password=...'``.

    Every operation runs inside ``with self.conn, self.conn.cursor() as cur``.
    The connection context commits on success and rolls back on an exception,
    so a failed statement does not leave the connection in an aborted
    transaction; the cursor context closes the cursor. Read-only queries also
    end their transaction instead of leaving the session "idle in transaction".
    """

    def __init__(self, conn_str: str):
        """Connect and make sure the schema exists.

        Raises:
            psycopg2.Error: if the connection or the schema creation fails.
        """
        self.conn_str = conn_str
        self.conn = psycopg2.connect(conn_str)
        try:
            self._ensure_schema()
        except Exception:
            # Do not leak the connection when schema creation fails.
            self.conn.close()
            raise

    def _ensure_schema(self):
        """Create the tables and indexes if they do not exist (Postgres syntax)."""
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS rip_help_files (
                    id SERIAL PRIMARY KEY,
                    prefix VARCHAR(50) NOT NULL DEFAULT 'HLP',
                    file_path VARCHAR(1000) NOT NULL,
                    file_hash CHAR(64) NOT NULL,
                    processed_at TIMESTAMP NOT NULL DEFAULT NOW(),
                    section_count INTEGER NOT NULL DEFAULT 0,
                    UNIQUE (prefix, file_path)
                )
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS rip_help_sections (
                    id SERIAL PRIMARY KEY,
                    prefix VARCHAR(50) NOT NULL DEFAULT 'HLP',
                    code VARCHAR(80) NOT NULL UNIQUE,
                    source_file VARCHAR(1000) NOT NULL,
                    title VARCHAR(500),
                    keywords VARCHAR(300),
                    char_count INTEGER,
                    output_path VARCHAR(1000),
                    images TEXT,
                    html_text TEXT,
                    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
                    updated_at TIMESTAMP NOT NULL DEFAULT NOW()
                )
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS ix_rip_help_sections_keywords
                ON rip_help_sections(keywords)
            """)
            cur.execute("""
                CREATE INDEX IF NOT EXISTS ix_rip_help_sections_prefix
                ON rip_help_sections(prefix)
            """)
        log.info("Схемата е проверена / създадена.")

    def get_file_hash(self, prefix: str, file_path: str) -> Optional[str]:
        """Return the stored hash for (prefix, file_path), or None if the file is unknown."""
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "SELECT file_hash FROM rip_help_files WHERE prefix=%s AND file_path=%s",
                (prefix, file_path),
            )
            row = cur.fetchone()
        return row[0] if row else None

    def upsert_file(self, prefix: str, file_path: str, file_hash: str, section_count: int):
        """Insert or refresh the bookkeeping row for a processed file."""
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO rip_help_files (prefix, file_path, file_hash, section_count)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (prefix, file_path) DO UPDATE SET
                    file_hash = EXCLUDED.file_hash,
                    section_count= EXCLUDED.section_count,
                    processed_at = NOW()
            """, (prefix, file_path, file_hash, section_count))

    def delete_sections_for_file(self, prefix: str, file_path: str):
        """Delete every section row that originates from the given source file."""
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "DELETE FROM rip_help_sections WHERE prefix=%s AND source_file=%s",
                (prefix, file_path),
            )

    def all_source_files(self, prefix: str) -> list[str]:
        """Return every source file path recorded for the prefix (files and sections tables)."""
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                SELECT file_path FROM rip_help_files WHERE prefix=%s
                UNION
                SELECT source_file FROM rip_help_sections WHERE prefix=%s
            """, (prefix, prefix))
            rows = cur.fetchall()
        return [r[0] for r in rows]

    def section_output_paths_for(self, prefix: str, source_files: list[str]) -> list[str]:
        """Return the non-empty output paths of all sections belonging to *source_files*."""
        if not source_files:
            return []
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "SELECT output_path FROM rip_help_sections "
                "WHERE prefix=%s AND source_file = ANY(%s)",
                (prefix, list(source_files)),
            )
            rows = cur.fetchall()
        return [r[0] for r in rows if r[0]]

    def purge_sources(self, prefix: str, source_files: list[str]) -> int:
        """Delete section and file rows for *source_files*; return the number of sections deleted.

        Both deletes run in one transaction, so either both tables are purged or neither is.
        """
        if not source_files:
            return 0
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "DELETE FROM rip_help_sections "
                "WHERE prefix=%s AND source_file = ANY(%s)",
                (prefix, list(source_files)),
            )
            sec_deleted = cur.rowcount
            cur.execute(
                "DELETE FROM rip_help_files "
                "WHERE prefix=%s AND file_path = ANY(%s)",
                (prefix, list(source_files)),
            )
        return sec_deleted

    def insert_section(self, prefix: str, ps: ProcessedSection, output_path: str):
        """Insert a section, or overwrite the existing row that has the same code."""
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO rip_help_sections
                    (prefix, code, source_file, title, keywords,
                     char_count, output_path, images, html_text)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (code) DO UPDATE SET
                    prefix = EXCLUDED.prefix,
                    source_file = EXCLUDED.source_file,
                    title = EXCLUDED.title,
                    keywords = EXCLUDED.keywords,
                    char_count = EXCLUDED.char_count,
                    output_path = EXCLUDED.output_path,
                    images = EXCLUDED.images,
                    html_text = EXCLUDED.html_text,
                    updated_at = NOW()
            """, (prefix, ps.code, ps.source_file, ps.title, ps.keywords,
                  ps.char_count, output_path, ps.images_json, ps.html_text))

    def close(self):
        """Close the underlying connection."""
        self.conn.close()
# ──────────────────────────────────────────────
# Парсъри
# ──────────────────────────────────────────────
def file_hash(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file, read in 64 KiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            piece = stream.read(65536)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
def _load_html_image(src: str, base_dir: Path) -> Optional[tuple[bytes, str]]:
    """Load the image referenced by an ``<img src>`` value.

    Args:
        src: the raw attribute value (data URI, URL, or local path).
        base_dir: directory against which relative paths are resolved.

    Returns:
        ``(data, ext)`` on success, otherwise None. HTTP/HTTPS sources are
        skipped, and so are malformed data URIs and paths that are not files.

    Local paths are tried as written first. If that fails, the path is retried
    with any ``?query``/``#fragment`` suffix removed and percent-escapes
    decoded, because exported HTML commonly references ``my%20image.png`` for a
    file named ``my image.png``.
    """
    if not src:
        return None
    s = src.strip()
    if s.startswith("data:"):
        # data:image/png;base64,XXXX
        m = re.match(r"data:([^;]+);base64,(.+)$", s, re.DOTALL)
        if not m:
            return None
        import base64
        try:
            data = base64.b64decode(m.group(2))
        except (ValueError, TypeError):
            return None
        return data, _ext_from_content_type(m.group(1))
    if s.startswith(("http://", "https://")):
        return None  # network images are skipped by design

    from urllib.parse import unquote

    # Local path, relative or absolute: literal form first, decoded form second.
    candidates = [s]
    decoded = unquote(re.split(r"[?#]", s, maxsplit=1)[0])
    if decoded and decoded != s:
        candidates.append(decoded)
    for candidate in candidates:
        try:
            p = Path(candidate)
            if not p.is_absolute():
                p = (base_dir / candidate).resolve()
            if p.is_file():
                return p.read_bytes(), (p.suffix.lstrip(".").lower() or "png")
        except (OSError, ValueError):
            # Unreadable file or a path the OS rejects (e.g. embedded NUL).
            continue
    return None
def _detect_html_encoding(raw: bytes) -> str:
    """Guess the text encoding of *raw*.

    Order of checks:
      1. BOM (UTF-8, UTF-16).
      2. chardet, when installed and at least 60% confident.
      3. Fallback: ``"utf-8"`` when the bytes are valid UTF-8 (this includes
         pure ASCII), otherwise ``"windows-1251"``.

    The fallback validates the whole buffer as UTF-8, so a UTF-8 file without a
    BOM is not mislabelled as CP1251 when chardet is missing or unsure.
    """
    # BOMs
    if raw.startswith(b"\xef\xbb\xbf"):
        return "utf-8"
    if raw.startswith((b"\xff\xfe", b"\xfe\xff")):
        return "utf-16"
    # chardet is optional; any failure falls through to the heuristic below.
    try:
        import chardet
        det = chardet.detect(raw[:65536]) or {}
        enc = (det.get("encoding") or "").lower()
        conf = det.get("confidence", 0) or 0
        if enc and conf >= 0.6:
            # normalise the common aliases of CP1251
            if enc in ("cp1251", "ms-cyrl", "windows-1251"):
                return "windows-1251"
            return enc
    except Exception:
        pass
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        return "windows-1251"
    return "utf-8"
# Tags treated as block-level content units when splitting an HTML body.
_HTML_BLOCK_TAGS = ["h1", "h2", "h3", "h4", "h5", "h6",
"p", "ul", "ol", "table", "dl", "pre",
"blockquote", "figure", "hr"]
# Presentational attributes removed from stored HTML.
_HTML_DROP_ATTRS = ("class", "style", "id", "lang", "dir", "align",
"valign", "width", "height", "bgcolor", "border")
def _strip_attrs(el):
    """Remove decorative attributes (class, style, on*, data-*) from *el* and its descendants.

    ``find_all(True)`` yields descendants only, so the root element is handled
    explicitly; otherwise the block itself would keep its own ``class``/``style``.
    """
    for t in (el, *el.find_all(True)):
        attrs = getattr(t, "attrs", None)
        if not attrs:
            continue
        # Iterate over a copy of the names: the dict is mutated while deleting.
        for a in list(attrs):
            if a in _HTML_DROP_ATTRS or a.startswith(("on", "data-")):
                del t[a]
def _swap_imgs_in_block(el, base_dir: Path, sec_images: list, img_counter: list) -> None:
    """Replace every ``<img>`` inside *el* with a ``[IMG: img_NN]`` text placeholder.

    Images that cannot be loaded or that fail the size filter are removed from
    the tree. Each kept image is appended to *sec_images* as an ImageRef, and
    ``img_counter[0]`` is incremented to number it.
    """
    from bs4 import NavigableString

    for img_tag in el.find_all("img"):
        source = img_tag.get("src") or img_tag.get("data-src") or ""
        payload = _load_html_image(source, base_dir)
        if payload:
            blob, extension = payload
            if _should_keep_image(blob):
                img_counter[0] += 1
                ref = ImageRef(placeholder=f"img_{img_counter[0]:02d}", data=blob, ext=extension)
                sec_images.append(ref)
                img_tag.replace_with(NavigableString(f"[IMG: {ref.placeholder}]"))
                continue
        # Not loadable, or too small to be worth keeping.
        img_tag.decompose()
def parse_html(path: Path) -> list[Section]:
    """Split an HTML file into sections at heading boundaries.

    Headings h1-h3 map to levels 1-3 and h4-h6 to level 3. Every other
    top-level block element contributes its text and its cleaned HTML to the
    current section. Images become ``[IMG: img_NN]`` placeholders, numbered
    across the whole document.

    Returns:
        The list of sections. If no block produced content, a single untitled
        level-0 section holding the whole body text.
    """
    raw = path.read_bytes()
    enc = _detect_html_encoding(raw)
    log.debug(f" {path.name} encoding: {enc}")
    try:
        soup = BeautifulSoup(raw, "lxml", from_encoding=enc)
    except Exception:
        soup = BeautifulSoup(raw, "lxml")
    # Drop scripts, styles and page chrome.
    for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
        tag.decompose()
    base_dir = path.parent
    body = soup.body or soup
    heading_map = {"h1": 1, "h2": 2, "h3": 3, "h4": 3, "h5": 3, "h6": 3}
    # Collect top-level block elements only; anything nested inside an already
    # collected block is handled as part of that block.
    consumed = set()
    blocks = []
    for el in body.find_all(_HTML_BLOCK_TAGS + ["img"]):
        if any(id(par) in consumed for par in el.parents):
            continue
        consumed.add(id(el))
        blocks.append(el)
    sections: list[Section] = []
    current_title = ""
    current_level = 1
    sec_text: list[str] = []
    sec_html: list[str] = []
    sec_images: list[ImageRef] = []
    img_counter = [0]

    def flush():
        """Emit the accumulated content as a Section, if there is any."""
        if sec_text or sec_html or sec_images:
            sec = Section(current_title, "\n".join(sec_text), current_level)
            sec.images = list(sec_images)
            sec.html_text = "\n".join(sec_html) if sec_html else None
            sections.append(sec)

    for el in blocks:
        if el.name in heading_map:
            txt = el.get_text(" ", strip=True)
            if not txt:
                continue
            flush()
            current_title = txt
            current_level = heading_map[el.name]
            sec_text, sec_html, sec_images = [], [], []
            continue
        if el.name == "img":
            # Standalone <img> (not inside a block element). Only this image is
            # handled here. Scanning its parent would pull every other image
            # under that parent (often <body>) into the current section and
            # leave this image without a placeholder in the text.
            src = el.get("src") or el.get("data-src") or ""
            loaded = _load_html_image(src, base_dir)
            if loaded and _should_keep_image(loaded[0]):
                img_counter[0] += 1
                ref = ImageRef(placeholder=f"img_{img_counter[0]:02d}",
                               data=loaded[0], ext=loaded[1])
                sec_images.append(ref)
                marker = f"[IMG: {ref.placeholder}]"
                sec_text.append(marker)
                sec_html.append(f"<p>{marker}</p>")
            continue
        _swap_imgs_in_block(el, base_dir, sec_images, img_counter)
        _strip_attrs(el)
        txt = el.get_text(" ", strip=True)
        if txt:
            sec_text.append(txt)
        try:
            sec_html.append(str(el))
        except Exception:
            # Serialisation failure of one block must not abort the whole file.
            pass
    flush()
    if not sections:
        plain = body.get_text(" ", strip=True)
        return [Section("", plain, 0)]
    return sections
def _extract_docx_paragraph_images(para, doc) -> list[ImageRef]:
    """Collect the embedded images referenced by ``a:blip`` elements of a paragraph.

    Images that fail the size filter, or whose relationship cannot be resolved,
    are skipped. The returned ImageRefs carry temporary ``__IMG_n__``
    placeholders numbered within this paragraph.
    """
    from docx.oxml.ns import qn

    found: list[ImageRef] = []
    try:
        blips = para._element.findall(".//" + qn("a:blip"))
    except Exception:
        return found
    relation_attr = qn("r:embed")
    for blip in blips:
        relation_id = blip.get(relation_attr)
        if not relation_id:
            continue
        try:
            part = doc.part.related_parts[relation_id]
            blob = part.blob
            content_type = getattr(part, "content_type", "") or ""
        except Exception:
            continue
        if _should_keep_image(blob):
            extension = _ext_from_content_type(content_type)
            found.append(
                ImageRef(placeholder=f"__IMG_{len(found)+1}__", data=blob, ext=extension)
            )
    return found
def parse_docx(path: Path) -> list[Section]:
    """Split a .docx document into sections.

    A paragraph starts a new section when its style is a known heading style,
    or when it is a short (< 120 chars) non-list paragraph whose text-bearing
    runs are all bold and which carries no images. Images are referenced from
    the text through ``[IMG: img_NN]`` placeholders numbered across the document.

    Returns:
        The list of sections, or a single untitled level-0 section when nothing
        was collected.
    """
    doc = Document(path)
    sections: list[Section] = []
    current_title, current_level = "", 1
    buf: list[str] = []
    sec_images: list[ImageRef] = []
    img_counter = [0]  # one-element list so the counter can be mutated in place
    HEADING_STYLES = {"heading 1": 1, "heading 2": 2, "heading 3": 3,
                      "title": 1, "subtitle": 2}

    def flush():
        """Emit the accumulated content as a Section, if there is any."""
        if buf or sec_images:
            sec = Section(current_title, "\n".join(buf), current_level)
            sec.images = list(sec_images)
            sections.append(sec)

    for para in doc.paragraphs:
        # style.name may be None for some styles; treat that as "no style".
        style_name = (para.style.name or "").lower() if para.style else ""
        text = para.text.strip()
        para_imgs = _extract_docx_paragraph_images(para, doc)
        if not text and not para_imgs:
            continue
        level = HEADING_STYLES.get(style_name)
        # Only runs that carry text decide boldness. Requiring at least one
        # avoids all([]) being vacuously true when the visible text is not held
        # in the paragraph's direct runs.
        text_runs = [run for run in para.runs if run.text.strip()]
        is_bold_heading = bool(
            text and len(text) < 120 and not style_name.startswith("list")
            and text_runs and all(run.bold for run in text_runs)
        )
        if level or (is_bold_heading and not para_imgs):
            flush()
            buf, sec_images = [], []
            current_title = text
            current_level = level or 2
        elif text:
            buf.append(text)
        # Images found in a heading-styled paragraph are attached to the section
        # that heading opens instead of being dropped.
        for im in para_imgs:
            img_counter[0] += 1
            im.placeholder = f"img_{img_counter[0]:02d}"
            sec_images.append(im)
            buf.append(f"[IMG: {im.placeholder}]")
    flush()
    if not sections:
        fallback_text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        return [Section("", fallback_text, 0)]
    return sections
def _convert_doc_with_libreoffice(path: Path, out_dir: Path) -> Optional[Path]:
    """Convert *path* to .docx in *out_dir* using headless LibreOffice (``soffice``).

    Returns the first ``*.docx`` found in *out_dir*, or None when the
    conversion fails, times out (60 s), or ``soffice`` is not installed.
    """
    command = [
        "soffice", "--headless", "--convert-to", "docx",
        "--outdir", str(out_dir), str(path),
    ]
    try:
        subprocess.run(command, check=True, capture_output=True, timeout=60)
    except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e:
        log.debug(f"LibreOffice конверсия неуспешна: {e}")
        return None
    return next(iter(out_dir.glob("*.docx")), None)
def _convert_doc_with_word(path: Path, out_dir: Path) -> Optional[Path]:
    """Fallback converter: drive MS Word through COM on Windows.

    Returns the path of the produced .docx, or None when pywin32 is missing or
    the conversion fails. Word and the document are always closed, and COM is
    uninitialised, before returning.
    """
    try:
        import pythoncom
        import win32com.client as wcc
    except ImportError:
        log.debug("pywin32 не е инсталиран — MS Word fallback недостъпен.")
        return None

    pythoncom.CoInitialize()
    app = None
    opened = None
    try:
        app = wcc.DispatchEx("Word.Application")
        app.Visible = False
        app.DisplayAlerts = False
        opened = app.Documents.Open(str(path.resolve()), ReadOnly=True)
        target = out_dir / (path.stem + ".docx")
        # FileFormat=16 → wdFormatXMLDocument (.docx)
        opened.SaveAs2(str(target.resolve()), FileFormat=16)
        if target.exists():
            return target
        return None
    except Exception as e:
        log.debug(f"MS Word конверсия неуспешна: {e}")
        return None
    finally:
        # Best-effort cleanup: a failure to close must not mask the result.
        if opened is not None:
            try:
                opened.Close(SaveChanges=False)
            except Exception:
                pass
        if app is not None:
            try:
                app.Quit()
            except Exception:
                pass
        pythoncom.CoUninitialize()
def parse_doc_old(path: Path) -> list[Section]:
    """Convert a legacy .doc to .docx (LibreOffice first, then MS Word) and parse it.

    When neither converter succeeds, the file is parsed as plain text instead.
    The converted file lives in a temporary directory that is removed on return.
    """
    converters = (
        ("LibreOffice", _convert_doc_with_libreoffice),
        ("MS Word", _convert_doc_with_word),
    )
    with tempfile.TemporaryDirectory() as tmp:
        work_dir = Path(tmp)
        for engine, convert in converters:
            converted = convert(path, work_dir)
            if converted:
                log.info(f" {path.name} конвертиран чрез {engine}")
                return parse_docx(converted)
        log.warning(
            f"Нито LibreOffice, нито MS Word успяха да конвертират {path.name}. "
            f"Пробваме като текст."
        )
        return parse_txt(path)
def _render_pdf_image(page, img_info, resolution: int = 150) -> Optional[bytes]:
    """Crop the image's bounding box out of the PDF page and return it as PNG bytes.

    Returns None when the box is empty, smaller than one unit after clamping to
    the page, or when rendering fails for any reason.
    """
    try:
        left = float(img_info.get("x0", 0))
        right = float(img_info.get("x1", 0))
        upper = float(img_info.get("top", img_info.get("y0", 0)))
        lower = float(img_info.get("bottom", img_info.get("y1", 0)))
        if right <= left or lower <= upper:
            return None
        # Clamp to the page (pdfplumber raises otherwise).
        left, upper = max(0, left), max(0, upper)
        right, lower = min(page.width, right), min(page.height, lower)
        if right - left < 1 or lower - upper < 1:
            return None
        region = page.crop((left, upper, right, lower))
        rendered = region.to_image(resolution=resolution).original
        from io import BytesIO

        sink = BytesIO()
        rendered.save(sink, format="PNG")
        return sink.getvalue()
    except Exception as e:
        log.debug(f"PDF image render failed: {e}")
        return None
def parse_pdf(path: Path) -> list[Section]:
    """Split a PDF into sections using font-size changes as heading hints.

    Words are grouped into runs of similar font size (difference <= 1). When
    the size changes, the finished run becomes a new section title if it is
    larger than the previous run's size (10 when unknown) by more than 1 and is
    shorter than 150 characters; otherwise it is appended to the current
    section. Page images are rendered to PNG and inserted as ``[IMG: img_NN]``
    placeholders according to their vertical position.

    Returns an empty list when pdfplumber is not installed; otherwise the
    sections (all level 2), or a single empty level-0 section if none were found.
    """
    if not HAS_PDF:
        log.warning("pdfplumber не е инсталиран. PDF се прескача.")
        return []
    sections: list[Section] = []
    current_title = ""
    buf: list[str] = []
    sec_images: list[ImageRef] = []
    img_counter = [0]
    prev_size = None
    def flush():
        # Emit the accumulated content as a Section, if there is any.
        if buf or sec_images:
            sec = Section(current_title, "\n".join(buf), 2)
            sec.images = list(sec_images)
            sections.append(sec)
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            # Images of this page, sorted top to bottom.
            page_images = sorted(
                page.images or [],
                key=lambda im: float(im.get("top", im.get("y0", 0)))
            )
            img_queue = []
            for im in page_images:
                data = _render_pdf_image(page, im)
                if not data or not _should_keep_image(data):
                    continue
                img_queue.append((float(im.get("top", 0)), data))
            words = page.extract_words(extra_attrs=["size"])
            line_buf, line_size = [], None
            def emit_images_before(y: float):
                # Move every queued image whose top is at or above y into the current section.
                while img_queue and img_queue[0][0] <= y:
                    _, data = img_queue.pop(0)
                    img_counter[0] += 1
                    ref = ImageRef(placeholder=f"img_{img_counter[0]:02d}",
                                   data=data, ext="png")
                    sec_images.append(ref)
                    buf.append(f"[IMG: {ref.placeholder}]")
            for w in words:
                sz = round(float(w.get("size", 10)), 1)
                y = float(w.get("top", 0))
                if line_size is None:
                    line_size = sz
                if abs(sz - line_size) > 1:
                    # Font size changed: close the current run and decide what it was.
                    line_text = " ".join(line_buf).strip()
                    if line_text:
                        if line_size > (prev_size or 10) + 1 and len(line_text) < 150:
                            # Larger than the preceding run and short: treat as a heading.
                            flush()
                            buf, sec_images = [], []
                            current_title = line_text
                        else:
                            emit_images_before(y)
                            buf.append(line_text)
                    prev_size = line_size
                    line_buf, line_size = [w["text"]], sz
                else:
                    line_buf.append(w["text"])
            if line_buf:
                # The last run on a page is always body text, never a heading.
                emit_images_before(page.height)
                buf.append(" ".join(line_buf))
            # Images located after all text on the page.
            emit_images_before(page.height + 1)
    flush()
    return sections or [Section("", "", 0)]
def parse_txt(path: Path) -> list[Section]:
    """Read a plain-text file into a single untitled level-0 section.

    The encoding is detected with chardet when it is installed (``utf-8`` if
    chardet gives no answer). chardet is optional here, as it is for HTML
    detection. Without it, or when it reports a codec Python does not know,
    the bytes are decoded as UTF-8 if valid and as windows-1251 otherwise.
    Undecodable bytes are replaced, never raised.
    """
    raw = path.read_bytes()
    try:
        import chardet
    except ImportError:
        chardet = None
    if chardet is not None:
        enc = (chardet.detect(raw) or {}).get("encoding") or "utf-8"
        try:
            return [Section("", raw.decode(enc, errors="replace"), 0)]
        except LookupError:
            # chardet reported an encoding name that Python has no codec for.
            log.debug(f"Unknown codec {enc!r} for {path.name}; using fallback decoding")
    try:
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        text = raw.decode("windows-1251", errors="replace")
    return [Section("", text, 0)]
# Maps a lower-cased file extension to the parser that handles it.
PARSERS = {
".html": parse_html,
".htm": parse_html,
".docx": parse_docx,
".doc": parse_doc_old,
".txt": parse_txt,
".pdf": parse_pdf,
}
# ──────────────────────────────────────────────
# Сегментиране и почистване
# ──────────────────────────────────────────────
def merge_short_sections(sections: list[Section]) -> list[Section]:
    """Merge sections with fewer than MIN_SECTION_TOKENS words into the preceding one.

    The first section is never merged (it has no predecessor). The merged
    section keeps the predecessor's title and level. The short section's own
    heading is kept as a line of text (and as a heading element when HTML is
    present) rather than being discarded, so no source content is lost.
    """
    from html import escape

    result: list[Section] = []
    for sec in sections:
        if not result or len(sec.text.split()) >= MIN_SECTION_TOKENS:
            result.append(sec)
            continue
        prev = result[-1]
        heading = (sec.title or "").strip()

        text_parts = [prev.text]
        if heading:
            text_parts.append(heading)
        text_parts.append(sec.text)
        merged = Section(prev.title, "\n".join(text_parts), prev.level)
        merged.images = (prev.images or []) + (sec.images or [])

        html_parts = []
        if prev.html_text:
            html_parts.append(prev.html_text)
        if heading and (prev.html_text or sec.html_text):
            tag = f"h{min(max(sec.level, 1), 6)}"
            html_parts.append(f"<{tag}>{escape(heading)}</{tag}>")
        if sec.html_text:
            html_parts.append(sec.html_text)
        merged.html_text = "\n".join(html_parts) if html_parts else None
        result[-1] = merged
    return result
def clean_text(text: str) -> str:
    """Collapse every run of whitespace (newlines included) to one space and trim the ends."""
    # After this substitution no two whitespace characters are adjacent, so a
    # separate pass for repeated spaces would find nothing to change.
    return re.sub(r"\s+", " ", text).strip()
# ──────────────────────────────────────────────
# AI класификация
# ──────────────────────────────────────────────
def classify_section(client: anthropic.Anthropic, title: str, text: str) -> tuple[str, str]:
    """Ask Claude for a short title and keywords for a section.

    Args:
        client: Anthropic API client.
        title: existing heading of the section (may be empty).
        text: section text; only the first MAX_AI_CHARS characters are sent.

    Returns:
        ``(title, "kw1, kw2, kw3")``. The title is capped at 200 characters and
        the keywords at 300. If the reply is not a JSON object, the existing
        title (or "Секция") and empty keywords are returned.

    Raises:
        Whatever the Anthropic client raises for API or network failures.
    """
    snippet = text[:MAX_AI_CHARS]
    prompt = f"""Анализирай следната секция от help-документация и върни JSON обект с два ключа:
- "title": кратко наименование на секцията (до 8 думи, на езика на текста)
- "keywords": списък от до 5 ключови думи/фрази, разделени със запетая (на езика на текста)
Съществуващо заглавие (може да е празно): {title!r}
Текст:
{snippet}
Върни САМО валиден JSON без markdown, без коментари."""
    msg = client.messages.create(
        model=AI_MODEL,
        max_tokens=200,
        messages=[{"role": "user", "content": prompt}]
    )
    # Use the first content block that carries text; tolerate an empty reply.
    raw = next(
        (blk.text for blk in (msg.content or []) if getattr(blk, "text", None)),
        "",
    ).strip()
    # Strip a Markdown code fence if the model added one despite the instructions.
    raw = re.sub(r"^```[a-z]*\n?", "", raw)
    raw = re.sub(r"\n?```$", "", raw)
    fallback = (title or "Секция", "")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        log.warning(f"AI върна невалиден JSON: {raw[:120]}")
        return fallback
    if not isinstance(data, dict):
        # Valid JSON, but not the object that was asked for.
        log.warning(f"AI върна невалиден JSON: {raw[:120]}")
        return fallback

    t = str(data.get("title") or title or "Секция")[:200]
    kw = data.get("keywords") or ""
    if isinstance(kw, (list, tuple)):
        # The prompt asks for a "list", so a JSON array is a likely reply.
        # Store it as "a, b, c" rather than as a Python list repr.
        kw = ", ".join(str(item).strip() for item in kw if str(item).strip())
    k = str(kw)[:300]
    return t, k
# ──────────────────────────────────────────────
# Генериране на кодове
# ──────────────────────────────────────────────
def make_code(prefix: str, file_index: int, sec_index: int) -> str:
    """Build a section code such as ``HLP_0003_SEC_0012`` (both indexes zero-padded to 4 digits)."""
    file_part = format(file_index, "04d")
    section_part = format(sec_index, "04d")
    return "_".join((prefix, file_part, "SEC", section_part))
# ──────────────────────────────────────────────
# Основна обработка
# ──────────────────────────────────────────────
def process_file(
    path: Path,
    file_index: int,
    db: Database,
    client: anthropic.Anthropic,
    output_dir: Path,
    prefix: str = "HLP",
    force: bool = False,
) -> int:
    """Process one file and return the number of sections saved (0 = skipped).

    Steps: hash check (skipped when unchanged unless *force*), parse, merge
    short sections, delete the file's old section rows, then for each section
    write its images, classify it, write the ``<code>.txt`` file and upsert the
    DB row. Finally the file's hash and section count are recorded.

    A parse failure or an unsupported extension returns 0 without touching the
    database. Errors from writing the text file or from the database propagate
    to the caller.
    """
    rel = str(path)
    fh = file_hash(path)
    if not force and db.get_file_hash(prefix, rel) == fh:
        log.info(f" [SKIP] {path.name} (непроменен)")
        return 0
    log.info(f" [PROC] {path.name}")
    ext = path.suffix.lower()
    parser = PARSERS.get(ext)
    if not parser:
        log.warning(f" Неподдържан формат: {ext}")
        return 0
    try:
        sections = parser(path)
    except Exception as e:
        log.error(f" Грешка при парсване: {e}")
        return 0
    sections = merge_short_sections(sections)
    # Remove the file's old sections before re-inserting.
    db.delete_sections_for_file(prefix, rel)
    images_dir = output_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    def drop_unresolved(value: str) -> str:
        """Remove placeholders that were not rewritten to a path (no file on disk)."""
        return _IMG_PLACEHOLDER_RE.sub(
            lambda m: m.group(0) if "/" in m.group(1) or "\\" in m.group(1) else "",
            value,
        ).strip()

    saved = 0
    for i, sec in enumerate(sections, 1):
        text = clean_text(sec.text)
        html_text = sec.html_text or ""
        if not text and not sec.images and not html_text:
            continue
        code = make_code(prefix, file_index, i)
        # Write the images to disk and point the placeholders at the saved files.
        image_rel_paths: list[str] = []
        for ref in sec.images or []:
            fname = f"{code}_{ref.placeholder}.{ref.ext}"
            try:
                (images_dir / fname).write_bytes(ref.data)
            except OSError as e:
                log.warning(f" Грешка при запис на картинка {fname}: {e}")
                continue
            rel_path = f"images/{fname}"
            image_rel_paths.append(rel_path)
            old_ph = f"[IMG: {ref.placeholder}]"
            new_ph = f"[IMG: {rel_path}]"
            text = text.replace(old_ph, new_ph)
            html_text = html_text.replace(old_ph, new_ph)
        text = drop_unresolved(text)
        html_text = drop_unresolved(html_text)
        if not text and not image_rel_paths and not html_text:
            continue
        try:
            title, keywords = classify_section(client, sec.title, text)
        except Exception as e:
            log.warning(f" AI грешка за {code}: {e}")
            # Same 200-character cap that classify_section applies, so a very
            # long source heading cannot overflow the title column.
            title, keywords = (sec.title or f"Секция {i}")[:200], ""
        images_json = json.dumps(image_rel_paths, ensure_ascii=False)
        ps = ProcessedSection(
            code=code,
            source_file=rel,
            title=title,
            keywords=keywords,
            text=text,
            images_json=images_json,
            html_text=html_text,
        )
        # Write the section text to the output directory. The separator was an
        # empty string repeated 60 times (i.e. nothing); use a visible rule.
        out_path = output_dir / f"{code}.txt"
        out_path.write_text(
            f"КОД: {code}\nФАЙЛ: {rel}\nЗАГЛАВИЕ: {title}\nКЛЮЧОВИ ДУМИ: {keywords}\n"
            f"КАРТИНКИ: {len(image_rel_paths)}\n"
            f"{'─' * 60}\n{text}",
            encoding="utf-8"
        )
        db.insert_section(prefix, ps, str(out_path))
        saved += 1
        log.debug(f" {code}: {title[:60]} ({len(image_rel_paths)} img)")
    db.upsert_file(prefix, rel, fh, saved)
    log.info(f"{saved} секции записани")
    return saved
# A valid prefix: one ASCII letter followed by up to 49 letters, digits or underscores.
_PREFIX_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_]{0,49}$")
def _stored_file_indexes(db: Database, prefix: str) -> tuple[dict[str, int], int]:
    """Read back the file index embedded in the section codes already stored.

    Returns ``({source_file: index}, highest index seen)``. When two source
    files claim the same index (left over from an earlier collision) only the
    first one, in code order, keeps it; the other receives a fresh index the
    next time it is processed.
    """
    code_re = re.compile(rf"^{re.escape(prefix)}_(\d+)_SEC_\d+$")
    cur = db.conn.cursor()
    try:
        cur.execute(
            "SELECT source_file, code FROM rip_help_sections "
            "WHERE prefix=%s ORDER BY code",
            (prefix,),
        )
        rows = cur.fetchall()
    finally:
        cur.close()
        db.conn.rollback()  # end the read-only transaction
    by_file: dict[str, int] = {}
    owner: dict[int, str] = {}
    highest = 0
    for source_file, code in rows:
        m = code_re.match(code or "")
        if not m:
            continue
        idx = int(m.group(1))
        highest = max(highest, idx)
        if source_file in by_file:
            continue
        if owner.setdefault(idx, source_file) == source_file:
            by_file[source_file] = idx
    return by_file, highest


def process_directory(
    input_dir: Path,
    output_dir: Path,
    conn_str: str,
    api_key: str,
    prefix: str = "HLP",
    force: bool = False,
    purge_missing: bool = False,
):
    """Process every supported file under *input_dir* and optionally purge orphans.

    Files located inside *output_dir* are ignored. With *purge_missing*, DB rows
    and output files of sources that no longer exist in *input_dir* are removed
    (within the given prefix only).

    File indexes are stable across runs: a file that already has sections in
    the database keeps the index embedded in its codes, and new files get
    indexes above the highest one in use. Numbering by position in the sorted
    file list would shift when files are added or removed, and a new code could
    then overwrite the rows of an unchanged (skipped) file.

    Raises:
        ValueError: if *prefix* is not a letter followed by letters/digits/underscores.
    """
    # fullmatch: "$" in the pattern would also accept a trailing newline.
    if not _PREFIX_RE.fullmatch(prefix):
        raise ValueError(
            f"Невалиден prefix {prefix!r}. Допустими: буква + букви/цифри/подчертавки, до 50 символа."
        )
    output_dir.mkdir(parents=True, exist_ok=True)
    extensions = set(PARSERS.keys())
    output_resolved = output_dir.resolve()

    def _under_output(p: Path) -> bool:
        """True when *p* lies inside the output directory."""
        try:
            p.resolve().relative_to(output_resolved)
            return True
        except ValueError:
            return False

    files = [
        p for p in input_dir.rglob("*")
        if p.is_file() and p.suffix.lower() in extensions and not _under_output(p)
    ]
    log.info(f"Prefix={prefix} Намерени {len(files)} файла в {input_dir}")
    current_paths = {str(p) for p in files}
    total_sections = 0
    db = Database(conn_str)
    try:
        # Created inside the try so the DB connection is closed if this fails.
        client = anthropic.Anthropic(api_key=api_key)
        known_indexes, highest = _stored_file_indexes(db, prefix)
        next_index = highest + 1
        for path in sorted(files):
            idx = known_indexes.get(str(path))
            if idx is None:
                idx, next_index = next_index, next_index + 1
            total_sections += process_file(path, idx, db, client, output_dir,
                                           prefix=prefix, force=force)
        if purge_missing:
            existing = set(db.all_source_files(prefix))
            orphans = sorted(existing - current_paths)
            if not orphans:
                log.info(f"Purge: няма orphan записи в БД за prefix={prefix}.")
            else:
                log.info(f"Purge ({prefix}): намерени {len(orphans)} orphan източника:")
                for o in orphans:
                    log.info(f" - {o}")
                disk_paths = db.section_output_paths_for(prefix, orphans)
                removed_files = 0
                for op in disk_paths:
                    try:
                        opath = Path(op)
                        if opath.exists():
                            opath.unlink()
                            removed_files += 1
                        # Images of a section are named "<code>_<placeholder>.<ext>".
                        code = opath.stem
                        for img in (output_dir / "images").glob(f"{code}_*"):
                            try:
                                img.unlink()
                                removed_files += 1
                            except OSError:
                                pass
                    except Exception as e:
                        log.debug(f" не успях да изтрия {op}: {e}")
                deleted = db.purge_sources(prefix, orphans)
                log.info(f"Purge: изтрити {deleted} секции от БД, {removed_files} файла от диска.")
    finally:
        db.close()
    log.info(f"Готово. Prefix={prefix}. Общо нови/обновени секции: {total_sections}")
# ──────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────
def main():
    """Command-line entry point: parse arguments and run process_directory().

    Exits with an error message when the Anthropic API key or the PostgreSQL
    connection string is missing (neither given as an option nor set in the
    environment).
    """
    parser = argparse.ArgumentParser(
        description="Help-файл декомпозитор с PostgreSQL + Anthropic"
    )
    parser.add_argument("input_dir", help="Входна директория с help-файлове")
    parser.add_argument("output_dir", help="Изходна директория за текстови секции")
    parser.add_argument(
        "--conn",
        default=os.getenv("HELP_DB_CONN"),
        help="PostgreSQL (libpq) connection string (или HELP_DB_CONN env var)"
    )
    parser.add_argument(
        "--api-key",
        default=os.getenv("ANTHROPIC_API_KEY"),
        help="Anthropic API ключ (или ANTHROPIC_API_KEY env var)"
    )
    parser.add_argument(
        "--prefix",
        default=os.getenv("HELP_PREFIX", "HLP"),
        help="Префикс за кодовете/scope в БД (буква + букви/цифри/_, до 50 знака). "
             "Default: 'HLP' (или env HELP_PREFIX)."
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Преобработва всички файлове, независимо от hash"
    )
    parser.add_argument(
        "--purge-missing",
        action="store_true",
        help="След обработката изтрива от БД и диска секциите за източници, "
             "които вече не съществуват във входната директория (само в дадения prefix)"
    )
    args = parser.parse_args()
    if not args.api_key:
        sys.exit("Грешка: липсва Anthropic API ключ (--api-key или ANTHROPIC_API_KEY).")
    if not args.conn:
        # The backend is PostgreSQL (psycopg2); the old text still said "SQL Server".
        sys.exit("Грешка: липсва PostgreSQL connection string (--conn или HELP_DB_CONN).")
    process_directory(
        input_dir=Path(args.input_dir),
        output_dir=Path(args.output_dir),
        conn_str=args.conn,
        api_key=args.api_key,
        prefix=args.prefix,
        force=args.force,
        purge_missing=args.purge_missing,
    )


if __name__ == "__main__":
    main()