Files
CCMA/src/ccma/storage/repository.py
T
2026-06-21 17:50:56 +02:00

456 lines
19 KiB
Python

from __future__ import annotations
import json
import os
import unicodedata
from collections.abc import Iterable
from datetime import date, datetime
from pathlib import Path
from string import Formatter
from uuid import uuid4
from ccma.domain.dates import DateValidationError, normalize_date_input, validate_member_dates
from ccma.domain.models import MEMBERSHIP_STATUS_LABELS, ContributionData, Event, Member
from ccma.storage.atomic import read_json, write_json_atomic
class RepositoryError(RuntimeError):
pass
DEFAULT_MEMBER_NUMBER_PATTERN = "CCMA-{number:04d}"
DEFAULT_CONFIGURATION = {
"schema_version": 1,
"organization": "Chaos Computer Club Mannheim e.V.",
"member_number_policy": {
"mode": "automatic",
"pattern": DEFAULT_MEMBER_NUMBER_PATTERN,
},
"member_number_sequences": {},
"contribution_rules": [
{
"rule_id": "standard-2022",
"name": "Regulärer Beitrag ab 2022",
"valid_from": "2022-01-01",
"valid_until": None,
"annual_amount": "150.00",
"admission_fee": "10.00",
"annual_due": "01-31",
"semiannual_due": ["01-31", "07-31"],
"entry_proration": {"mode": "monthly", "started_month": "included"},
"first_payment_due_days_after_acceptance": 28,
"issue_days_before_due": 30,
"reminder_fee": "5.00",
"failed_debit_fee": "5.00",
}
],
}
class MemberRepository:
def __init__(self, root: Path | str):
self.root = Path(root).expanduser().resolve()
self.members_root = self.root / "members"
def initialize(self) -> None:
self.members_root.mkdir(parents=True, exist_ok=True)
(self.root / "rules").mkdir(parents=True, exist_ok=True)
config_path = self.root / "repository.json"
if not config_path.exists():
write_json_atomic(config_path, DEFAULT_CONFIGURATION)
def validate(self) -> list[str]:
errors: list[str] = []
try:
config = read_json(self.root / "repository.json")
if int(config.get("schema_version", 0)) != 1:
errors.append("repository.json: nicht unterstützte schema_version")
policy = config.get("member_number_policy") or {}
if str(policy.get("mode", "automatic")) not in {"automatic", "manual"}:
errors.append("repository.json: ungültiger Mitgliedsnummernmodus")
validate_member_number_pattern(str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN)))
except (OSError, ValueError, TypeError, json.JSONDecodeError, RepositoryError) as exc:
errors.append(f"repository.json: {exc}")
seen_numbers: dict[str, str] = {}
for member_dir in self._member_directories():
try:
member = self.get_member(member_dir.name)
validate_member_dates(
birth_date=member.birth_date,
accepted_at=member.accepted_at,
membership_started_at=member.membership_started_at,
)
self.get_contributions(member.member_id)
if member.member_id != member_dir.name:
errors.append(f"{member_dir.name}/member.json: member_id stimmt nicht mit Ordner überein")
normalized_number = member.member_number.casefold().strip()
if normalized_number and normalized_number in seen_numbers:
errors.append(
f"{member_dir.name}/member.json: Mitgliedsnummer {member.member_number} ist doppelt"
)
elif normalized_number:
seen_numbers[normalized_number] = member.member_id
except RepositoryError as exc:
errors.append(str(exc))
except (
OSError,
ValueError,
TypeError,
KeyError,
json.JSONDecodeError,
DateValidationError,
) as exc:
errors.append(f"{member_dir.name}/member.json: {exc}")
return errors
def list_members(self) -> list[Member]:
members: list[Member] = []
for directory in self._member_directories():
try:
members.append(self.get_member(directory.name))
except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError):
continue
return sorted(members, key=lambda item: (item.last_name.casefold(), item.first_name.casefold()))
def get_member(self, member_id: str) -> Member:
path = self._member_path(member_id) / "member.json"
if not path.is_file():
raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
return Member.from_dict(read_json(path))
def create_member(
self,
*,
first_name: str,
last_name: str,
email: str = "",
birth_date: str = "",
member_number: str = "",
) -> Member:
if not first_name.strip() or not last_name.strip():
raise RepositoryError("Vorname und Nachname sind erforderlich.")
try:
birth_date = normalize_date_input(birth_date, "Geburtsdatum")
validate_member_dates(birth_date=birth_date)
except DateValidationError as exc:
raise RepositoryError(str(exc)) from exc
selected_number = member_number.strip()
policy = self.get_member_number_policy()
if selected_number:
self._assert_member_number_available(selected_number)
elif policy["mode"] == "manual":
raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
else:
selected_number = self._allocate_member_number(policy["pattern"])
member_id = str(uuid4())
directory = self._member_path(member_id)
directory.mkdir(parents=True, exist_ok=False)
(directory / "files").mkdir()
member = Member(
member_id=member_id,
member_number=selected_number,
first_name=first_name.strip(),
last_name=last_name.strip(),
email=email.strip(),
birth_date=birth_date,
)
write_json_atomic(directory / "member.json", member.to_dict())
write_json_atomic(directory / "contributions.json", ContributionData().to_dict())
self.append_event(
member_id,
event_type="member_created",
summary="Mitgliederakte angelegt",
actor_type="user",
actor_name="Vorstand",
)
return member
def save_member(self, member: Member, *, actor_name: str = "Vorstand") -> None:
existing = self.get_member(member.member_id)
try:
member.birth_date = normalize_date_input(member.birth_date, "Geburtsdatum")
member.accepted_at = normalize_date_input(member.accepted_at, "Aufnahmebeschluss")
member.membership_started_at = normalize_date_input(member.membership_started_at, "Mitglied seit")
validate_member_dates(
birth_date=member.birth_date,
accepted_at=member.accepted_at,
membership_started_at=member.membership_started_at,
)
except DateValidationError as exc:
raise RepositoryError(str(exc)) from exc
if member.member_number != existing.member_number:
self._assert_member_number_available(member.member_number, exclude_member_id=member.member_id)
changes = self._summarize_changes(existing, member)
member.updated_at = datetime.now().astimezone().isoformat(timespec="seconds")
write_json_atomic(self._member_path(member.member_id) / "member.json", member.to_dict())
if changes:
self.append_event(
member.member_id,
event_type="member_data_changed",
summary=f"Mitgliedsdaten geändert: {', '.join(changes)}",
actor_type="user",
actor_name=actor_name,
)
def get_contributions(self, member_id: str) -> ContributionData:
path = self._member_path(member_id) / "contributions.json"
if not path.exists():
return ContributionData()
try:
raw = read_json(path)
if not isinstance(raw, dict):
raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
return ContributionData.from_dict(raw)
except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
raise RepositoryError(
f"{member_id}/contributions.json konnte nicht gelesen werden: {exc}"
) from exc
def save_contributions(self, member_id: str, data: ContributionData) -> None:
self.get_member(member_id)
write_json_atomic(self._member_path(member_id) / "contributions.json", data.to_dict())
def append_event(
self,
member_id: str,
*,
event_type: str,
summary: str,
actor_type: str = "system",
actor_name: str = "CCMA",
references: dict[str, str] | None = None,
data: dict[str, object] | None = None,
) -> Event:
directory = self._member_path(member_id)
if not (directory / "member.json").is_file():
raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
event = Event(
event_id=str(uuid4()),
timestamp=datetime.now().astimezone().isoformat(timespec="seconds"),
event_type=event_type,
summary=summary.strip(),
actor_type=actor_type,
actor_name=actor_name,
references=references or {},
data=data or {},
)
path = directory / "events.jsonl"
line = json.dumps(event.to_dict(), ensure_ascii=False, separators=(",", ":")) + "\n"
with path.open("a", encoding="utf-8", newline="\n") as handle:
handle.write(line)
handle.flush()
os.fsync(handle.fileno())
return event
def get_events(self, member_id: str) -> list[Event]:
path = self._member_path(member_id) / "events.jsonl"
if not path.exists():
return []
events: list[Event] = []
with path.open("r", encoding="utf-8") as handle:
for line_number, line in enumerate(handle, start=1):
if not line.strip():
continue
try:
events.append(Event.from_dict(json.loads(line)))
except (ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
raise RepositoryError(f"Ungültiges Event in Zeile {line_number}: {exc}") from exc
return events
def search(self, query: str) -> list[Member]:
tokens = [_normalize(token) for token in query.split() if token.strip()]
if not tokens:
return self.list_members()
scored: list[tuple[int, Member]] = []
for member in self.list_members():
fields = [
member.member_number,
member.first_name,
member.last_name,
member.display_name,
member.email,
member.birth_date,
_german_date(member.birth_date),
]
normalized = [_normalize(value) for value in fields if value]
if not all(any(token in value for value in normalized) for token in tokens):
continue
exact = sum(token == value for token in tokens for value in normalized)
prefix = sum(value.startswith(token) for token in tokens for value in normalized)
scored.append((exact * 100 + prefix * 10, member))
scored.sort(key=lambda item: (-item[0], item[1].last_name.casefold(), item[1].first_name.casefold()))
return [member for _, member in scored]
def member_count(self) -> int:
return sum(1 for _ in self._member_directories())
def get_configuration(self) -> dict:
try:
configuration = read_json(self.root / "repository.json")
except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
raise RepositoryError(f"repository.json konnte nicht gelesen werden: {exc}") from exc
if not isinstance(configuration, dict):
raise RepositoryError("repository.json enthält keine gültige Konfiguration.")
return configuration
def get_member_number_policy(self) -> dict[str, str]:
try:
config = read_json(self.root / "repository.json")
except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
raise RepositoryError(f"Mitgliedsnummernregel konnte nicht gelesen werden: {exc}") from exc
policy = config.get("member_number_policy") or {}
mode = str(policy.get("mode", "automatic"))
if mode not in {"automatic", "manual"}:
mode = "automatic"
pattern = str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN))
validate_member_number_pattern(pattern)
return {"mode": mode, "pattern": pattern}
def save_member_number_policy(self, *, mode: str, pattern: str) -> None:
if mode not in {"automatic", "manual"}:
raise RepositoryError("Ungültiger Mitgliedsnummernmodus.")
validate_member_number_pattern(pattern)
config = read_json(self.root / "repository.json")
config["member_number_policy"] = {"mode": mode, "pattern": pattern.strip()}
config.setdefault("member_number_sequences", {})
write_json_atomic(self.root / "repository.json", config)
def preview_member_number(self, pattern: str | None = None) -> str:
selected_pattern = pattern or self.get_member_number_policy()["pattern"]
validate_member_number_pattern(selected_pattern)
config = read_json(self.root / "repository.json")
return self._next_available_member_number(config, selected_pattern)[0]
def _member_directories(self) -> Iterable[Path]:
if not self.members_root.exists():
return []
return (
path for path in self.members_root.iterdir() if path.is_dir() and not path.name.startswith(".")
)
def _member_path(self, member_id: str) -> Path:
if not member_id or Path(member_id).name != member_id or member_id in {".", ".."}:
raise RepositoryError("Ungültige Mitglieds-ID.")
return self.members_root / member_id
def _allocate_member_number(self, pattern: str) -> str:
config = read_json(self.root / "repository.json")
member_number, next_value = self._next_available_member_number(config, pattern)
sequences = config.get("member_number_sequences")
if not isinstance(sequences, dict):
sequences = {}
config["member_number_sequences"] = sequences
sequences[pattern] = next_value
write_json_atomic(self.root / "repository.json", config)
return member_number
def _next_available_member_number(self, config: dict, pattern: str) -> tuple[str, int]:
sequences = config.get("member_number_sequences")
if not isinstance(sequences, dict):
sequences = {}
try:
number = max(1, int(sequences.get(pattern, 1)))
except (TypeError, ValueError):
number = 1
existing = {member.member_number.casefold() for member in self.list_members() if member.member_number}
for _attempt in range(1_000_000):
candidate = format_member_number(pattern, number)
number += 1
if candidate.casefold() not in existing:
return candidate, number
raise RepositoryError("Keine freie Mitgliedsnummer im konfigurierten Nummernbereich gefunden.")
def _assert_member_number_available(
self,
member_number: str,
*,
exclude_member_id: str | None = None,
) -> None:
normalized = member_number.casefold().strip()
if not normalized:
raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
for member in self.list_members():
if member.member_id != exclude_member_id and member.member_number.casefold() == normalized:
raise RepositoryError(f"Die Mitgliedsnummer {member_number} ist bereits vergeben.")
@staticmethod
def _summarize_changes(before: Member, after: Member) -> list[str]:
labels = {
"member_number": "Mitgliedsnummer",
"first_name": "Vorname",
"last_name": "Nachname",
"email": "E-Mail-Adresse",
"birth_date": "Geburtsdatum",
"status": "Status",
"payment_frequency": "Zahlungsweise",
"contribution_rule_id": "Beitragsregel",
"honorary": "Ehrenmitgliedschaft",
"notes": "interne Notiz",
}
changes: list[str] = []
for field, label in labels.items():
old_value = getattr(before, field)
new_value = getattr(after, field)
if old_value == new_value:
continue
if field == "status":
old_label = MEMBERSHIP_STATUS_LABELS.get(str(old_value), str(old_value))
new_label = MEMBERSHIP_STATUS_LABELS.get(str(new_value), str(new_value))
changes.append(f"Status von {old_label} zu {new_label}")
else:
changes.append(label)
return changes
def _normalize(value: str) -> str:
normalized = unicodedata.normalize("NFKD", value.casefold().strip())
return "".join(character for character in normalized if not unicodedata.combining(character))
def _german_date(value: str) -> str:
try:
return date.fromisoformat(value).strftime("%d.%m.%Y")
except ValueError:
return ""
def validate_member_number_pattern(pattern: str) -> None:
selected = pattern.strip()
if not selected:
raise RepositoryError("Das Mitgliedsnummern-Pattern darf nicht leer sein.")
has_number = False
try:
for _literal, field_name, format_spec, conversion in Formatter().parse(selected):
if field_name is None:
continue
if field_name not in {"number", "year"}:
raise RepositoryError(f"Unbekannter Platzhalter: {{{field_name}}}")
if conversion:
raise RepositoryError("Konvertierungen wie !r sind im Pattern nicht erlaubt.")
if "{" in format_spec or "}" in format_spec:
raise RepositoryError("Verschachtelte Formatierungen sind nicht erlaubt.")
has_number = has_number or field_name == "number"
if not has_number:
raise RepositoryError("Das Pattern muss den Platzhalter {number} enthalten.")
first = format_member_number(selected, 1)
second = format_member_number(selected, 2)
if first == second:
raise RepositoryError("Das Pattern erzeugt keine eindeutigen Mitgliedsnummern.")
except RepositoryError:
raise
except (KeyError, ValueError) as exc:
raise RepositoryError(f"Ungültiges Mitgliedsnummern-Pattern: {exc}") from exc
def format_member_number(pattern: str, number: int, *, year: int | None = None) -> str:
try:
value = pattern.strip().format(number=number, year=year or date.today().year)
except (KeyError, ValueError) as exc:
raise RepositoryError(f"Mitgliedsnummer konnte nicht formatiert werden: {exc}") from exc
if not value or len(value) > 80 or any(character in value for character in "\r\n\t"):
raise RepositoryError("Das Pattern erzeugt eine ungültige Mitgliedsnummer.")
return value