# -*- coding: utf-8 -*- """ITD - общ модул за връзка и зареждане на данни от базата.""" import json import os from datetime import datetime, timezone try: import pyodbc except ImportError: pyodbc = None _LAST_APPSETTINGS_PATH = None _LAST_CONNECTION_STRING_REDACTED = None def get_last_appsettings_path(): """Връща пълния път към последно намерения appsettings.json (или None).""" return _LAST_APPSETTINGS_PATH def get_last_connection_string_redacted(): """Връща последно използвания connection string с маскирани пароли (или None).""" return _LAST_CONNECTION_STRING_REDACTED def _redact_conn_str(s: str) -> str: if not s: return s redacted_parts = [] for part in str(s).split(";"): p = part.strip() if not p: continue if "=" not in p: redacted_parts.append(p) continue k, v = p.split("=", 1) kl = k.strip().lower() if kl in ("pwd", "password"): redacted_parts.append(f"{k}=***") else: redacted_parts.append(f"{k}={v}") return ";".join(redacted_parts) + ";" def load_connection_string(): """Зарежда connection string от appsettings.json.""" import sys global _LAST_APPSETTINGS_PATH global _LAST_CONNECTION_STRING_REDACTED paths = [] if getattr(sys, "frozen", False): paths.append(os.path.join(os.path.dirname(sys.executable), "appsettings.json")) paths.extend(("appsettings.json", os.path.join(os.path.dirname(__file__), "appsettings.json"))) for path in paths: if os.path.isfile(path): _LAST_APPSETTINGS_PATH = os.path.abspath(path) with open(path, "r", encoding="utf-8") as f: cfg = json.load(f) cs = cfg.get("ConnectionStrings", {}) sql = cs.get("SqlServer", "") if sql: parts = {} for part in sql.split(";"): part = part.strip() if not part or "=" not in part: continue k, v = part.split("=", 1) parts[k.strip().lower()] = v.strip() driver = "ODBC Driver 17 for SQL Server" conn_str = ( f"DRIVER={{{driver}}};" f"SERVER={parts.get('server', '')};" f"DATABASE={parts.get('database', '')};" f"UID={parts.get('user id', '')};" f"PWD={parts.get('password', '')};" + ("TrustServerCertificate=yes;" if parts.get("trustservercertificate", "").lower() == "true" else "") ) _LAST_CONNECTION_STRING_REDACTED = _redact_conn_str(conn_str) return conn_str if cs.get("Odbc"): conn_str = cs["Odbc"] _LAST_CONNECTION_STRING_REDACTED = _redact_conn_str(conn_str) return conn_str raise ValueError("В appsettings.json липсва ConnectionStrings:SqlServer или Odbc") tried = [os.path.abspath(p) for p in paths] raise FileNotFoundError("Не е намерен appsettings.json. Пробвани пътища:\n- " + "\n- ".join(tried)) def get_connection(): """Отваря връзка към SQL Server (autocommit).""" if pyodbc is None: raise RuntimeError("Инсталирайте pyodbc: pip install pyodbc") conn = pyodbc.connect(load_connection_string()) conn.autocommit = True return conn def _parse_iso(s): if s is None: return None if hasattr(s, "isoformat"): return s try: return datetime.fromisoformat(str(s).replace("Z", "+00:00")) except Exception: return None def _utc_to_local(dt): """ Преобразува datetime от БД (съхранен като UTC чрез SYSUTCDATETIME()) в локално време за показване. Така няма разминаване с 2 часа (UTC+2) при потребители в България. """ if dt is None: return None if hasattr(dt, "tzinfo") and dt.tzinfo is not None: return dt.astimezone().replace(tzinfo=None) # naive datetime от pyodbc = считаме за UTC return dt.replace(tzinfo=timezone.utc).astimezone().replace(tzinfo=None) def utc_to_local_for_display(dt): """Публична функция за показване на дата от БД в локално време (за main.py и др.).""" return _utc_to_local(dt) def _format_duration(minutes): if minutes is None: return "" if minutes < 60: return f"{int(minutes)}м" h, m = divmod(int(minutes), 60) return f"{h}ч {m}м" def load_cycles(conn, limit=200): """ Зарежда последните цикли от ITD_Cycles. Всеки ред = един цикъл с 5 полета за време (Post1At..Post5At) + ServiceClosed. """ cursor = conn.cursor() cursor.execute(""" SELECT y.Id, y.CardId, y.CycleNo, y.Post1At, y.Post2At, y.Post3At, y.Post4At, y.Post5At, y.ServiceClosed, y.ServiceClosedAt, c.Code, c.DisplayText, c.IsActive, c.UpdatedAt FROM dbo.ITD_Cycles y JOIN dbo.ITD_Cards c ON c.Id = y.CardId ORDER BY y.Post1At DESC """) rows = cursor.fetchall() result = [] for r in rows: cycle_id = r.Id card_id = r.CardId t1 = _utc_to_local(_parse_iso(r.Post1At)) t2 = _utc_to_local(_parse_iso(r.Post2At)) t3 = _utc_to_local(_parse_iso(r.Post3At)) t4 = _utc_to_local(_parse_iso(r.Post4At)) t5 = _utc_to_local(_parse_iso(r.Post5At)) service_closed = bool(getattr(r, "ServiceClosed", False)) service_closed_at = getattr(r, "ServiceClosedAt", None) manual_dt = _utc_to_local(_parse_iso(service_closed_at)) if service_closed_at else None def _fmt_dt(dt): return dt.strftime("%d.%m %H:%M") if dt else "" post_times = { 1: _fmt_dt(t1), 2: _fmt_dt(t2), 3: _fmt_dt(t3), 4: _fmt_dt(t4), 5: _fmt_dt(t5), } post_durations = {1: "", 2: "", 3: "", 4: "", 5: ""} if t1 and t2: post_durations[1] = _format_duration((t2 - t1).total_seconds() / 60) if t2 and t3: post_durations[2] = _format_duration((t3 - t2).total_seconds() / 60) if t3 and t4: post_durations[3] = _format_duration((t4 - t3).total_seconds() / 60) if t4 and t5: post_durations[4] = _format_duration((t5 - t4).total_seconds() / 60) reg_dt = t2 or t1 reg_str = reg_dt.strftime("%d.%m.%Y %H:%M") if reg_dt else "" if t1 and t2: time_reg_entry = _format_duration((t2 - t1).total_seconds() / 60) else: time_reg_entry = "" parts = [] if t2 and t3: parts.append(f"2→3: {_format_duration((t3 - t2).total_seconds() / 60)}") if t3 and t4: parts.append(f"3→4: {_format_duration((t4 - t3).total_seconds() / 60)}") if t4 and t5: parts.append(f"4→5: {_format_duration((t5 - t4).total_seconds() / 60)}") time_between = " | ".join(parts) if parts else "" exit_dt = t5 or manual_dt service_exit = bool(service_closed and not t5) if not exit_dt: exit_str = "" elif service_closed and not t5: # Служебно затваряне – запазваме сегашния текст (дата/час) exit_str = manual_dt.strftime("%d.%m.%Y %H:%M") if manual_dt else "Служебен изход" if manual_dt: exit_str = f"Служебен изход - {exit_str}" else: # Нормално напускане (има пост 5) – показваме общ престой от пост 1 до пост 5 if t1 and t5: total_mins = (t5 - t1).total_seconds() / 60 exit_str = _format_duration(total_mins) else: exit_str = exit_dt.strftime("%d.%m.%Y %H:%M") can_close = t5 is None and not service_closed cycle_started_raw = r.Post1At cycle_started_norm = _normalize_cycle_started_at(cycle_started_raw) result.append({ "cycle_id": cycle_id, "code": getattr(r, "Code", None) or "", "display_text": r.DisplayText, "reg_datetime": reg_str, "time_reg_entry": time_reg_entry, "time_between_posts": time_between, "exit_datetime": exit_str, "post1": f"{post_times[1]} {post_durations[1]}", "post2": f"{post_times[2]} {post_durations[2]}", "post3": f"{post_times[3]} {post_durations[3]}", "post4": f"{post_times[4]} {post_durations[4]}", "post5": f"{post_times[5]} {post_durations[5]}", "service_exit": service_exit, "can_close": can_close, "card_id": card_id, "is_active": bool(getattr(r, "IsActive", 1)), "card_updated_at": getattr(r, "UpdatedAt", None), "cycle_started_at": cycle_started_raw, "cycle_started_at_normalized": cycle_started_norm, }) return result[:limit] def _normalize_cycle_started_at(cycle_started_at): """Нормализира датата за цикъл до naive UTC без микросекунди – един и същ ключ при запис, търсене и показване.""" if cycle_started_at is None: return None if hasattr(cycle_started_at, "year"): dt = cycle_started_at else: dt = _parse_iso(cycle_started_at) if dt is None: return None try: if getattr(dt, "tzinfo", None): dt = dt.astimezone(timezone.utc) return dt.replace(microsecond=0, tzinfo=None) except Exception: try: return dt.replace(microsecond=0, tzinfo=None) except Exception: return dt def close_cycle_manually(conn, cycle_id): """Служебно затваряне на цикъл: маркира този цикъл в ITD_Cycles (ServiceClosed=1).""" try: cycle_id = int(cycle_id) except (TypeError, ValueError): raise ValueError("Невалиден идентификатор на цикъл.") cursor = conn.cursor() cursor.execute(""" UPDATE dbo.ITD_Cycles SET ServiceClosed = 1, ServiceClosedAt = SYSUTCDATETIME() WHERE Id = ? AND ServiceClosed = 0 """, (cycle_id,)) return cursor.rowcount > 0 def undo_manual_cycle_close(conn, cycle_id): """Отмяна на служебно затваряне: нулира ServiceClosed за този цикъл в ITD_Cycles.""" try: cycle_id = int(cycle_id) except (TypeError, ValueError): raise ValueError("Невалиден идентификатор на цикъл.") cursor = conn.cursor() cursor.execute(""" UPDATE dbo.ITD_Cycles SET ServiceClosed = 0, ServiceClosedAt = NULL WHERE Id = ? """, (cycle_id,)) return cursor.rowcount > 0 def get_card_by_code(conn, code): """Търси карта по Code (QR стойност). Връща None или dict с Id, Code, DisplayText, IsActive.""" if not code or not str(code).strip(): return None cursor = conn.cursor() cursor.execute( "SELECT Id, Code, DisplayText, IsActive FROM dbo.ITD_Cards WHERE Code = ? AND IsActive = 1", (str(code).strip(),), ) row = cursor.fetchone() if not row: return None return {"id": row.Id, "code": row.Code, "display_text": row.DisplayText, "is_active": bool(row.IsActive)} def add_registration(conn, card_id, post_index): """Добавя регистрация за карта на даден пост (1–5). Пост 1 = нов ред в ITD_Cycles; постове 2–5 = UPDATE на текущия цикъл.""" if post_index == 1 and has_open_cycle(conn, card_id): raise ValueError("Вече сте влезли. Моля, излезте преди нова регистрация.") cursor = conn.cursor() if post_index == 1: cursor.execute(""" INSERT INTO dbo.ITD_Cycles (CardId, CycleNo, Post1At, Post2At, Post3At, Post4At, Post5At, ServiceClosed) VALUES (?, (SELECT ISNULL(MAX(CycleNo), 0) + 1 FROM dbo.ITD_Cycles WHERE CardId = ?), SYSUTCDATETIME(), NULL, NULL, NULL, NULL, 0) """, (card_id, card_id)) else: cursor.execute(""" SELECT TOP 1 Id FROM dbo.ITD_Cycles WHERE CardId = ? AND Post5At IS NULL AND ServiceClosed = 0 ORDER BY Post1At DESC """, (card_id,)) row = cursor.fetchone() if not row: raise ValueError("Няма отворен цикъл за тази карта.") col = f"Post{post_index}At" cursor.execute(f"UPDATE dbo.ITD_Cycles SET {col} = SYSUTCDATETIME() WHERE Id = ?", (row.Id,)) def set_card_active(conn, card_id, is_active): """Активира или деактивира карта. При деактивирана карта не може да се влиза (регистрира).""" cursor = conn.cursor() cursor.execute( "UPDATE dbo.ITD_Cards SET IsActive = ?, UpdatedAt = SYSUTCDATETIME() WHERE Id = ?", (1 if is_active else 0, card_id), ) def get_last_post_index(conn, card_id): """Връща последния попълнен пост (1–5) за дадена карта от последния цикъл, или None ако няма цикли.""" cursor = conn.cursor() cursor.execute( """ SELECT TOP 1 Post1At, Post2At, Post3At, Post4At, Post5At, ServiceClosed FROM dbo.ITD_Cycles WHERE CardId = ? ORDER BY Post1At DESC """, (card_id,), ) row = cursor.fetchone() if not row: return None if row.Post5At or getattr(row, "ServiceClosed", False): return 5 if row.Post4At: return 4 if row.Post3At: return 3 if row.Post2At: return 2 return 1 def has_open_cycle(conn, card_id): """True ако картата има отворен цикъл (ред в ITD_Cycles с Post5At IS NULL и ServiceClosed = 0).""" cursor = conn.cursor() cursor.execute( """ SELECT 1 FROM dbo.ITD_Cycles WHERE CardId = ? AND Post5At IS NULL AND ServiceClosed = 0 """, (card_id,), ) return cursor.fetchone() is not None def get_next_post_index(conn, card_id): """Връща следващия пост за регистрация (1–5). След служебно затваряне или пост 5 следва пост 1.""" cursor = conn.cursor() cursor.execute( """ SELECT TOP 1 Post2At, Post3At, Post4At, Post5At, ServiceClosed FROM dbo.ITD_Cycles WHERE CardId = ? ORDER BY Post1At DESC """, (card_id,), ) row = cursor.fetchone() if not row: return 1 if getattr(row, "ServiceClosed", False) or row.Post5At: return 1 if not row.Post2At: return 2 if not row.Post3At: return 3 if not row.Post4At: return 4 return 5 def load_cards(conn): """Връща списък с всички карти за административния модул.""" cursor = conn.cursor() cursor.execute( """ SELECT Id, Code, DisplayText, IsActive, CreatedAt, UpdatedAt FROM dbo.ITD_Cards ORDER BY DisplayText """ ) rows = cursor.fetchall() result = [] for r in rows: result.append( { "id": r.Id, "code": r.Code, "display_text": r.DisplayText, "is_active": bool(r.IsActive), "created_at": r.CreatedAt, "updated_at": r.UpdatedAt, } ) return result def delete_parking_only_cycles_for_card(conn, card_code): """ Изтрива всички цикли „само паркинг“ (само Post1At е попълнен) за карта с даден Code. Връща (брой изтрити цикли, 0) за съвместимост със стария API. """ card_code = str(card_code).strip() if not card_code: raise ValueError("Кодът на картата е задължителен.") cursor = conn.cursor() cursor.execute("SELECT Id FROM dbo.ITD_Cards WHERE Code = ?", (card_code,)) row = cursor.fetchone() if not row: return (0, 0) card_id = int(row.Id) cursor.execute( """ DELETE FROM dbo.ITD_Cycles WHERE CardId = ? AND Post2At IS NULL AND Post3At IS NULL AND Post4At IS NULL AND Post5At IS NULL """, (card_id,), ) return (cursor.rowcount, 0) def create_card(conn, display_text, is_active=True): """Създава нова карта. Кодът се задава служебно равен на Id. Връща Id на картата.""" display_text = str(display_text).strip() if not display_text: raise ValueError("Текстът на картата е задължителен.") cursor = conn.cursor() # Временен уникален код (NEWID), след което Code се обновява на Id cursor.execute( """ INSERT INTO dbo.ITD_Cards (Code, DisplayText, IsActive) OUTPUT INSERTED.Id VALUES (CONVERT(NVARCHAR(36), NEWID()), ?, ?) """, (display_text, 1 if is_active else 0), ) row = cursor.fetchone() new_id = row[0] if row else None if new_id is None: raise RuntimeError("Картата е записана, но не може да се прочете новото Id.") new_id = int(new_id) cursor.execute("UPDATE dbo.ITD_Cards SET Code = ? WHERE Id = ?", (str(new_id), new_id)) return new_id def update_card(conn, card_id, display_text): """Обновява текста на карта (DisplayText). Кодът не се променя.""" display_text = str(display_text).strip() if not display_text: raise ValueError("Текстът на картата е задължителен.") cursor = conn.cursor() cursor.execute( "UPDATE dbo.ITD_Cards SET DisplayText = ?, UpdatedAt = SYSUTCDATETIME() WHERE Id = ?", (display_text, card_id), ) if cursor.rowcount == 0: raise ValueError("Картата не е намерена.") def delete_card(conn, card_id): """Изтрива карта и всички нейни цикли (ITD_Cycles).""" card_id = int(card_id) cursor = conn.cursor() cursor.execute("DELETE FROM dbo.ITD_Cycles WHERE CardId = ?", (card_id,)) cursor.execute("DELETE FROM dbo.ITD_Cards WHERE Id = ?", (card_id,)) if cursor.rowcount == 0: raise ValueError("Картата не е намерена.")