- help_processor.py: parses .docx/.html/.pdf/.doc/.txt, extracts images, classifies sections via Claude API, writes to SQL Server - generate_html.py: builds interactive HTML viewer (Home/Editor/Search/Generator) - save_keywords.py: applies keyword edits back to DB - Prefix-scoped DB schema (RIP_help_files, RIP_help_sections) so multiple projects share the same database without collision - BAT launchers per project (RIP_load.bat, INEX_TM_load.bat, ...) load credentials from gitignored .env via _load_env.bat - Rich HTML preservation for .html sources (html_text column) - Image extraction for all formats with MS Word / LibreOffice fallback for .doc Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1163 lines
44 KiB
Python
1163 lines
44 KiB
Python
"""
|
||
help_processor.py
|
||
=================
|
||
Обработва help-файлове (.doc, .docx, .html, .htm, .txt, .pdf),
|
||
декомпозира ги на смислови секции, извлича ключови думи чрез Anthropic API
|
||
и записва резултатите в SQL Server + изходна директория.
|
||
|
||
Поддържа инкрементална обработка: файлове, чийто hash не се е променил,
|
||
се прескачат при повторно пускане.
|
||
|
||
Изисквания (pip install):
|
||
pip install anthropic pyodbc python-docx beautifulsoup4 lxml
|
||
pip install pdfplumber striprtf chardet
|
||
pip install pywin32 # за MS Word fallback на Windows
|
||
|
||
За .doc (стар формат) е необходим един от:
|
||
- LibreOffice (soffice в PATH) — кросплатформено
|
||
- MS Word — Windows, чрез pywin32 COM (автоматичен fallback)
|
||
- antiword — Linux (apt install antiword)
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import sys
|
||
import json
|
||
import hashlib
|
||
import logging
|
||
import argparse
|
||
import subprocess
|
||
import tempfile
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from dataclasses import dataclass, field
|
||
from typing import Optional
|
||
|
||
import pyodbc
|
||
import anthropic
|
||
from docx import Document
|
||
from bs4 import BeautifulSoup
|
||
|
||
try:
|
||
import pdfplumber
|
||
HAS_PDF = True
|
||
except ImportError:
|
||
HAS_PDF = False
|
||
|
||
try:
|
||
from PIL import Image
|
||
HAS_PIL = True
|
||
except ImportError:
|
||
HAS_PIL = False
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Конфигурация
|
||
# ──────────────────────────────────────────────
|
||
|
||
# The Windows console is often cp1251, so stdout/stderr are switched to UTF-8.
try:
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except AttributeError:
    # The stream object offers no reconfigure(); leave the streams as they are.
    pass

# Log to the console and to help_processor.log (relative to the working directory).
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("help_processor.log", encoding="utf-8"),
    ],
)
log = logging.getLogger(__name__)

# Sections with fewer words than this are merged into the preceding section.
MIN_SECTION_TOKENS = 60
# Maximum number of characters of section text sent to Claude for classification.
MAX_AI_CHARS = 4000
AI_MODEL = "claude-sonnet-4-6"
# Images smaller than N x N pixels are skipped (icons / bullets).
MIN_IMAGE_PX = 50
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Изображения — помощни
|
||
# ──────────────────────────────────────────────
|
||
|
||
@dataclass
class ImageRef:
    """Binary image taken from a source document plus its in-text placeholder."""

    # Internal ID used inside the section text, e.g. "img_01".
    placeholder: str
    # Raw image bytes.
    data: bytes
    # File extension without the dot: "png", "jpg", "gif", ...
    ext: str
|
||
|
||
|
||
def _img_dimensions(data: bytes) -> Optional[tuple[int, int]]:
    """Return the (width, height) of an encoded image, or None if unknown.

    None is returned when Pillow is not installed (HAS_PIL is false) or when
    opening the image fails for any reason.
    """
    from io import BytesIO

    size = None
    if HAS_PIL:
        try:
            with Image.open(BytesIO(data)) as picture:
                size = picture.size
        except Exception:
            # Unreadable or unsupported image data: the size stays unknown.
            size = None
    return size
|
||
|
||
|
||
def _should_keep_image(data: bytes) -> bool:
|
||
"""Връща False за дребни иконки/булети под MIN_IMAGE_PX × MIN_IMAGE_PX."""
|
||
if not data:
|
||
return False
|
||
dims = _img_dimensions(data)
|
||
if dims is None:
|
||
# Не можем да преценим — пазим по подразбиране
|
||
return True
|
||
w, h = dims
|
||
return w >= MIN_IMAGE_PX and h >= MIN_IMAGE_PX
|
||
|
||
|
||
def _ext_from_content_type(ct: str) -> str:
|
||
ct = (ct or "").lower()
|
||
if "png" in ct: return "png"
|
||
if "jpeg" in ct or "jpg" in ct: return "jpg"
|
||
if "gif" in ct: return "gif"
|
||
if "bmp" in ct: return "bmp"
|
||
if "svg" in ct: return "svg"
|
||
if "webp" in ct: return "webp"
|
||
return "png"
|
||
|
||
|
||
_IMG_PLACEHOLDER_RE = re.compile(r"\[IMG:\s*([A-Za-z0-9_./\\-]+)\s*\]")
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Структури
|
||
# ──────────────────────────────────────────────
|
||
|
||
@dataclass
class Section:
    """One section of a parsed document, as produced by the parsers."""

    # Heading text; empty when the section has no heading.
    title: str
    # Plain text of the section; may contain "[IMG: ...]" placeholders.
    text: str
    # 1=H1, 2=H2, 3=H3, 0=no heading.
    level: int = 1
    # ImageRef objects that belong to this section.
    images: list = field(default_factory=list)
    # Rich HTML with "[IMG: ...]" placeholders, when available.
    html_text: Optional[str] = None
|
||
|
||
|
||
@dataclass
class ProcessedSection:
    """A classified section ready to be stored."""

    # Section code built by make_code(), e.g. "HLP_0003_SEC_0012".
    code: str
    source_file: str
    title: str
    # Comma-separated keywords: "kw1, kw2, kw3".
    keywords: str
    text: str
    # JSON array of relative image paths.
    images_json: str = "[]"
    # Rich HTML (only for HTML-source files).
    html_text: str = ""
    # Always recomputed from `text` after construction.
    char_count: int = 0

    def __post_init__(self):
        """Overwrite char_count with the length of the section text."""
        self.char_count = len(self.text)
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# База данни
|
||
# ──────────────────────────────────────────────
|
||
|
||
def _ensure_trust_server_certificate(conn_str: str) -> str:
|
||
"""Добавя TrustServerCertificate=yes към connection string ако липсва."""
|
||
if not conn_str:
|
||
return conn_str
|
||
if re.search(r"TrustServerCertificate\s*=", conn_str, re.IGNORECASE):
|
||
return conn_str
|
||
sep = "" if conn_str.rstrip().endswith(";") else ";"
|
||
return f"{conn_str}{sep}TrustServerCertificate=yes;"
|
||
|
||
|
||
class Database:
    """Wrapper around one pyodbc connection to the help-content database.

    On construction the RIP_help_files / RIP_help_sections tables are created
    or migrated as needed. Every mutating method commits before returning.
    The object can be used as a context manager; leaving the ``with`` block
    closes the connection.
    """

    # SQL Server rejects a request carrying more than 2100 parameters, so
    # IN (...) lists are sent in batches that stay well below that ceiling.
    _MAX_IN_PARAMS = 1000

    def __init__(self, conn_str: str):
        """Connect (autocommit off) and make sure the schema exists."""
        self.conn_str = _ensure_trust_server_certificate(conn_str)
        self.conn = pyodbc.connect(self.conn_str, autocommit=False)
        self._ensure_schema()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    @classmethod
    def _batched(cls, items):
        """Yield consecutive slices of ``items`` of at most _MAX_IN_PARAMS entries."""
        for start in range(0, len(items), cls._MAX_IN_PARAMS):
            yield items[start:start + cls._MAX_IN_PARAMS]

    def _ensure_schema(self):
        """Create the tables if they do not exist and apply column/constraint migrations."""
        cur = self.conn.cursor()
        # NOTE(review): the UNIQUE key below spans NVARCHAR(50) + NVARCHAR(1000);
        # confirm it stays within the server's index key size limit for real data.
        cur.execute("""
            IF NOT EXISTS (SELECT 1 FROM sys.tables WHERE name='RIP_help_files')
            CREATE TABLE RIP_help_files (
                id INT IDENTITY PRIMARY KEY,
                prefix NVARCHAR(50) NOT NULL DEFAULT 'HLP',
                file_path NVARCHAR(1000) NOT NULL,
                file_hash CHAR(64) NOT NULL,
                processed_at DATETIME2 NOT NULL DEFAULT GETDATE(),
                section_count INT NOT NULL DEFAULT 0,
                CONSTRAINT UQ_RIP_help_files_prefix_path UNIQUE (prefix, file_path)
            )""")
        # Migrate: add the prefix column when the table comes from an older version.
        cur.execute("""
            IF NOT EXISTS (
                SELECT 1 FROM sys.columns
                WHERE object_id=OBJECT_ID('RIP_help_files') AND name='prefix'
            )
            BEGIN
                ALTER TABLE RIP_help_files ADD prefix NVARCHAR(50) NOT NULL
                    CONSTRAINT DF_RIP_help_files_prefix DEFAULT 'HLP' WITH VALUES;
            END
        """)
        # Migrate: drop a legacy single-column UNIQUE on file_path (without prefix).
        cur.execute("""
            DECLARE @c NVARCHAR(200);
            SELECT @c = i.name FROM sys.indexes i
            WHERE i.object_id=OBJECT_ID('RIP_help_files')
              AND i.is_unique=1
              AND i.name <> 'UQ_RIP_help_files_prefix_path'
              AND i.name NOT LIKE 'PK_%'
              AND (SELECT COUNT(*) FROM sys.index_columns ic
                   WHERE ic.object_id=i.object_id AND ic.index_id=i.index_id) = 1;
            IF @c IS NOT NULL EXEC('ALTER TABLE RIP_help_files DROP CONSTRAINT [' + @c + ']');
        """)
        # Migrate: create the composite UNIQUE when it is missing.
        cur.execute("""
            IF NOT EXISTS (
                SELECT 1 FROM sys.indexes
                WHERE name='UQ_RIP_help_files_prefix_path'
                  AND object_id=OBJECT_ID('RIP_help_files')
            )
            ALTER TABLE RIP_help_files
                ADD CONSTRAINT UQ_RIP_help_files_prefix_path UNIQUE (prefix, file_path)
        """)
        cur.execute("""
            IF NOT EXISTS (SELECT 1 FROM sys.tables WHERE name='RIP_help_sections')
            CREATE TABLE RIP_help_sections (
                id INT IDENTITY PRIMARY KEY,
                prefix NVARCHAR(50) NOT NULL DEFAULT 'HLP',
                code NVARCHAR(80) NOT NULL UNIQUE,
                source_file NVARCHAR(1000) NOT NULL,
                title NVARCHAR(500),
                keywords NVARCHAR(300),
                char_count INT,
                output_path NVARCHAR(1000),
                images NVARCHAR(MAX),
                created_at DATETIME2 NOT NULL DEFAULT GETDATE(),
                updated_at DATETIME2 NOT NULL DEFAULT GETDATE()
            )""")
        # Migrate: add the prefix column when the table comes from an older version.
        cur.execute("""
            IF NOT EXISTS (
                SELECT 1 FROM sys.columns
                WHERE object_id=OBJECT_ID('RIP_help_sections') AND name='prefix'
            )
            ALTER TABLE RIP_help_sections ADD prefix NVARCHAR(50) NOT NULL
                CONSTRAINT DF_RIP_help_sections_prefix DEFAULT 'HLP' WITH VALUES
        """)
        # Migrate: add the 'images' column when the table was created by an older version.
        cur.execute("""
            IF NOT EXISTS (
                SELECT 1 FROM sys.columns
                WHERE object_id=OBJECT_ID('RIP_help_sections') AND name='images'
            )
            ALTER TABLE RIP_help_sections ADD images NVARCHAR(MAX) NULL
        """)
        # Migrate: add the 'html_text' column (rich HTML with formatting).
        cur.execute("""
            IF NOT EXISTS (
                SELECT 1 FROM sys.columns
                WHERE object_id=OBJECT_ID('RIP_help_sections') AND name='html_text'
            )
            ALTER TABLE RIP_help_sections ADD html_text NVARCHAR(MAX) NULL
        """)
        # Index used for keyword lookups.
        cur.execute("""
            IF NOT EXISTS (
                SELECT 1 FROM sys.indexes
                WHERE name='IX_RIP_help_sections_keywords' AND object_id=OBJECT_ID('RIP_help_sections')
            )
            CREATE INDEX IX_RIP_help_sections_keywords ON RIP_help_sections(keywords)
        """)
        self.conn.commit()
        log.info("Схемата е проверена / създадена.")

    def get_file_hash(self, prefix: str, file_path: str) -> Optional[str]:
        """Return the stored hash for (prefix, file_path), or None when no row exists."""
        cur = self.conn.cursor()
        cur.execute(
            "SELECT file_hash FROM RIP_help_files WHERE prefix=? AND file_path=?",
            prefix, file_path
        )
        row = cur.fetchone()
        return row[0] if row else None

    def upsert_file(self, prefix: str, file_path: str, file_hash: str, section_count: int):
        """Insert or update the RIP_help_files row for (prefix, file_path) and commit."""
        cur = self.conn.cursor()
        cur.execute("""
            MERGE RIP_help_files AS t
            USING (SELECT ? AS prefix, ? AS file_path, ? AS file_hash, ? AS section_count) AS s
            ON t.prefix = s.prefix AND t.file_path = s.file_path
            WHEN MATCHED THEN
                UPDATE SET file_hash=s.file_hash, section_count=s.section_count,
                           processed_at=GETDATE()
            WHEN NOT MATCHED THEN
                INSERT (prefix, file_path, file_hash, section_count)
                VALUES (s.prefix, s.file_path, s.file_hash, s.section_count);
        """, prefix, file_path, file_hash, section_count)
        self.conn.commit()

    def delete_sections_for_file(self, prefix: str, file_path: str):
        """Delete every section row of one source file within the prefix and commit."""
        cur = self.conn.cursor()
        cur.execute(
            "DELETE FROM RIP_help_sections WHERE prefix=? AND source_file=?",
            prefix, file_path
        )
        self.conn.commit()

    def all_source_files(self, prefix: str) -> list[str]:
        """Return all source file paths known for the prefix (from both tables)."""
        cur = self.conn.cursor()
        cur.execute("""
            SELECT file_path FROM RIP_help_files WHERE prefix=?
            UNION
            SELECT source_file FROM RIP_help_sections WHERE prefix=?
        """, prefix, prefix)
        return [r[0] for r in cur.fetchall()]

    def section_output_paths_for(self, prefix: str, source_files: list[str]) -> list[str]:
        """Return the non-empty output_path values of sections from the given source files.

        The file list is queried in batches to stay under the server's
        per-request parameter limit; results are concatenated in batch order.
        """
        if not source_files:
            return []
        cur = self.conn.cursor()
        paths: list[str] = []
        for batch in self._batched(list(source_files)):
            placeholders = ",".join("?" for _ in batch)
            cur.execute(
                f"SELECT output_path FROM RIP_help_sections "
                f"WHERE prefix=? AND source_file IN ({placeholders})",
                prefix, *batch
            )
            paths.extend(r[0] for r in cur.fetchall() if r[0])
        return paths

    def purge_sources(self, prefix: str, source_files: list[str]) -> int:
        """Delete the section and file rows of the given source files.

        All batches run in one transaction: it is committed once at the end
        and rolled back if any statement fails (the exception is re-raised).

        Returns:
            The number of section rows deleted.
        """
        if not source_files:
            return 0
        files = list(source_files)
        cur = self.conn.cursor()
        sec_deleted = 0
        try:
            for batch in self._batched(files):
                placeholders = ",".join("?" for _ in batch)
                cur.execute(
                    f"DELETE FROM RIP_help_sections "
                    f"WHERE prefix=? AND source_file IN ({placeholders})",
                    prefix, *batch
                )
                # rowcount is -1 when the driver cannot report a count.
                sec_deleted += max(cur.rowcount, 0)
            for batch in self._batched(files):
                placeholders = ",".join("?" for _ in batch)
                cur.execute(
                    f"DELETE FROM RIP_help_files "
                    f"WHERE prefix=? AND file_path IN ({placeholders})",
                    prefix, *batch
                )
        except Exception:
            self.conn.rollback()
            raise
        self.conn.commit()
        return sec_deleted

    def insert_section(self, prefix: str, ps: "ProcessedSection", output_path: str):
        """Insert the section, or update the existing row with the same code, and commit."""
        cur = self.conn.cursor()
        cur.execute("""
            MERGE RIP_help_sections AS t
            USING (SELECT ? AS code) AS s ON t.code = s.code
            WHEN MATCHED THEN
                UPDATE SET prefix=?, source_file=?, title=?, keywords=?,
                           char_count=?, output_path=?, images=?, html_text=?,
                           updated_at=GETDATE()
            WHEN NOT MATCHED THEN
                INSERT (prefix, code, source_file, title, keywords, char_count, output_path,
                        images, html_text)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
        """,
            ps.code,                                                   # USING
            prefix, ps.source_file, ps.title, ps.keywords,             # UPDATE SET
            ps.char_count, output_path, ps.images_json, ps.html_text,
            prefix, ps.code, ps.source_file, ps.title, ps.keywords,    # INSERT
            ps.char_count, output_path, ps.images_json, ps.html_text)
        self.conn.commit()

    def close(self):
        """Close the underlying connection."""
        self.conn.close()
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Парсъри
|
||
# ──────────────────────────────────────────────
|
||
|
||
def file_hash(path: Path) -> str:
    """Return the hex SHA-256 digest of the file, read in 64 KiB chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while block := stream.read(65536):
            digest.update(block)
    return digest.hexdigest()
|
||
|
||
|
||
def _load_html_image(src: str, base_dir: Path) -> Optional[tuple[bytes, str]]:
|
||
"""Връща (data, ext) или None. Пропуска HTTP/HTTPS."""
|
||
if not src:
|
||
return None
|
||
s = src.strip()
|
||
if s.startswith("data:"):
|
||
# data:image/png;base64,XXXX
|
||
m = re.match(r"data:([^;]+);base64,(.+)$", s, re.DOTALL)
|
||
if not m:
|
||
return None
|
||
import base64
|
||
try:
|
||
data = base64.b64decode(m.group(2))
|
||
except Exception:
|
||
return None
|
||
return data, _ext_from_content_type(m.group(1))
|
||
if s.startswith(("http://", "https://")):
|
||
return None # по правило пропускаме мрежови картинки
|
||
# локален път, относителен или абсолютен
|
||
p = (base_dir / s).resolve() if not Path(s).is_absolute() else Path(s)
|
||
try:
|
||
if p.is_file():
|
||
data = p.read_bytes()
|
||
ext = p.suffix.lstrip(".").lower() or "png"
|
||
return data, ext
|
||
except Exception:
|
||
return None
|
||
return None
|
||
|
||
|
||
def _detect_html_encoding(raw: bytes) -> str:
|
||
"""Връща име на encoding: BOM → chardet → fallback (utf-8 ако ASCII, иначе windows-1251)."""
|
||
# BOM-и
|
||
if raw.startswith(b"\xef\xbb\xbf"):
|
||
return "utf-8"
|
||
if raw.startswith((b"\xff\xfe", b"\xfe\xff")):
|
||
return "utf-16"
|
||
# chardet
|
||
try:
|
||
import chardet
|
||
det = chardet.detect(raw[:65536]) or {}
|
||
enc = (det.get("encoding") or "").lower()
|
||
conf = det.get("confidence", 0) or 0
|
||
if enc and conf >= 0.6:
|
||
# нормализиране на често срещани имена
|
||
if enc in ("cp1251", "ms-cyrl", "windows-1251"):
|
||
return "windows-1251"
|
||
if enc.startswith("utf"):
|
||
return enc
|
||
return enc
|
||
except Exception:
|
||
pass
|
||
# fallback: ако байтовете изглеждат "над 127" (т.е. има не-ASCII), приемаме CP1251
|
||
if any(b > 127 for b in raw[:8192]):
|
||
return "windows-1251"
|
||
return "utf-8"
|
||
|
||
|
||
_HTML_BLOCK_TAGS = ["h1", "h2", "h3", "h4", "h5", "h6",
|
||
"p", "ul", "ol", "table", "dl", "pre",
|
||
"blockquote", "figure", "hr"]
|
||
_HTML_DROP_ATTRS = ("class", "style", "id", "lang", "dir", "align",
|
||
"valign", "width", "height", "bgcolor", "border")
|
||
|
||
|
||
def _strip_attrs(el):
|
||
"""Премахва decorative атрибути (class, style, on*, data-*)."""
|
||
for t in el.find_all(True):
|
||
for a in list(t.attrs):
|
||
if a in _HTML_DROP_ATTRS or a.startswith("on") or a.startswith("data-"):
|
||
del t[a]
|
||
|
||
|
||
def _swap_imgs_in_block(el, base_dir: Path, sec_images: list, img_counter: list) -> None:
    """Replace every ``<img>`` below ``el`` with an ``[IMG: img_NN]`` text placeholder.

    Each image that loads and passes the size filter is appended to
    ``sec_images`` as an ImageRef and numbered through ``img_counter[0]``
    (a one-element list used as a mutable counter). Images that cannot be
    loaded or are filtered out are removed from the tree.
    """
    from bs4 import NavigableString

    for node in el.find_all("img"):
        source = node.get("src") or node.get("data-src") or ""
        payload = _load_html_image(source, base_dir)
        if not payload or not _should_keep_image(payload[0]):
            node.decompose()
            continue
        blob, suffix = payload
        img_counter[0] += 1
        ref = ImageRef(placeholder=f"img_{img_counter[0]:02d}", data=blob, ext=suffix)
        sec_images.append(ref)
        node.replace_with(NavigableString(f"[IMG: {ref.placeholder}]"))
|
||
|
||
|
||
def parse_html(path: Path) -> list[Section]:
    """Split an HTML file into sections at its headings.

    The body is walked as a flat list of top-level block elements (the tags
    in _HTML_BLOCK_TAGS plus standalone ``<img>``). Each non-empty h1–h6
    starts a new section (h4–h6 map to level 3). Other blocks contribute
    their plain text, their attribute-stripped HTML, and any images, which
    are replaced by ``[IMG: img_NN]`` placeholders.

    Returns:
        The sections in document order. If nothing was collected, a single
        untitled level-0 section holding the body's plain text.
    """
    raw = path.read_bytes()
    enc = _detect_html_encoding(raw)
    log.debug(f" {path.name} encoding: {enc}")
    try:
        soup = BeautifulSoup(raw, "lxml", from_encoding=enc)
    except Exception:
        soup = BeautifulSoup(raw, "lxml")

    # Drop scripts, styles and page chrome.
    for tag in soup(["script", "style", "nav", "footer", "header", "noscript"]):
        tag.decompose()

    base_dir = path.parent
    body = soup.body or soup

    heading_map = {"h1": 1, "h2": 2, "h3": 3, "h4": 3, "h5": 3, "h6": 3}

    # Collect top-level block elements, skipping anything nested inside one
    # that was already taken.
    consumed = set()
    blocks = []
    for el in body.find_all(_HTML_BLOCK_TAGS + ["img"]):
        if any(id(par) in consumed for par in el.parents):
            continue
        consumed.add(id(el))
        blocks.append(el)

    sections: list[Section] = []
    current_title = ""
    current_level = 1
    sec_text: list[str] = []
    sec_html: list[str] = []
    sec_images: list[ImageRef] = []
    img_counter = [0]

    def flush():
        if sec_text or sec_html or sec_images:
            sec = Section(current_title, "\n".join(sec_text), current_level)
            sec.images = list(sec_images)
            sec.html_text = "\n".join(sec_html) if sec_html else None
            sections.append(sec)

    for el in blocks:
        if el.name in heading_map:
            txt = el.get_text(" ", strip=True)
            if not txt:
                continue
            flush()
            current_title = txt
            current_level = heading_map[el.name]
            sec_text, sec_html, sec_images = [], [], []
            continue

        if el.name == "img":
            # Standalone <img> (not inside a block): handle this one image
            # only. Swapping images on its parent would also consume images
            # that belong to later blocks and sections, and the placeholder
            # would never reach the section text.
            src = el.get("src") or el.get("data-src") or ""
            loaded = _load_html_image(src, base_dir)
            if loaded and _should_keep_image(loaded[0]):
                img_counter[0] += 1
                ref = ImageRef(placeholder=f"img_{img_counter[0]:02d}",
                               data=loaded[0], ext=loaded[1])
                sec_images.append(ref)
                placeholder = f"[IMG: {ref.placeholder}]"
                sec_text.append(placeholder)
                sec_html.append(f"<p>{placeholder}</p>")
            continue

        _swap_imgs_in_block(el, base_dir, sec_images, img_counter)
        _strip_attrs(el)
        txt = el.get_text(" ", strip=True)
        if txt:
            sec_text.append(txt)
        try:
            sec_html.append(str(el))
        except Exception:
            # Serialisation of this block failed; keep its plain text only.
            pass

    flush()

    if not sections:
        plain = body.get_text(" ", strip=True)
        return [Section("", plain, 0)]
    return sections
|
||
|
||
|
||
def _extract_docx_paragraph_images(para, doc) -> list[ImageRef]:
    """Return ImageRefs for the embedded pictures of one paragraph.

    Looks for ``a:blip`` elements carrying an ``r:embed`` relationship ID,
    resolves each through ``doc.part.related_parts`` and keeps those that pass
    the size filter. Placeholders are provisional (``__IMG_n__``).
    """
    from docx.oxml.ns import qn

    found: list[ImageRef] = []
    try:
        blip_nodes = para._element.findall(".//" + qn("a:blip"))
    except Exception:
        return found

    rel_attr = qn("r:embed")
    for node in blip_nodes:
        rel_id = node.get(rel_attr)
        if not rel_id:
            continue
        try:
            target = doc.part.related_parts[rel_id]
            blob = target.blob
            mime = getattr(target, "content_type", "") or ""
        except Exception:
            # Missing or unreadable relationship target: skip this picture.
            continue
        if _should_keep_image(blob):
            found.append(
                ImageRef(
                    placeholder=f"__IMG_{len(found)+1}__",
                    data=blob,
                    ext=_ext_from_content_type(mime),
                )
            )
    return found
|
||
|
||
|
||
def parse_docx(path: Path) -> list[Section]:
    """Split a .docx file into sections at its headings.

    A paragraph starts a new section when it has visible text and either
    uses a heading style ("Heading 1"–"Heading 9", "Title", "Subtitle") or
    looks like a bold heading: under 120 characters, not a list style, every
    text-bearing run bold, and no images. Heading levels above 3 are capped
    at 3, matching the h4–h6 handling in parse_html. Images are referenced
    in the text as ``[IMG: img_NN]`` placeholders.

    Returns:
        The sections in document order, or a single untitled level-0 section
        when nothing was collected.
    """
    doc = Document(path)
    sections: list[Section] = []
    current_title, current_level = "", 1
    buf: list[str] = []
    sec_images: list[ImageRef] = []
    img_counter = [0]  # one-element list used as a mutable counter

    heading_styles = {f"heading {n}": min(n, 3) for n in range(1, 10)}
    heading_styles.update({"title": 1, "subtitle": 2})

    def flush():
        if buf or sec_images:
            sec = Section(current_title, "\n".join(buf), current_level)
            sec.images = list(sec_images)
            sections.append(sec)

    for para in doc.paragraphs:
        style_name = (para.style.name or "").lower() if para.style else ""
        text = para.text.strip()
        para_imgs = _extract_docx_paragraph_images(para, doc)

        if not text and not para_imgs:
            continue

        level = heading_styles.get(style_name)
        # Require at least one text-bearing run: all() over an empty sequence
        # is True, which would turn e.g. a hyperlink-only paragraph into a
        # heading.
        text_runs = [run for run in para.runs if run.text.strip()]
        is_bold_heading = bool(text and len(text) < 120
                               and not style_name.startswith("list")
                               and text_runs
                               and all(run.bold for run in text_runs))

        # A heading needs visible text; an image-only paragraph in a heading
        # style stays part of the current section.
        starts_section = bool(text) and (
            level is not None or (is_bold_heading and not para_imgs)
        )

        if starts_section:
            flush()
            buf, sec_images = [], []
            current_title = text
            current_level = level or 2
        elif text:
            buf.append(text)

        # Images are kept in both cases; those of a heading paragraph open
        # the new section instead of being dropped.
        for im in para_imgs:
            img_counter[0] += 1
            im.placeholder = f"img_{img_counter[0]:02d}"
            sec_images.append(im)
            buf.append(f"[IMG: {im.placeholder}]")

    flush()

    if not sections:
        fallback_text = "\n".join(p.text for p in doc.paragraphs if p.text.strip())
        return [Section("", fallback_text, 0)]
    return sections
|
||
|
||
|
||
def _convert_doc_with_libreoffice(path: Path, out_dir: Path) -> Optional[Path]:
    """Convert a .doc file to .docx with headless LibreOffice (``soffice``).

    Args:
        path: The source .doc file.
        out_dir: Directory that receives the converted file.

    Returns:
        Path of the converted .docx, or None when soffice is missing, exits
        with an error, times out after 60 s, or produces no output.
    """
    try:
        subprocess.run(
            ["soffice", "--headless", "--convert-to", "docx",
             "--outdir", str(out_dir), str(path)],
            check=True, capture_output=True, timeout=60
        )
    except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e:
        log.debug(f"LibreOffice конверсия неуспешна: {e}")
        return None
    # Prefer the file named after the input so an unrelated .docx that is
    # already in out_dir is never returned by mistake.
    expected = out_dir / (path.stem + ".docx")
    if expected.is_file():
        return expected
    out = sorted(out_dir.glob("*.docx"))
    return out[0] if out else None
|
||
|
||
|
||
def _convert_doc_with_word(path: Path, out_dir: Path) -> Optional[Path]:
    """Fallback converter: drive MS Word through COM (Windows, needs pywin32).

    Returns the path of the saved .docx, or None when pywin32 is not
    installed or the conversion fails. The document and the Word instance
    are closed on every path.
    """
    try:
        import win32com.client as wcc
        import pythoncom
    except ImportError:
        log.debug("pywin32 не е инсталиран — MS Word fallback недостъпен.")
        return None

    pythoncom.CoInitialize()
    word = None
    doc = None
    try:
        word = wcc.DispatchEx("Word.Application")
        word.Visible = False
        word.DisplayAlerts = False
        doc = word.Documents.Open(str(path.resolve()), ReadOnly=True)
        target = out_dir / (path.stem + ".docx")
        # FileFormat=16 is wdFormatXMLDocument (.docx)
        doc.SaveAs2(str(target.resolve()), FileFormat=16)
        if target.exists():
            return target
        return None
    except Exception as e:
        log.debug(f"MS Word конверсия неуспешна: {e}")
        return None
    finally:
        # Best-effort cleanup: a failure to close must not mask the result.
        try:
            if doc is not None:
                doc.Close(SaveChanges=False)
        except Exception:
            pass
        try:
            if word is not None:
                word.Quit()
        except Exception:
            pass
        pythoncom.CoUninitialize()
|
||
|
||
|
||
def parse_doc_old(path: Path) -> list[Section]:
    """Convert a legacy .doc to .docx (LibreOffice, then MS Word) and parse it.

    When neither converter succeeds the file is parsed as plain text. The
    converted file lives in a temporary directory that is removed on return.
    """
    with tempfile.TemporaryDirectory() as tmp:
        work_dir = Path(tmp)

        engine = "LibreOffice"
        converted = _convert_doc_with_libreoffice(path, work_dir)
        if not converted:
            engine = "MS Word"
            converted = _convert_doc_with_word(path, work_dir)

        if converted:
            log.info(f" {path.name} конвертиран чрез {engine}")
            return parse_docx(converted)

        log.warning(
            f"Нито LibreOffice, нито MS Word успяха да конвертират {path.name}. "
            f"Пробваме като текст."
        )
        return parse_txt(path)
|
||
|
||
|
||
def _render_pdf_image(page, img_info, resolution: int = 150) -> Optional[bytes]:
|
||
"""Кропва картинката от PDF страницата и я записва като PNG bytes."""
|
||
try:
|
||
x0 = float(img_info.get("x0", 0))
|
||
x1 = float(img_info.get("x1", 0))
|
||
top = float(img_info.get("top", img_info.get("y0", 0)))
|
||
bot = float(img_info.get("bottom", img_info.get("y1", 0)))
|
||
if x1 <= x0 or bot <= top:
|
||
return None
|
||
# ограничаваме до страницата (pdfplumber иначе хвърля)
|
||
x0 = max(0, x0); top = max(0, top)
|
||
x1 = min(page.width, x1); bot = min(page.height, bot)
|
||
if x1 - x0 < 1 or bot - top < 1:
|
||
return None
|
||
cropped = page.crop((x0, top, x1, bot))
|
||
pil = cropped.to_image(resolution=resolution).original
|
||
from io import BytesIO
|
||
buf = BytesIO()
|
||
pil.save(buf, format="PNG")
|
||
return buf.getvalue()
|
||
except Exception as e:
|
||
log.debug(f"PDF image render failed: {e}")
|
||
return None
|
||
|
||
|
||
def parse_pdf(path: Path) -> list[Section]:
    """Split a PDF into sections using font-size changes as heading hints.

    Words are grouped into runs of similar font size (difference of at most
    1). A finished run that is more than 1 larger than the previous run's
    size (10 when there is none) and shorter than 150 characters becomes a
    section title; every other run is body text. Page images are rendered to
    PNG and inserted as ``[IMG: img_NN]`` placeholders by vertical position.

    Returns:
        An empty list when pdfplumber is unavailable; otherwise the sections,
        or a single empty level-0 section if none were found.
    """
    if not HAS_PDF:
        log.warning("pdfplumber не е инсталиран. PDF се прескача.")
        return []

    sections: list[Section] = []
    current_title = ""
    body_lines: list[str] = []
    pending_images: list[ImageRef] = []
    img_counter = [0]
    prev_size = None

    def flush():
        if not (body_lines or pending_images):
            return
        sec = Section(current_title, "\n".join(body_lines), 2)
        sec.images = list(pending_images)
        sections.append(sec)

    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            # Images of this page, ordered top to bottom.
            ordered = sorted(
                page.images or [],
                key=lambda im: float(im.get("top", im.get("y0", 0)))
            )
            img_queue = []
            for info in ordered:
                rendered = _render_pdf_image(page, info)
                if rendered and _should_keep_image(rendered):
                    img_queue.append((float(info.get("top", 0)), rendered))

            words = page.extract_words(extra_attrs=["size"])
            run_words, run_size = [], None

            def emit_images_before(y: float):
                # Move every queued image whose top is at or above y into the
                # current section.
                while img_queue and img_queue[0][0] <= y:
                    _, blob = img_queue.pop(0)
                    img_counter[0] += 1
                    ref = ImageRef(placeholder=f"img_{img_counter[0]:02d}",
                                   data=blob, ext="png")
                    pending_images.append(ref)
                    body_lines.append(f"[IMG: {ref.placeholder}]")

            for w in words:
                size = round(float(w.get("size", 10)), 1)
                y = float(w.get("top", 0))
                if run_size is None:
                    run_size = size
                if abs(size - run_size) <= 1:
                    run_words.append(w["text"])
                    continue
                # Font size changed: close the current run.
                run_text = " ".join(run_words).strip()
                if run_text:
                    if run_size > (prev_size or 10) + 1 and len(run_text) < 150:
                        flush()
                        body_lines, pending_images = [], []
                        current_title = run_text
                    else:
                        emit_images_before(y)
                        body_lines.append(run_text)
                    prev_size = run_size
                run_words, run_size = [w["text"]], size

            if run_words:
                emit_images_before(page.height)
                body_lines.append(" ".join(run_words))

            # Images below all text of the page.
            emit_images_before(page.height + 1)

        flush()

    return sections or [Section("", "", 0)]
|
||
|
||
|
||
def parse_txt(path: Path) -> list[Section]:
    """Read a plain-text file as a single untitled level-0 section.

    The encoding comes from _detect_html_encoding (BOM, then chardet, then a
    windows-1251/utf-8 fallback), the same detection the HTML parser uses, so
    the function also works when chardet is not installed. Undecodable bytes
    are replaced, an encoding name unknown to Python falls back to UTF-8, and
    a leading byte-order mark is removed from the text.
    """
    raw = path.read_bytes()
    enc = _detect_html_encoding(raw)
    try:
        text = raw.decode(enc, errors="replace")
    except LookupError:
        # The detector named a codec this Python build does not provide.
        text = raw.decode("utf-8", errors="replace")
    # Decoding as "utf-8" keeps a BOM as U+FEFF; it is not part of the content.
    return [Section("", text.lstrip("\ufeff"), 0)]
|
||
|
||
|
||
# Maps a lower-case file suffix to the parser that handles it.
PARSERS = {
    suffix: parser
    for parser, suffixes in (
        (parse_html, (".html", ".htm")),
        (parse_docx, (".docx",)),
        (parse_doc_old, (".doc",)),
        (parse_txt, (".txt",)),
        (parse_pdf, (".pdf",)),
    )
    for suffix in suffixes
}
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Сегментиране и почистване
|
||
# ──────────────────────────────────────────────
|
||
|
||
def merge_short_sections(sections: list[Section]) -> list[Section]:
    """Merge sections shorter than MIN_SECTION_TOKENS words into the previous one.

    The merged section keeps the previous section's title and level. The
    absorbed section's own title is kept as a line of text (and as an escaped
    heading element when rich HTML is present), so its heading is not lost.
    Images and HTML of both sections are concatenated in order. A short first
    section has nothing to merge into and is kept as is.

    Returns:
        A new list; the input sections are not modified.
    """
    from html import escape

    result: list[Section] = []
    for sec in sections:
        if not result or len(sec.text.split()) >= MIN_SECTION_TOKENS:
            result.append(sec)
            continue

        prev = result[-1]
        text_parts = [prev.text]
        if sec.title:
            text_parts.append(sec.title)
        text_parts.append(sec.text)
        merged = Section(prev.title, "\n".join(text_parts), prev.level)
        merged.images = list(prev.images or []) + list(sec.images or [])

        html_parts = [prev.html_text] if prev.html_text else []
        if sec.title and (prev.html_text or sec.html_text):
            heading_level = min(max(sec.level, 1), 6)
            html_parts.append(
                f"<h{heading_level}>{escape(sec.title)}</h{heading_level}>"
            )
        if sec.html_text:
            html_parts.append(sec.html_text)
        merged.html_text = "\n".join(html_parts) if html_parts else None

        result[-1] = merged
    return result
|
||
|
||
|
||
def clean_text(text: str) -> str:
    """Collapse every run of whitespace to one space and trim both ends."""
    # After this substitution no run of two or more spaces can remain.
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# AI класификация
|
||
# ──────────────────────────────────────────────
|
||
|
||
def classify_section(client: anthropic.Anthropic, title: str, text: str) -> tuple[str, str]:
    """Ask Claude for a short title and keywords for one section.

    Args:
        client: Anthropic API client.
        title: Existing heading of the section; may be empty.
        text: Section text; only the first MAX_AI_CHARS characters are sent.

    Returns:
        ``(title, keywords)``: the title is cut to 200 characters and the
        keywords, a comma-separated string, to 300. When the reply is not a
        JSON object the result is ``(title or "Секция", "")``.

    Errors raised by the API call itself are not handled here.
    """
    snippet = text[:MAX_AI_CHARS]
    prompt = f"""Анализирай следната секция от help-документация и върни JSON обект с два ключа:
- "title": кратко наименование на секцията (до 8 думи, на езика на текста)
- "keywords": списък от до 5 ключови думи/фрази, разделени със запетая (на езика на текста)

Съществуващо заглавие (може да е празно): {title!r}

Текст:
{snippet}

Върни САМО валиден JSON без markdown, без коментари."""

    msg = client.messages.create(
        model=AI_MODEL,
        max_tokens=200,
        messages=[{"role": "user", "content": prompt}]
    )
    # An empty content list is treated like an unparsable reply.
    raw = msg.content[0].text.strip() if msg.content else ""
    # Strip a Markdown code fence if the model added one anyway.
    raw = re.sub(r"^```[a-z]*\n?", "", raw)
    raw = re.sub(r"\n?```$", "", raw)

    fallback_title = title or "Секция"
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        data = None
    if not isinstance(data, dict):
        log.warning(f"AI върна невалиден JSON: {raw[:120]}")
        return fallback_title, ""

    # A missing, null or empty title falls back to the existing heading.
    new_title = str(data.get("title") or fallback_title)[:200]

    keywords = data.get("keywords") or ""
    if isinstance(keywords, (list, tuple)):
        # The prompt asks for a "list", so a JSON array is a normal reply;
        # store it comma-separated rather than as a Python repr.
        keywords = ", ".join(
            part for part in (str(item).strip() for item in keywords) if part
        )
    return new_title, str(keywords)[:300]
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Генериране на кодове
|
||
# ──────────────────────────────────────────────
|
||
|
||
def make_code(prefix: str, file_index: int, sec_index: int) -> str:
    """Build a section code such as ``HLP_0003_SEC_0012`` (indices zero-padded to 4 digits)."""
    return "{}_{:04d}_SEC_{:04d}".format(prefix, file_index, sec_index)
|
||
|
||
|
||
# ──────────────────────────────────────────────
|
||
# Основна обработка
|
||
# ──────────────────────────────────────────────
|
||
|
||
def process_file(
    path: Path,
    file_index: int,
    db: Database,
    client: anthropic.Anthropic,
    output_dir: Path,
    prefix: str = "HLP",
    force: bool = False,
) -> int:
    """Parse one help file, classify its sections and persist them.

    Args:
        path: Source file to process; ``str(path)`` is the key used in the DB.
        file_index: Number embedded in every section code of this file.
        db: Database wrapper used for hash lookup, section deletion/insertion
            and the final file upsert.
        client: Anthropic client passed to ``classify_section``.
        output_dir: Directory receiving ``<code>.txt`` files and ``images/``.
        prefix: Scope prefix for the DB calls and the section codes.
        force: When true, the stored-hash check is skipped.

    Returns:
        Number of sections written. ``0`` is also returned when the file is
        skipped (unchanged hash), has an unsupported extension, or fails to
        parse.
    """
    rel = str(path)
    digest = file_hash(path)

    # Incremental mode: an unchanged hash means there is nothing to redo.
    if not force and db.get_file_hash(prefix, rel) == digest:
        log.info(f" [SKIP] {path.name} (непроменен)")
        return 0

    log.info(f" [PROC] {path.name}")
    ext = path.suffix.lower()
    parser = PARSERS.get(ext)
    if not parser:
        log.warning(f" Неподдържан формат: {ext}")
        return 0

    try:
        parsed = parser(path)
    except Exception as e:
        log.error(f" Грешка при парсване: {e}")
        return 0

    sections = merge_short_sections(parsed)

    # Drop the sections stored for this file by a previous run before
    # inserting the fresh ones.
    db.delete_sections_for_file(prefix, rel)

    images_dir = output_dir / "images"
    images_dir.mkdir(parents=True, exist_ok=True)

    def _keep_if_resolved(match):
        # A placeholder that was rewritten to a relative path contains a path
        # separator; anything else never got a file on disk and is removed.
        target = match.group(1)
        if "/" in target or "\\" in target:
            return match.group(0)
        return ""

    saved = 0
    for i, sec in enumerate(sections, 1):
        text = clean_text(sec.text)
        html_text = sec.html_text or ""
        if not (text or sec.images or html_text):
            continue

        code = make_code(prefix, file_index, i)

        # Write the images to disk and point the placeholders in both the
        # plain text and the HTML at the stored files.
        image_rel_paths: list[str] = []
        for ref in sec.images or []:
            fname = f"{code}_{ref.placeholder}.{ref.ext}"
            try:
                (images_dir / fname).write_bytes(ref.data)
            except Exception as e:
                log.warning(f" Грешка при запис на картинка {fname}: {e}")
            else:
                rel_path = f"images/{fname}"
                image_rel_paths.append(rel_path)
                old_ph = f"[IMG: {ref.placeholder}]"
                new_ph = f"[IMG: {rel_path}]"
                text = text.replace(old_ph, new_ph)
                html_text = html_text.replace(old_ph, new_ph)

        # Remove placeholders that ended up without a file.
        text = _IMG_PLACEHOLDER_RE.sub(_keep_if_resolved, text).strip()
        html_text = _IMG_PLACEHOLDER_RE.sub(_keep_if_resolved, html_text).strip()
        if not (text or image_rel_paths or html_text):
            continue

        # Classification is best-effort: on any failure keep the parsed title
        # (or a numbered placeholder) and store no keywords.
        try:
            title, keywords = classify_section(client, sec.title, text)
        except Exception as e:
            log.warning(f" AI грешка за {code}: {e}")
            title, keywords = sec.title or f"Секция {i}", ""

        ps = ProcessedSection(
            code=code,
            source_file=rel,
            title=title,
            keywords=keywords,
            text=text,
            images_json=json.dumps(image_rel_paths, ensure_ascii=False),
            html_text=html_text,
        )

        # Write the section text to the output directory.
        out_path = output_dir / f"{code}.txt"
        report = (
            f"КОД: {code}\nФАЙЛ: {rel}\nЗАГЛАВИЕ: {title}\nКЛЮЧОВИ ДУМИ: {keywords}\n"
            f"КАРТИНКИ: {len(image_rel_paths)}\n"
            f"{'─'*60}\n{text}"
        )
        out_path.write_text(report, encoding="utf-8")

        db.insert_section(prefix, ps, str(out_path))
        saved += 1
        log.debug(f" {code}: {title[:60]} ({len(image_rel_paths)} img)")

    db.upsert_file(prefix, rel, digest, saved)
    log.info(f" → {saved} секции записани")
    return saved
|
||
|
||
|
||
# Accepted prefix shape: one ASCII letter followed by up to 49 letters, digits
# or underscores. `\Z` is used instead of `$` because `$` also matches just
# before a trailing newline, which would let "HLP\n" through.
_PREFIX_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_]{0,49}\Z")


def _purge_orphan_sources(db: "Database", prefix: str, current_paths: set, output_dir: Path) -> None:
    """Delete DB rows and on-disk outputs of sources missing from *current_paths*."""
    existing = set(db.all_source_files(prefix))
    orphans = sorted(existing - current_paths)
    if not orphans:
        log.info(f"Purge: няма orphan записи в БД за prefix={prefix}.")
        return

    log.info(f"Purge ({prefix}): намерени {len(orphans)} orphan източника:")
    for o in orphans:
        log.info(f" - {o}")

    images_dir = output_dir / "images"
    removed_files = 0
    for op in db.section_output_paths_for(prefix, orphans):
        # Disk cleanup is best-effort: a failure is logged and the DB purge
        # below still runs.
        try:
            opath = Path(op)
            if opath.exists():
                opath.unlink()
                removed_files += 1
            # Images are named "<code>_<placeholder>.<ext>", and the section
            # file's stem is the code.
            code = opath.stem
            for img in images_dir.glob(f"{code}_*"):
                try:
                    img.unlink()
                    removed_files += 1
                except OSError as e:
                    log.debug(f" не успях да изтрия {img}: {e}")
        except Exception as e:
            log.debug(f" не успях да изтрия {op}: {e}")

    deleted = db.purge_sources(prefix, orphans)
    log.info(f"Purge: изтрити {deleted} секции от БД, {removed_files} файла от диска.")


def process_directory(
    input_dir: Path,
    output_dir: Path,
    conn_str: str,
    api_key: str,
    prefix: str = "HLP",
    force: bool = False,
    purge_missing: bool = False,
):
    """Process every supported help file under *input_dir*.

    Args:
        input_dir: Directory scanned recursively for files whose extension has
            a registered parser. Files located under *output_dir* are ignored.
        output_dir: Directory for generated outputs; created if missing.
        conn_str: Connection string passed to ``Database``.
        api_key: Anthropic API key.
        prefix: Scope prefix; must match ``_PREFIX_RE``.
        force: Re-process files even when their stored hash is unchanged.
        purge_missing: After processing, remove DB rows and output files of
            sources that are no longer present under *input_dir*.

    Raises:
        ValueError: If *prefix* is not a valid prefix.
    """
    # fullmatch() requires the whole string to match, so a trailing newline is
    # rejected even if the pattern's end anchor is lenient.
    if not _PREFIX_RE.fullmatch(prefix):
        raise ValueError(
            f"Невалиден prefix {prefix!r}. Допустими: буква + букви/цифри/подчертавки, до 50 символа."
        )

    output_dir.mkdir(parents=True, exist_ok=True)
    db = Database(conn_str)

    total_sections = 0
    # Everything after the connection is opened runs inside the try so the
    # connection is closed even if client construction or scanning fails.
    try:
        client = anthropic.Anthropic(api_key=api_key)

        extensions = set(PARSERS.keys())
        output_resolved = output_dir.resolve()

        def _under_output(p: Path) -> bool:
            # True when p lies inside the output directory; such files are
            # skipped by the scan below.
            try:
                p.resolve().relative_to(output_resolved)
                return True
            except ValueError:
                return False

        files = [
            p for p in input_dir.rglob("*")
            if p.is_file() and p.suffix.lower() in extensions and not _under_output(p)
        ]
        log.info(f"Prefix={prefix} Намерени {len(files)} файла в {input_dir}")

        current_paths = {str(p) for p in files}

        # NOTE(review): the file index is the position in the sorted listing,
        # so adding or removing a file shifts the codes of later files while
        # skipped (unchanged) files keep their previously stored codes.
        # Confirm this cannot produce duplicate section codes.
        for idx, path in enumerate(sorted(files), 1):
            total_sections += process_file(path, idx, db, client, output_dir,
                                           prefix=prefix, force=force)

        if purge_missing:
            if input_dir.is_dir():
                _purge_orphan_sources(db, prefix, current_paths, output_dir)
            else:
                # A missing or mistyped input directory yields zero files, so
                # every stored source would look like an orphan; refuse to
                # purge in that case.
                log.warning(f"Purge пропуснат: входната директория {input_dir} не съществува.")
    finally:
        db.close()

    log.info(f"Готово. Prefix={prefix}. Общо нови/обновени секции: {total_sections}")
|
||
|
||
|
||
# ──────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────
|
||
|
||
def main():
    """Command-line entry point: parse arguments and run ``process_directory``.

    The connection string, API key and prefix default to the environment
    variables ``HELP_DB_CONN``, ``ANTHROPIC_API_KEY`` and ``HELP_PREFIX``.
    Exits with an error message when the API key or connection string is
    missing, or when the input directory does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Help-файл декомпозитор с SQL Server + Anthropic"
    )
    parser.add_argument("input_dir", help="Входна директория с help-файлове")
    parser.add_argument("output_dir", help="Изходна директория за текстови секции")
    parser.add_argument(
        "--conn",
        default=os.getenv("HELP_DB_CONN"),
        help="SQL Server connection string (или HELP_DB_CONN env var)"
    )
    parser.add_argument(
        "--api-key",
        default=os.getenv("ANTHROPIC_API_KEY"),
        help="Anthropic API ключ (или ANTHROPIC_API_KEY env var)"
    )
    parser.add_argument(
        "--prefix",
        default=os.getenv("HELP_PREFIX", "HLP"),
        help="Префикс за кодовете/scope в БД (буква + букви/цифри/_, до 50 знака). "
             "Default: 'HLP' (или env HELP_PREFIX)."
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Преобработва всички файлове, независимо от hash"
    )
    parser.add_argument(
        "--purge-missing",
        action="store_true",
        help="След обработката изтрива от БД и диска секциите за източници, "
             "които вече не съществуват във входната директория (само в дадения prefix)"
    )
    args = parser.parse_args()

    if not args.api_key:
        sys.exit("Грешка: липсва Anthropic API ключ (--api-key или ANTHROPIC_API_KEY).")
    if not args.conn:
        sys.exit("Грешка: липсва SQL Server connection string (--conn или HELP_DB_CONN).")

    # A mistyped input path would otherwise be scanned as "0 files" and, with
    # --purge-missing, every stored source would be treated as missing.
    input_dir = Path(args.input_dir)
    if not input_dir.is_dir():
        sys.exit(f"Грешка: входната директория не съществува: {input_dir}")

    process_directory(
        input_dir=input_dir,
        output_dir=Path(args.output_dir),
        conn_str=args.conn,
        api_key=args.api_key,
        prefix=args.prefix,
        force=args.force,
        purge_missing=args.purge_missing,
    )
|
||
|
||
|
||
# Run the CLI only when this module is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|