feat: add scriptable housekeeper rule engine

This commit is contained in:
Marcel Peterkau
2026-06-21 17:43:04 +02:00
parent e63abbae81
commit 4bc1a8a200
18 changed files with 936 additions and 207 deletions
+291 -205
View File
@@ -1,18 +1,21 @@
from __future__ import annotations
import calendar
import copy
import json
import os
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import date, timedelta
from datetime import date, datetime
from pathlib import Path
from typing import Any
from uuid import uuid4
from ccma.domain.dates import (
DateValidationError,
parse_iso_date,
validate_birth_date,
validate_member_dates,
)
from ccma.domain.models import HousekeeperFinding
from ccma.rules.api import RuleAction, RuleContext
from ccma.rules.loader import LoadedRule, load_rules
from ccma.services.intervals import AnniversaryInterval, parse_anniversary_intervals
from ccma.storage.repository import MemberRepository
from ccma.storage.atomic import read_json, write_json_atomic
from ccma.storage.repository import MemberRepository, RepositoryError
@dataclass(frozen=True, slots=True)
@@ -48,212 +51,295 @@ class Housekeeper:
def __init__(self, repository: MemberRepository, settings: HousekeeperSettings | None = None):
self.repository = repository
self.settings = settings or HousekeeperSettings()
self.state_path = repository.root / "hausmeister.json"
self.lock_path = repository.root / ".hausmeister.lock"
def run(self, today: date | None = None) -> list[HousekeeperFinding]:
current_date = today or date.today()
findings: list[HousekeeperFinding] = []
for member in self.repository.list_members():
try:
validate_member_dates(
birth_date=member.birth_date,
accepted_at=member.accepted_at,
membership_started_at=member.membership_started_at,
today=current_date,
)
except DateValidationError as exc:
findings.append(
HousekeeperFinding(
severity="error",
member_id=member.member_id,
code="invalid_member_dates",
title=f"{member.display_name}: Ungültige Datumsangabe",
detail=str(exc),
)
)
if member.status in {
"active",
"suspended_contribution",
"resigned_end_of_year",
"honorary",
}:
birthday = self._birthday_finding(
member.member_id, member.display_name, member.birth_date, current_date
)
if birthday:
findings.append(birthday)
findings.extend(
self._anniversary_findings(
member.member_id,
member.display_name,
member.membership_started_at,
current_date,
)
)
if not member.contribution_rule_id and not member.honorary:
findings.append(
HousekeeperFinding(
severity="error",
member_id=member.member_id,
code="missing_contribution_rule",
title=f"{member.display_name}: Beitragsregel fehlt",
detail="Dem Mitglied ist keine Beitragsregel zugeordnet.",
)
)
with _exclusive_lock(self.lock_path):
original = self._load_state()
working = copy.deepcopy(original)
counter = int(original.get("run_counter", 0)) + 1
run_id = f"{current_date.isoformat()}:{counter:06d}"
now = datetime.now().astimezone().isoformat(timespec="seconds")
items = _items_by_key(working)
successful_scopes: set[tuple[str, str]] = set()
if member.status == "accepted_pending_payment" and member.accepted_at:
accepted = _parse_date(member.accepted_at)
if accepted:
deadline = accepted + timedelta(days=28)
days = (deadline - current_date).days
if days < 0:
findings.append(
HousekeeperFinding(
severity="error",
rules = load_rules(self.repository.root)
repository_config = self.repository.get_configuration()
for member in self.repository.list_members():
for rule in rules:
scope = (rule.rule_id, member.member_id)
try:
context = RuleContext(
member=member,
contributions=self.repository.get_contributions(member.member_id),
today=current_date,
settings=self.settings,
repository_config=repository_config,
)
actions = rule.evaluate(context)
if not isinstance(actions, list):
raise TypeError("evaluate(context) muss eine Liste zurückgeben")
for action in actions:
self._apply_action(items, action, rule, run_id, now)
except Exception as exc:
self._refresh_task(
items,
RuleAction(
key=f"{rule.rule_id}:{member.member_id}:rule-error",
action="task",
member_id=member.member_id,
code="initial_payment_overdue",
title=f"{member.display_name}: Erstzahlung überfällig",
detail=f"Die Vierwochenfrist ist seit {-days} Tagen überschritten.",
due_date=deadline,
)
)
elif days <= 7:
findings.append(
HousekeeperFinding(
severity="warning",
member_id=member.member_id,
code="initial_payment_due_soon",
title=f"{member.display_name}: Erstzahlung bald fällig",
detail=f"Die Vierwochenfrist endet in {days} Tagen.",
due_date=deadline,
)
payload={
"severity": "error",
"title": f"{member.display_name}: Regel fehlgeschlagen",
"detail": f"{rule.filename}: {exc}",
},
),
rule,
run_id,
now,
)
else:
successful_scopes.add(scope)
contributions = self.repository.get_contributions(member.member_id)
for claim in contributions.claims:
if str(claim.get("status", "open")) not in {"open", "partially_paid"}:
continue
due = _parse_date(str(claim.get("due_date", "")))
if not due:
continue
days = (due - current_date).days
title = str(claim.get("title") or "Beitragsforderung")
if days < 0:
findings.append(
HousekeeperFinding(
severity="error",
member_id=member.member_id,
code="claim_overdue",
title=f"{member.display_name}: {title} überfällig",
detail=f"Fälligkeit war vor {-days} Tagen.",
due_date=due,
)
)
elif days <= 14:
findings.append(
HousekeeperFinding(
severity="info",
member_id=member.member_id,
code="claim_due_soon",
title=f"{member.display_name}: {title} bald fällig",
detail=f"Fälligkeit in {days} Tagen.",
due_date=due,
)
)
severity_order = {"error": 0, "warning": 1, "info": 2}
return sorted(
findings, key=lambda item: (severity_order.get(item.severity, 9), item.due_date or date.max)
)
def _birthday_finding(
self,
member_id: str,
name: str,
birth_date_value: str,
today: date,
) -> HousekeeperFinding | None:
try:
birth_date = validate_birth_date(birth_date_value, today=today)
except DateValidationError:
return None
if not birth_date:
return None
occurrences = [_birthday_in_year(birth_date, year) for year in range(today.year - 1, today.year + 2)]
occurrence = min(occurrences, key=lambda value: abs((value - today).days))
delta = (occurrence - today).days
if delta > self.settings.birthday_days_before or delta < -self.settings.birthday_days_after:
return None
age = occurrence.year - birth_date.year
title = _relative_title(name, delta, "Geburtstag")
detail = f"Wird {age} Jahre alt." if delta >= 0 else f"Ist {age} Jahre alt geworden."
return HousekeeperFinding(
severity="info",
member_id=member_id,
code="birthday",
title=title,
detail=detail,
due_date=occurrence,
)
def _anniversary_findings(
self,
member_id: str,
name: str,
started_at_value: str,
today: date,
) -> list[HousekeeperFinding]:
try:
started_at = parse_iso_date(started_at_value, "Mitglied seit")
except DateValidationError:
return []
if not started_at:
return []
findings: list[HousekeeperFinding] = []
for interval in self.settings.anniversary_intervals:
try:
target = interval.target_date(started_at)
except (OverflowError, ValueError):
continue
delta = (target - today).days
if delta > self.settings.anniversary_days_before or delta < -self.settings.anniversary_days_after:
continue
occasion = _anniversary_name(interval)
findings.append(
HousekeeperFinding(
severity="info",
member_id=member_id,
code="membership_anniversary",
title=_relative_title(name, delta, occasion),
detail=f"Mitglied seit {started_at:%d.%m.%Y}.",
due_date=target,
)
self._resolve_stale_tasks(items, successful_scopes, run_id, now)
working.update(
{
"schema_version": 1,
"run_counter": counter,
"last_completed_run": run_id,
"last_completed_at": now,
"rules": [_rule_record(rule) for rule in rules],
"items": sorted(items.values(), key=lambda item: str(item.get("key", ""))),
}
)
return findings
write_json_atomic(self.state_path, working)
return _open_findings(working["items"])
def _apply_action(
self,
items: dict[str, dict[str, Any]],
action: RuleAction,
rule: LoadedRule,
run_id: str,
now: str,
) -> None:
self._validate_action(action, rule)
if action.action == "task":
self._refresh_task(items, action, rule, run_id, now)
return
if action.action == "create_claim":
self._create_claim(items, action, rule, run_id, now)
return
raise ValueError(f"Unbekannte Aktion: {action.action}")
def _create_claim(
self,
items: dict[str, dict[str, Any]],
action: RuleAction,
rule: LoadedRule,
run_id: str,
now: str,
) -> None:
claim_key = str(action.payload.get("claim_key", "")).strip()
if not claim_key:
raise ValueError("create_claim benötigt claim_key")
data = self.repository.get_contributions(action.member_id)
existing = next(
(claim for claim in data.claims if str(claim.get("claim_key", "")) == claim_key),
None,
)
if existing is None:
claim = {
**dict(action.payload),
"claim_id": str(uuid4()),
"claim_key": claim_key,
"status": "open",
"created_at": now,
"rule": {
"id": rule.rule_id,
"filename": rule.filename,
"source": rule.source,
"script_hash": rule.script_hash,
},
}
data.claims.append(claim)
self.repository.save_contributions(action.member_id, data)
self.repository.append_event(
action.member_id,
event_type="claim_created",
summary=f"Forderung automatisch angelegt: {claim.get('title', claim_key)}",
references={"claim_id": str(claim["claim_id"]), "claim_key": claim_key},
data={"amount": claim.get("amount", ""), "due_date": claim.get("due_date", "")},
)
target_id = claim["claim_id"]
else:
target_id = existing.get("claim_id", "")
item = items.get(action.key, {})
item.update(
{
"key": action.key,
"rule_id": rule.rule_id,
"rule_file": rule.filename,
"rule_source": rule.source,
"member_id": action.member_id,
"action": "create_claim",
"status": "applied",
"first_seen_run": item.get("first_seen_run", run_id),
"last_seen_run": run_id,
"applied_run": item.get("applied_run", run_id),
"applied_at": item.get("applied_at", now),
"target": {"claim_id": target_id, "claim_key": claim_key},
}
)
items[action.key] = item
@staticmethod
def _refresh_task(
items: dict[str, dict[str, Any]],
action: RuleAction,
rule: LoadedRule,
run_id: str,
now: str,
) -> None:
item = items.get(action.key, {})
was_resolved = item.get("status") == "resolved"
payload = dict(action.payload)
item.update(
{
"key": action.key,
"rule_id": rule.rule_id,
"rule_file": rule.filename,
"rule_source": rule.source,
"member_id": action.member_id,
"action": "task",
"status": "open",
"severity": str(payload.get("severity", "info")),
"code": str(payload.get("code", rule.rule_id)),
"title": str(payload.get("title", action.key)),
"detail": str(payload.get("detail", "")),
"due_date": payload.get("due_date"),
"first_seen_run": item.get("first_seen_run", run_id),
"first_seen_at": item.get("first_seen_at", now),
"last_seen_run": run_id,
"last_seen_at": now,
"seen_count": int(item.get("seen_count", 0)) + 1,
"reopened_count": int(item.get("reopened_count", 0)) + (1 if was_resolved else 0),
"resolved_run": None,
"resolved_at": None,
}
)
items[action.key] = item
@staticmethod
def _validate_action(action: RuleAction, rule: LoadedRule) -> None:
if not isinstance(action, RuleAction):
raise TypeError("Regeln dürfen nur RuleAction-Objekte zurückgeben")
prefix = f"{rule.rule_id}:{action.member_id}:"
if not action.key.startswith(prefix):
raise ValueError(f"Aktions-Key muss mit {prefix} beginnen")
@staticmethod
def _resolve_stale_tasks(
items: dict[str, dict[str, Any]],
successful_scopes: set[tuple[str, str]],
run_id: str,
now: str,
) -> None:
for item in items.values():
if item.get("action") != "task" or item.get("status") != "open":
continue
scope = (str(item.get("rule_id", "")), str(item.get("member_id", "")))
if scope not in successful_scopes or item.get("last_seen_run") == run_id:
continue
item["status"] = "resolved"
item["resolved_run"] = run_id
item["resolved_at"] = now
def _load_state(self) -> dict[str, Any]:
if not self.state_path.exists():
return {"schema_version": 1, "run_counter": 0, "items": []}
try:
state = read_json(self.state_path)
if not isinstance(state, dict) or not isinstance(state.get("items", []), list):
raise ValueError("ungültige Struktur")
return state
except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
raise RepositoryError(f"hausmeister.json konnte nicht gelesen werden: {exc}") from exc
def _parse_date(value: str) -> date | None:
def _items_by_key(state: dict[str, Any]) -> dict[str, dict[str, Any]]:
items: dict[str, dict[str, Any]] = {}
for raw in state.get("items", []):
if isinstance(raw, dict) and raw.get("key"):
items[str(raw["key"])] = dict(raw)
return items
def _rule_record(rule: LoadedRule) -> dict[str, str]:
return {
"rule_id": rule.rule_id,
"filename": rule.filename,
"source": rule.source,
"script_hash": rule.script_hash,
}
def _open_findings(items: list[dict[str, Any]]) -> list[HousekeeperFinding]:
findings: list[HousekeeperFinding] = []
for item in items:
if item.get("action") != "task" or item.get("status") != "open":
continue
due_date = None
try:
if item.get("due_date"):
due_date = date.fromisoformat(str(item["due_date"]))
except ValueError:
pass
findings.append(
HousekeeperFinding(
severity=str(item.get("severity", "info")),
member_id=str(item.get("member_id", "")),
code=str(item.get("code", item.get("rule_id", "housekeeper"))),
title=str(item.get("title", item.get("key", "Hausmeister"))),
detail=str(item.get("detail", "")),
due_date=due_date,
)
)
severity_order = {"error": 0, "warning": 1, "info": 2}
return sorted(
findings,
key=lambda item: (severity_order.get(item.severity, 9), item.due_date or date.max, item.title),
)
@contextmanager
def _exclusive_lock(path: Path):
path.parent.mkdir(parents=True, exist_ok=True)
descriptor: int | None = None
for _attempt in range(2):
try:
descriptor = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
os.write(descriptor, str(os.getpid()).encode("ascii"))
break
except FileExistsError:
if _lock_owner_is_running(path):
raise RepositoryError("Der Hausmeister läuft bereits.") from None
path.unlink(missing_ok=True)
if descriptor is None:
raise RepositoryError("Hausmeister-Sperre konnte nicht angelegt werden.")
try:
return date.fromisoformat(value[:10])
except (TypeError, ValueError):
return None
yield
finally:
os.close(descriptor)
path.unlink(missing_ok=True)
def _birthday_in_year(birth_date: date, year: int) -> date:
day = min(birth_date.day, calendar.monthrange(year, birth_date.month)[1])
return date(year, birth_date.month, day)
def _relative_title(name: str, delta: int, occasion: str) -> str:
if delta == 0:
return f"{name} hat heute {occasion}"
days = "Tag" if abs(delta) == 1 else "Tagen"
if delta > 0:
return f"{name} hat in {delta} {days} {occasion}"
return f"{name} hatte vor {-delta} {days} {occasion}"
def _anniversary_name(interval: AnniversaryInterval) -> str:
if interval.unit == "D":
return f"{interval.value}-Tage-Mitgliedsjubiläum"
if interval.unit == "M":
return f"{interval.value}-Monats-Mitgliedsjubiläum"
return f"{interval.value}-jähriges Mitgliedsjubiläum"
def _lock_owner_is_running(path: Path) -> bool:
try:
pid = int(path.read_text(encoding="ascii").strip())
os.kill(pid, 0)
except (OSError, ValueError):
return False
return True