from __future__ import annotations import copy import json import math import os import time from contextlib import contextmanager from dataclasses import dataclass, field from datetime import date, datetime from pathlib import Path from typing import Any from uuid import uuid4 from ccma.domain.models import HousekeeperFinding from ccma.rules.api import RuleAction, RuleContext from ccma.rules.loader import LoadedRule, load_rules from ccma.services.intervals import AnniversaryInterval, parse_anniversary_intervals from ccma.storage.atomic import read_json, write_json_atomic from ccma.storage.repository import MemberRepository, RepositoryError @dataclass(frozen=True, slots=True) class HousekeeperSettings: birthday_days_before: int = 7 birthday_days_after: int = 2 anniversary_days_before: int = 14 anniversary_days_after: int = 7 anniversary_intervals: tuple[AnniversaryInterval, ...] = field( default_factory=lambda: tuple(parse_anniversary_intervals("1Y;5Y;10Y;25Y;50Y")) ) @classmethod def from_values( cls, *, birthday_days_before: int, birthday_days_after: int, anniversary_days_before: int, anniversary_days_after: int, anniversary_intervals: str, ) -> HousekeeperSettings: return cls( birthday_days_before=min(365, max(0, birthday_days_before)), birthday_days_after=min(365, max(0, birthday_days_after)), anniversary_days_before=min(365, max(0, anniversary_days_before)), anniversary_days_after=min(365, max(0, anniversary_days_after)), anniversary_intervals=tuple(parse_anniversary_intervals(anniversary_intervals)), ) class Housekeeper: def __init__(self, repository: MemberRepository, settings: HousekeeperSettings | None = None): self.repository = repository self.settings = settings or HousekeeperSettings() self.state_path = repository.root / "housekeeper.json" self.lock_path = repository.root / ".housekeeper.lock" def run( self, today: date | None = None, *, member_delay: float = 0.0, ) -> list[HousekeeperFinding]: current_date = today or date.today() delay = _non_negative_delay(member_delay) with _exclusive_lock(self.lock_path): original = self._load_state() working = copy.deepcopy(original) counter = int(original.get("run_counter", 0)) + 1 run_id = f"{current_date.isoformat()}:{counter:06d}" now = datetime.now().astimezone().isoformat(timespec="seconds") items = _items_by_key(working) successful_scopes: set[tuple[str, str]] = set() member_ids = set(self.repository.list_member_ids()) _remove_orphaned_member_items(items, member_ids) rules = load_rules(self.repository.root) repository_config = self.repository.get_configuration() for index, member_id in enumerate(sorted(member_ids)): if index and delay: time.sleep(delay) try: member, contributions = self.repository.preflight_member_record(member_id) except RepositoryError as exc: self._refresh_record_integrity_task(items, member_id, str(exc), run_id, now) successful_scopes.add(("member-record-check", member_id)) continue successful_scopes.add(("member-record-check", member_id)) for rule in rules: scope = (rule.rule_id, member.member_id) try: context = RuleContext( member=member, contributions=contributions, today=current_date, settings=self.settings, repository_config=repository_config, ) actions = rule.evaluate(context) if not isinstance(actions, list): raise TypeError("evaluate(context) muss eine Liste zurückgeben") for action in actions: self._apply_action(items, action, rule, run_id, now) if action.action == "create_claim": contributions = self.repository.get_contributions(member.member_id) except Exception as exc: self._refresh_task( items, RuleAction( key=f"{rule.rule_id}:{member.member_id}:rule-error", action="task", member_id=member.member_id, payload={ "severity": "error", "title": f"{member.display_name}: Regel fehlgeschlagen", "detail": f"{rule.filename}: {exc}", }, ), rule, run_id, now, ) else: successful_scopes.add(scope) self._resolve_stale_tasks(items, successful_scopes, run_id, now) working.update( { "schema_version": 1, "run_counter": counter, "last_completed_run": run_id, "last_completed_at": now, "rules": [_rule_record(rule) for rule in rules], "items": sorted(items.values(), key=lambda item: str(item.get("key", ""))), } ) self._write_state(working) return _open_findings(working["items"]) def delete_task(self, key: str) -> list[HousekeeperFinding]: selected_key = key.strip() if not selected_key: raise RepositoryError("Der Task hat keinen gültigen Key.") with _exclusive_lock(self.lock_path): state = self._load_state() items = _items_by_key(state) item = items.get(selected_key) if item is None: return _open_findings(list(items.values())) if item.get("action") != "task": raise RepositoryError("Nur Hausmeister-Tasks können manuell gelöscht werden.") del items[selected_key] state["items"] = sorted(items.values(), key=lambda value: str(value.get("key", ""))) self._write_state(state) return _open_findings(state["items"]) @staticmethod def _refresh_record_integrity_task( items: dict[str, dict[str, Any]], member_id: str, detail: str, run_id: str, now: str, ) -> None: key = f"member-record-check:{member_id}:invalid-record" item = items.get(key, {}) was_resolved = item.get("status") == "resolved" item.update( { "key": key, "rule_id": "member-record-check", "rule_file": "", "rule_source": "housekeeper", "member_id": member_id, "action": "task", "status": "open", "severity": "error", "code": "invalid_member_record", "title": f"Mitgliederakte {member_id}: Daten beschädigt", "detail": f"{detail}. Für diese Akte wurden keine Regeln ausgeführt.", "due_date": None, "first_seen_run": item.get("first_seen_run", run_id), "first_seen_at": item.get("first_seen_at", now), "last_seen_run": run_id, "last_seen_at": now, "seen_count": int(item.get("seen_count", 0)) + 1, "reopened_count": int(item.get("reopened_count", 0)) + (1 if was_resolved else 0), "resolved_run": None, "resolved_at": None, } ) items[key] = item def _apply_action( self, items: dict[str, dict[str, Any]], action: RuleAction, rule: LoadedRule, run_id: str, now: str, ) -> None: self._validate_action(action, rule) if action.action == "task": self._refresh_task(items, action, rule, run_id, now) return if action.action == "create_claim": self._create_claim(items, action, rule, run_id, now) return raise ValueError(f"Unbekannte Aktion: {action.action}") def _create_claim( self, items: dict[str, dict[str, Any]], action: RuleAction, rule: LoadedRule, run_id: str, now: str, ) -> None: claim_key = str(action.payload.get("claim_key", "")).strip() if not claim_key: raise ValueError("create_claim benötigt claim_key") data = self.repository.get_contributions(action.member_id) existing = next( (claim for claim in data.claims if str(claim.get("claim_key", "")) == claim_key), None, ) if existing is None: claim = { **dict(action.payload), "claim_id": str(uuid4()), "claim_key": claim_key, "status": "open", "created_at": now, "rule": { "id": rule.rule_id, "filename": rule.filename, "source": rule.source, "script_hash": rule.script_hash, }, } if not claim.get("items"): claim["items"] = [ { "item_id": str(uuid4()), "type": "base", "description": str(claim.get("title", claim_key)), "quantity": "1.00", "unit_price": str(claim.get("amount", "0.00")), "amount": str(claim.get("amount", "0.00")), "created_at": now, } ] data.claims.append(claim) self.repository.save_contributions(action.member_id, data) self.repository.append_event( action.member_id, event_type="claim_created", summary=f"Forderung automatisch angelegt: {claim.get('title', claim_key)}", references={"claim_id": str(claim["claim_id"]), "claim_key": claim_key}, data={"amount": claim.get("amount", ""), "due_date": claim.get("due_date", "")}, ) target_id = claim["claim_id"] else: target_id = existing.get("claim_id", "") item = items.get(action.key, {}) item.update( { "key": action.key, "rule_id": rule.rule_id, "rule_file": rule.filename, "rule_source": rule.source, "member_id": action.member_id, "action": "create_claim", "status": "applied", "first_seen_run": item.get("first_seen_run", run_id), "last_seen_run": run_id, "applied_run": item.get("applied_run", run_id), "applied_at": item.get("applied_at", now), "target": {"claim_id": target_id, "claim_key": claim_key}, } ) items[action.key] = item @staticmethod def _refresh_task( items: dict[str, dict[str, Any]], action: RuleAction, rule: LoadedRule, run_id: str, now: str, ) -> None: item = items.get(action.key, {}) was_resolved = item.get("status") == "resolved" payload = dict(action.payload) item.update( { "key": action.key, "rule_id": rule.rule_id, "rule_file": rule.filename, "rule_source": rule.source, "member_id": action.member_id, "action": "task", "status": "open", "severity": str(payload.get("severity", "info")), "code": str(payload.get("code", rule.rule_id)), "title": str(payload.get("title", action.key)), "detail": str(payload.get("detail", "")), "due_date": payload.get("due_date"), "first_seen_run": item.get("first_seen_run", run_id), "first_seen_at": item.get("first_seen_at", now), "last_seen_run": run_id, "last_seen_at": now, "seen_count": int(item.get("seen_count", 0)) + 1, "reopened_count": int(item.get("reopened_count", 0)) + (1 if was_resolved else 0), "resolved_run": None, "resolved_at": None, } ) items[action.key] = item @staticmethod def _validate_action(action: RuleAction, rule: LoadedRule) -> None: if not isinstance(action, RuleAction): raise TypeError("Regeln dürfen nur RuleAction-Objekte zurückgeben") prefix = f"{rule.rule_id}:{action.member_id}:" if not action.key.startswith(prefix): raise ValueError(f"Aktions-Key muss mit {prefix} beginnen") @staticmethod def _resolve_stale_tasks( items: dict[str, dict[str, Any]], successful_scopes: set[tuple[str, str]], run_id: str, now: str, ) -> None: for item in items.values(): if item.get("action") != "task" or item.get("status") != "open": continue scope = (str(item.get("rule_id", "")), str(item.get("member_id", ""))) if scope not in successful_scopes or item.get("last_seen_run") == run_id: continue item["status"] = "resolved" item["resolved_run"] = run_id item["resolved_at"] = now def _load_state(self) -> dict[str, Any]: if not self.state_path.exists(): return {"schema_version": 1, "run_counter": 0, "items": []} try: state = read_json(self.state_path) if not isinstance(state, dict) or not isinstance(state.get("items", []), list): raise ValueError("ungültige Struktur") return state except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc: raise RepositoryError(f"housekeeper.json konnte nicht gelesen werden: {exc}") from exc def _write_state(self, state: dict[str, Any]) -> None: write_json_atomic(self.state_path, state) def _items_by_key(state: dict[str, Any]) -> dict[str, dict[str, Any]]: items: dict[str, dict[str, Any]] = {} for raw in state.get("items", []): if isinstance(raw, dict) and raw.get("key"): items[str(raw["key"])] = dict(raw) return items def _rule_record(rule: LoadedRule) -> dict[str, str]: return { "rule_id": rule.rule_id, "filename": rule.filename, "source": rule.source, "script_hash": rule.script_hash, } def _open_findings(items: list[dict[str, Any]]) -> list[HousekeeperFinding]: findings: list[HousekeeperFinding] = [] for item in items: if item.get("action") != "task" or item.get("status") != "open": continue due_date = None try: if item.get("due_date"): due_date = date.fromisoformat(str(item["due_date"])) except ValueError: pass findings.append( HousekeeperFinding( severity=str(item.get("severity", "info")), member_id=str(item.get("member_id", "")), code=str(item.get("code", item.get("rule_id", "housekeeper"))), title=str(item.get("title", item.get("key", "Hausmeister"))), detail=str(item.get("detail", "")), due_date=due_date, key=str(item.get("key", "")), ) ) severity_order = {"error": 0, "warning": 1, "info": 2} return sorted( findings, key=lambda item: (severity_order.get(item.severity, 9), item.due_date or date.max, item.title), ) def _remove_orphaned_member_items(items: dict[str, dict[str, Any]], member_ids: set[str]) -> None: orphaned_keys = [ key for key, item in items.items() if item.get("member_id") and str(item["member_id"]) not in member_ids ] for key in orphaned_keys: del items[key] def _non_negative_delay(value: float) -> float: try: delay = float(value) except (TypeError, ValueError): return 0.0 return max(0.0, delay) if math.isfinite(delay) else 0.0 @contextmanager def _exclusive_lock(path: Path): path.parent.mkdir(parents=True, exist_ok=True) descriptor: int | None = None for _attempt in range(2): try: descriptor = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600) os.write(descriptor, str(os.getpid()).encode("ascii")) break except FileExistsError: if _lock_owner_is_running(path): raise RepositoryError("Der Hausmeister läuft bereits.") from None path.unlink(missing_ok=True) if descriptor is None: raise RepositoryError("Hausmeister-Sperre konnte nicht angelegt werden.") try: yield finally: os.close(descriptor) path.unlink(missing_ok=True) def _lock_owner_is_running(path: Path) -> bool: try: pid = int(path.read_text(encoding="ascii").strip()) os.kill(pid, 0) except (OSError, ValueError): return False return True