You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

528 lines
19 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
"""ITD - общ модул за връзка и зареждане на данни от базата."""
import json
import os
from datetime import datetime, timezone
try:
import pyodbc
except ImportError:
pyodbc = None
_LAST_APPSETTINGS_PATH = None
_LAST_CONNECTION_STRING_REDACTED = None
def get_last_appsettings_path():
"""Връща пълния път към последно намерения appsettings.json (или None)."""
return _LAST_APPSETTINGS_PATH
def get_last_connection_string_redacted():
"""Връща последно използвания connection string с маскирани пароли (или None)."""
return _LAST_CONNECTION_STRING_REDACTED
def _redact_conn_str(s: str) -> str:
if not s:
return s
redacted_parts = []
for part in str(s).split(";"):
p = part.strip()
if not p:
continue
if "=" not in p:
redacted_parts.append(p)
continue
k, v = p.split("=", 1)
kl = k.strip().lower()
if kl in ("pwd", "password"):
redacted_parts.append(f"{k}=***")
else:
redacted_parts.append(f"{k}={v}")
return ";".join(redacted_parts) + ";"
def load_connection_string():
"""Зарежда connection string от appsettings.json."""
import sys
global _LAST_APPSETTINGS_PATH
global _LAST_CONNECTION_STRING_REDACTED
paths = []
if getattr(sys, "frozen", False):
paths.append(os.path.join(os.path.dirname(sys.executable), "appsettings.json"))
paths.extend(("appsettings.json", os.path.join(os.path.dirname(__file__), "appsettings.json")))
for path in paths:
if os.path.isfile(path):
_LAST_APPSETTINGS_PATH = os.path.abspath(path)
with open(path, "r", encoding="utf-8") as f:
cfg = json.load(f)
cs = cfg.get("ConnectionStrings", {})
sql = cs.get("SqlServer", "")
if sql:
parts = {}
for part in sql.split(";"):
part = part.strip()
if not part or "=" not in part:
continue
k, v = part.split("=", 1)
parts[k.strip().lower()] = v.strip()
driver = "ODBC Driver 17 for SQL Server"
conn_str = (
f"DRIVER={{{driver}}};"
f"SERVER={parts.get('server', '')};"
f"DATABASE={parts.get('database', '')};"
f"UID={parts.get('user id', '')};"
f"PWD={parts.get('password', '')};"
+ ("TrustServerCertificate=yes;" if parts.get("trustservercertificate", "").lower() == "true" else "")
)
_LAST_CONNECTION_STRING_REDACTED = _redact_conn_str(conn_str)
return conn_str
if cs.get("Odbc"):
conn_str = cs["Odbc"]
_LAST_CONNECTION_STRING_REDACTED = _redact_conn_str(conn_str)
return conn_str
raise ValueError("В appsettings.json липсва ConnectionStrings:SqlServer или Odbc")
tried = [os.path.abspath(p) for p in paths]
raise FileNotFoundError("Не е намерен appsettings.json. Пробвани пътища:\n- " + "\n- ".join(tried))
def get_connection():
"""Отваря връзка към SQL Server (autocommit)."""
if pyodbc is None:
raise RuntimeError("Инсталирайте pyodbc: pip install pyodbc")
conn = pyodbc.connect(load_connection_string())
conn.autocommit = True
return conn
def _parse_iso(s):
if s is None:
return None
if hasattr(s, "isoformat"):
return s
try:
return datetime.fromisoformat(str(s).replace("Z", "+00:00"))
except Exception:
return None
def _utc_to_local(dt):
"""
Преобразува datetime от БД (съхранен като UTC чрез SYSUTCDATETIME()) в локално време за показване.
Така няма разминаване с 2 часа (UTC+2) при потребители в България.
"""
if dt is None:
return None
if hasattr(dt, "tzinfo") and dt.tzinfo is not None:
return dt.astimezone().replace(tzinfo=None)
# naive datetime от pyodbc = считаме за UTC
return dt.replace(tzinfo=timezone.utc).astimezone().replace(tzinfo=None)
def utc_to_local_for_display(dt):
"""Публична функция за показване на дата от БД в локално време (за main.py и др.)."""
return _utc_to_local(dt)
def _format_duration(minutes):
if minutes is None:
return ""
if minutes < 60:
return f"{int(minutes)}м"
h, m = divmod(int(minutes), 60)
return f"{h}ч {m}м"
def load_cycles(conn, limit=200):
"""
Зарежда последните цикли от ITD_Cycles.
Всеки ред = един цикъл с 5 полета за време (Post1At..Post5At) + ServiceClosed.
"""
cursor = conn.cursor()
cursor.execute(
"""
SELECT y.Id, y.CardId, y.CycleNo, y.Post1At, y.Post2At, y.Post3At, y.Post4At, y.Post5At,
y.ServiceClosed, y.ServiceClosedAt,
c.Code, c.DisplayText, c.IsActive, c.UpdatedAt
FROM dbo.ITD_Cycles y
JOIN dbo.ITD_Cards c ON c.Id = y.CardId
ORDER BY y.Post1At DESC
"""
)
rows = cursor.fetchall()
result = []
for r in rows:
cycle_id = r.Id
card_id = r.CardId
t1 = _utc_to_local(_parse_iso(r.Post1At))
t2 = _utc_to_local(_parse_iso(r.Post2At))
t3 = _utc_to_local(_parse_iso(r.Post3At))
t4 = _utc_to_local(_parse_iso(r.Post4At))
t5 = _utc_to_local(_parse_iso(r.Post5At))
service_closed = bool(getattr(r, "ServiceClosed", False))
service_closed_at = getattr(r, "ServiceClosedAt", None)
manual_dt = _utc_to_local(_parse_iso(service_closed_at)) if service_closed_at else None
def _fmt_dt(dt):
return dt.strftime("%d.%m %H:%M") if dt else ""
post_times = {
1: _fmt_dt(t1),
2: _fmt_dt(t2),
3: _fmt_dt(t3),
4: _fmt_dt(t4),
5: _fmt_dt(t5),
}
post_durations = {1: "", 2: "", 3: "", 4: "", 5: ""}
if t1 and t2:
post_durations[1] = _format_duration((t2 - t1).total_seconds() / 60)
if t2 and t3:
post_durations[2] = _format_duration((t3 - t2).total_seconds() / 60)
if t3 and t4:
post_durations[3] = _format_duration((t4 - t3).total_seconds() / 60)
if t4 and t5:
post_durations[4] = _format_duration((t5 - t4).total_seconds() / 60)
reg_dt = t2 or t1
reg_str = reg_dt.strftime("%d.%m.%Y %H:%M") if reg_dt else ""
if t1 and t2:
time_reg_entry = _format_duration((t2 - t1).total_seconds() / 60)
else:
time_reg_entry = ""
parts = []
if t2 and t3:
parts.append(f"2→3: {_format_duration((t3 - t2).total_seconds() / 60)}")
if t3 and t4:
parts.append(f"3→4: {_format_duration((t4 - t3).total_seconds() / 60)}")
if t4 and t5:
parts.append(f"4→5: {_format_duration((t5 - t4).total_seconds() / 60)}")
time_between = " | ".join(parts) if parts else ""
exit_dt = t5 or manual_dt
service_exit = bool(service_closed and not t5)
if not exit_dt:
exit_str = ""
elif service_closed and not t5:
# Служебно затваряне запазваме сегашния текст (дата/час)
exit_str = manual_dt.strftime("%d.%m.%Y %H:%M") if manual_dt else "Служебен изход"
if manual_dt:
exit_str = f"Служебен изход - {exit_str}"
else:
# Нормално напускане (има пост 5) показваме общ престой от пост 1 до пост 5
if t1 and t5:
total_mins = (t5 - t1).total_seconds() / 60
exit_str = _format_duration(total_mins)
else:
exit_str = exit_dt.strftime("%d.%m.%Y %H:%M")
can_close = t5 is None and not service_closed
cycle_started_raw = r.Post1At
cycle_started_norm = _normalize_cycle_started_at(cycle_started_raw)
result.append(
{
"cycle_id": cycle_id,
"code": getattr(r, "Code", None) or "",
"display_text": r.DisplayText,
"reg_datetime": reg_str,
"time_reg_entry": time_reg_entry,
"time_between_posts": time_between,
"exit_datetime": exit_str,
"post1": f"{post_times[1]} {post_durations[1]}",
"post2": f"{post_times[2]} {post_durations[2]}",
"post3": f"{post_times[3]} {post_durations[3]}",
"post4": f"{post_times[4]} {post_durations[4]}",
"post5": f"{post_times[5]} {post_durations[5]}",
"service_exit": service_exit,
"can_close": can_close,
"card_id": card_id,
"is_active": bool(getattr(r, "IsActive", 1)),
"card_updated_at": getattr(r, "UpdatedAt", None),
"cycle_started_at": cycle_started_raw,
"cycle_started_at_normalized": cycle_started_norm,
}
)
return result[:limit]
def _normalize_cycle_started_at(cycle_started_at):
"""Нормализира датата за цикъл до naive UTC без микросекунди един и същ ключ при запис, търсене и показване."""
if cycle_started_at is None:
return None
if hasattr(cycle_started_at, "year"):
dt = cycle_started_at
else:
dt = _parse_iso(cycle_started_at)
if dt is None:
return None
try:
if getattr(dt, "tzinfo", None):
dt = dt.astimezone(timezone.utc)
return dt.replace(microsecond=0, tzinfo=None)
except Exception:
try:
return dt.replace(microsecond=0, tzinfo=None)
except Exception:
return dt
def close_cycle_manually(conn, cycle_id):
"""Служебно затваряне на цикъл: маркира този цикъл в ITD_Cycles (ServiceClosed=1)."""
try:
cycle_id = int(cycle_id)
except (TypeError, ValueError):
raise ValueError("Невалиден идентификатор на цикъл.")
cursor = conn.cursor()
cursor.execute(
"""
UPDATE dbo.ITD_Cycles
SET ServiceClosed = 1, ServiceClosedAt = SYSUTCDATETIME()
WHERE Id = ? AND ServiceClosed = 0
""",
(cycle_id,),
)
return cursor.rowcount > 0
def undo_manual_cycle_close(conn, cycle_id):
"""Отмяна на служебно затваряне: нулира ServiceClosed за този цикъл в ITD_Cycles."""
try:
cycle_id = int(cycle_id)
except (TypeError, ValueError):
raise ValueError("Невалиден идентификатор на цикъл.")
cursor = conn.cursor()
cursor.execute(
"""
UPDATE dbo.ITD_Cycles
SET ServiceClosed = 0, ServiceClosedAt = NULL
WHERE Id = ?
""",
(cycle_id,),
)
return cursor.rowcount > 0
def get_card_by_code(conn, code):
"""Търси карта по Code (QR стойност). Връща None или dict с Id, Code, DisplayText, IsActive."""
if not code or not str(code).strip():
return None
cursor = conn.cursor()
cursor.execute(
"SELECT Id, Code, DisplayText, IsActive FROM dbo.ITD_Cards WHERE Code = ? AND IsActive = 1",
(str(code).strip(),),
)
row = cursor.fetchone()
if not row:
return None
return {"id": row.Id, "code": row.Code, "display_text": row.DisplayText, "is_active": bool(row.IsActive)}
def add_registration(conn, card_id, post_index):
"""Добавя регистрация за карта на даден пост (15). Пост 1 = нов ред в ITD_Cycles; постове 25 = UPDATE на текущия цикъл."""
if post_index == 1 and has_open_cycle(conn, card_id):
raise ValueError("Вече сте влезли. Моля, излезте преди нова регистрация.")
cursor = conn.cursor()
if post_index == 1:
cursor.execute(
"""
INSERT INTO dbo.ITD_Cycles (CardId, CycleNo, Post1At, Post2At, Post3At, Post4At, Post5At, ServiceClosed)
VALUES (?, (SELECT ISNULL(MAX(CycleNo), 0) + 1 FROM dbo.ITD_Cycles WHERE CardId = ?), SYSUTCDATETIME(), NULL, NULL, NULL, NULL, 0)
""",
(card_id, card_id),
)
else:
cursor.execute(
"""
SELECT TOP 1 Id FROM dbo.ITD_Cycles
WHERE CardId = ? AND Post5At IS NULL AND ServiceClosed = 0
ORDER BY Post1At DESC
""",
(card_id,),
)
row = cursor.fetchone()
if not row:
raise ValueError("Няма отворен цикъл за тази карта.")
col = f"Post{post_index}At"
cursor.execute(f"UPDATE dbo.ITD_Cycles SET {col} = SYSUTCDATETIME() WHERE Id = ?", (row.Id,))
def set_card_active(conn, card_id, is_active):
"""Активира или деактивира карта. При деактивирана карта не може да се влиза (регистрира)."""
cursor = conn.cursor()
cursor.execute(
"UPDATE dbo.ITD_Cards SET IsActive = ?, UpdatedAt = SYSUTCDATETIME() WHERE Id = ?",
(1 if is_active else 0, card_id),
)
def get_last_post_index(conn, card_id):
"""Връща последния попълнен пост (15) за дадена карта от последния цикъл, или None ако няма цикли."""
cursor = conn.cursor()
cursor.execute(
"""
SELECT TOP 1 Post1At, Post2At, Post3At, Post4At, Post5At, ServiceClosed
FROM dbo.ITD_Cycles WHERE CardId = ? ORDER BY Post1At DESC
""",
(card_id,),
)
row = cursor.fetchone()
if not row:
return None
if row.Post5At or getattr(row, "ServiceClosed", False):
return 5
if row.Post4At:
return 4
if row.Post3At:
return 3
if row.Post2At:
return 2
return 1
def has_open_cycle(conn, card_id):
"""True ако картата има отворен цикъл (ред в ITD_Cycles с Post5At IS NULL и ServiceClosed = 0)."""
cursor = conn.cursor()
cursor.execute(
"""
SELECT 1 FROM dbo.ITD_Cycles
WHERE CardId = ? AND Post5At IS NULL AND ServiceClosed = 0
""",
(card_id,),
)
return cursor.fetchone() is not None
def get_next_post_index(conn, card_id):
"""Връща следващия пост за регистрация (15). След служебно затваряне или пост 5 следва пост 1."""
cursor = conn.cursor()
cursor.execute(
"""
SELECT TOP 1 Post2At, Post3At, Post4At, Post5At, ServiceClosed
FROM dbo.ITD_Cycles WHERE CardId = ? ORDER BY Post1At DESC
""",
(card_id,),
)
row = cursor.fetchone()
if not row:
return 1
if getattr(row, "ServiceClosed", False) or row.Post5At:
return 1
if not row.Post2At:
return 2
if not row.Post3At:
return 3
if not row.Post4At:
return 4
return 5
def load_cards(conn):
"""Връща списък с всички карти за административния модул."""
cursor = conn.cursor()
cursor.execute(
"""
SELECT Id, Code, DisplayText, IsActive, CreatedAt, UpdatedAt
FROM dbo.ITD_Cards
ORDER BY DisplayText
"""
)
rows = cursor.fetchall()
result = []
for r in rows:
result.append(
{
"id": r.Id,
"code": r.Code,
"display_text": r.DisplayText,
"is_active": bool(r.IsActive),
"created_at": r.CreatedAt,
"updated_at": r.UpdatedAt,
}
)
return result
def delete_parking_only_cycles_for_card(conn, card_code):
"""
Изтрива всички цикли „само паркинг“ (само Post1At е попълнен) за карта с даден Code.
Връща (брой изтрити цикли, 0) за съвместимост със стария API.
"""
card_code = str(card_code).strip()
if not card_code:
raise ValueError("Кодът на картата е задължителен.")
cursor = conn.cursor()
cursor.execute("SELECT Id FROM dbo.ITD_Cards WHERE Code = ?", (card_code,))
row = cursor.fetchone()
if not row:
return (0, 0)
card_id = int(row.Id)
cursor.execute(
"""
DELETE FROM dbo.ITD_Cycles
WHERE CardId = ? AND Post2At IS NULL AND Post3At IS NULL AND Post4At IS NULL AND Post5At IS NULL
""",
(card_id,),
)
return (cursor.rowcount, 0)
def create_card(conn, display_text, is_active=True):
"""Създава нова карта. Кодът се задава служебно равен на Id. Връща Id на картата."""
display_text = str(display_text).strip()
if not display_text:
raise ValueError("Текстът на картата е задължителен.")
cursor = conn.cursor()
# Временен уникален код (NEWID), след което Code се обновява на Id
cursor.execute(
"""
INSERT INTO dbo.ITD_Cards (Code, DisplayText, IsActive)
OUTPUT INSERTED.Id
VALUES (CONVERT(NVARCHAR(36), NEWID()), ?, ?)
""",
(display_text, 1 if is_active else 0),
)
row = cursor.fetchone()
new_id = row[0] if row else None
if new_id is None:
raise RuntimeError("Картата е записана, но не може да се прочете новото Id.")
new_id = int(new_id)
cursor.execute("UPDATE dbo.ITD_Cards SET Code = ? WHERE Id = ?", (str(new_id), new_id))
return new_id
def update_card(conn, card_id, display_text):
"""Обновява текста на карта (DisplayText). Кодът не се променя."""
display_text = str(display_text).strip()
if not display_text:
raise ValueError("Текстът на картата е задължителен.")
cursor = conn.cursor()
cursor.execute(
"UPDATE dbo.ITD_Cards SET DisplayText = ?, UpdatedAt = SYSUTCDATETIME() WHERE Id = ?",
(display_text, card_id),
)
if cursor.rowcount == 0:
raise ValueError("Картата не е намерена.")
def delete_card(conn, card_id):
"""Изтрива карта и всички нейни цикли (ITD_Cycles)."""
card_id = int(card_id)
cursor = conn.cursor()
cursor.execute("DELETE FROM dbo.ITD_Cycles WHERE CardId = ?", (card_id,))
cursor.execute("DELETE FROM dbo.ITD_Cards WHERE Id = ?", (card_id,))
if cursor.rowcount == 0:
raise ValueError("Картата не е намерена.")