Add JSON integrity hash checks

This commit is contained in:
Marcel Peterkau
2026-06-27 10:35:35 +02:00
parent d1dab793a6
commit 87e972bb43
9 changed files with 302 additions and 11 deletions
+89 -2
View File
@@ -79,7 +79,9 @@ class Housekeeper:
items = _items_by_key(working)
successful_scopes: set[tuple[str, str]] = set()
member_ids = set(self.repository.list_member_ids())
asset_ids = set(self.repository.list_asset_ids())
_remove_orphaned_member_items(items, member_ids)
_remove_orphaned_asset_items(items, asset_ids)
rules = load_rules(self.repository.root)
repository_config = self.repository.get_configuration()
@@ -93,6 +95,15 @@ class Housekeeper:
successful_scopes.add(("member-record-check", member_id))
continue
successful_scopes.add(("member-record-check", member_id))
self._refresh_hash_integrity_tasks(
items,
target_type="member",
target_id=member_id,
warnings=self.repository.member_hash_warnings(member_id),
run_id=run_id,
now=now,
)
successful_scopes.add(("member-hash-check", member_id))
for rule in rules:
scope = (rule.rule_id, member.member_id)
try:
@@ -130,6 +141,17 @@ class Housekeeper:
else:
successful_scopes.add(scope)
for asset_id in sorted(asset_ids):
self._refresh_hash_integrity_tasks(
items,
target_type="asset",
target_id=asset_id,
warnings=self.repository.asset_hash_warnings(asset_id),
run_id=run_id,
now=now,
)
successful_scopes.add(("asset-hash-check", asset_id))
self._resolve_stale_tasks(items, successful_scopes, run_id, now)
working.update(
{
@@ -198,6 +220,58 @@ class Housekeeper:
)
items[key] = item
@staticmethod
def _refresh_hash_integrity_tasks(
items: dict[str, dict[str, Any]],
*,
target_type: str,
target_id: str,
warnings: list[str],
run_id: str,
now: str,
) -> None:
key = f"{target_type}-hash-check:{target_id}:json-hash-mismatch"
if not warnings:
item = items.get(key)
if item and item.get("status") == "open":
item["status"] = "resolved"
item["resolved_run"] = run_id
item["resolved_at"] = now
return
item = items.get(key, {})
was_resolved = item.get("status") == "resolved"
item.update(
{
"key": key,
"rule_id": f"{target_type}-hash-check",
"rule_file": "<hash-check>",
"rule_source": "housekeeper",
"member_id": target_id if target_type == "member" else "",
"asset_id": target_id if target_type == "asset" else "",
"target_type": target_type,
"action": "task",
"status": "open",
"severity": "warning",
"code": "json_hash_mismatch",
"title": (
f"Mitgliederakte {target_id}: JSON extern geändert"
if target_type == "member"
else f"Assetakte {target_id}: JSON extern geändert"
),
"detail": " ; ".join(warnings),
"due_date": None,
"first_seen_run": item.get("first_seen_run", run_id),
"first_seen_at": item.get("first_seen_at", now),
"last_seen_run": run_id,
"last_seen_at": now,
"seen_count": int(item.get("seen_count", 0)) + 1,
"reopened_count": int(item.get("reopened_count", 0)) + (1 if was_resolved else 0),
"resolved_run": None,
"resolved_at": None,
}
)
items[key] = item
def _apply_action(
self,
items: dict[str, dict[str, Any]],
@@ -343,7 +417,8 @@ class Housekeeper:
for item in items.values():
if item.get("action") != "task" or item.get("status") != "open":
continue
scope = (str(item.get("rule_id", "")), str(item.get("member_id", "")))
target_id = str(item.get("member_id", "") or item.get("asset_id", ""))
scope = (str(item.get("rule_id", "")), target_id)
if scope not in successful_scopes or item.get("last_seen_run") == run_id:
continue
item["status"] = "resolved"
@@ -396,10 +471,12 @@ def _open_findings(items: list[dict[str, Any]]) -> list[HousekeeperFinding]:
findings.append(
HousekeeperFinding(
severity=str(item.get("severity", "info")),
member_id=str(item.get("member_id", "")),
code=str(item.get("code", item.get("rule_id", "housekeeper"))),
title=str(item.get("title", item.get("key", "Hausmeister"))),
detail=str(item.get("detail", "")),
member_id=str(item.get("member_id", "")),
asset_id=str(item.get("asset_id", "")),
target_type=str(item.get("target_type", "member")),
due_date=due_date,
key=str(item.get("key", "")),
)
@@ -421,6 +498,16 @@ def _remove_orphaned_member_items(items: dict[str, dict[str, Any]], member_ids:
del items[key]
def _remove_orphaned_asset_items(items: dict[str, dict[str, Any]], asset_ids: set[str]) -> None:
orphaned_keys = [
key
for key, item in items.items()
if item.get("asset_id") and str(item["asset_id"]) not in asset_ids
]
for key in orphaned_keys:
del items[key]
def _non_negative_delay(value: float) -> float:
try:
delay = float(value)