mirror of
https://git.hiabuto.net/C3MA/CCMA.git
synced 2026-07-01 11:14:52 +02:00
feat: initialize CCMA member administration
This commit is contained in:
@@ -0,0 +1,431 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import unicodedata
|
||||
from collections.abc import Iterable
|
||||
from datetime import date, datetime
|
||||
from pathlib import Path
|
||||
from string import Formatter
|
||||
from uuid import uuid4
|
||||
|
||||
from ccma.domain.dates import DateValidationError, normalize_date_input, validate_member_dates
|
||||
from ccma.domain.models import MEMBERSHIP_STATUS_LABELS, ContributionData, Event, Member
|
||||
from ccma.storage.atomic import read_json, write_json_atomic
|
||||
|
||||
|
||||
class RepositoryError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
DEFAULT_MEMBER_NUMBER_PATTERN = "CCMA-{number:04d}"
|
||||
|
||||
|
||||
DEFAULT_CONFIGURATION = {
|
||||
"schema_version": 1,
|
||||
"organization": "Chaos Computer Club Mannheim e.V.",
|
||||
"member_number_policy": {
|
||||
"mode": "automatic",
|
||||
"pattern": DEFAULT_MEMBER_NUMBER_PATTERN,
|
||||
},
|
||||
"member_number_sequences": {},
|
||||
"contribution_rules": [
|
||||
{
|
||||
"rule_id": "standard-2022",
|
||||
"name": "Regulärer Beitrag ab 2022",
|
||||
"valid_from": "2022-01-01",
|
||||
"annual_amount": "150.00",
|
||||
"admission_fee": "10.00",
|
||||
"annual_due": "01-31",
|
||||
"semiannual_due": ["01-31", "07-31"],
|
||||
"first_payment_due_days_after_acceptance": 28,
|
||||
"reminder_fee": "5.00",
|
||||
"failed_debit_fee": "5.00",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
class MemberRepository:
|
||||
def __init__(self, root: Path | str):
|
||||
self.root = Path(root).expanduser().resolve()
|
||||
self.members_root = self.root / "members"
|
||||
|
||||
def initialize(self) -> None:
|
||||
self.members_root.mkdir(parents=True, exist_ok=True)
|
||||
config_path = self.root / "repository.json"
|
||||
if not config_path.exists():
|
||||
write_json_atomic(config_path, DEFAULT_CONFIGURATION)
|
||||
|
||||
def validate(self) -> list[str]:
|
||||
errors: list[str] = []
|
||||
try:
|
||||
config = read_json(self.root / "repository.json")
|
||||
if int(config.get("schema_version", 0)) != 1:
|
||||
errors.append("repository.json: nicht unterstützte schema_version")
|
||||
policy = config.get("member_number_policy") or {}
|
||||
if str(policy.get("mode", "automatic")) not in {"automatic", "manual"}:
|
||||
errors.append("repository.json: ungültiger Mitgliedsnummernmodus")
|
||||
validate_member_number_pattern(str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN)))
|
||||
except (OSError, ValueError, TypeError, json.JSONDecodeError, RepositoryError) as exc:
|
||||
errors.append(f"repository.json: {exc}")
|
||||
|
||||
seen_numbers: dict[str, str] = {}
|
||||
for member_dir in self._member_directories():
|
||||
try:
|
||||
member = self.get_member(member_dir.name)
|
||||
validate_member_dates(
|
||||
birth_date=member.birth_date,
|
||||
accepted_at=member.accepted_at,
|
||||
membership_started_at=member.membership_started_at,
|
||||
)
|
||||
if member.member_id != member_dir.name:
|
||||
errors.append(f"{member_dir.name}/member.json: member_id stimmt nicht mit Ordner überein")
|
||||
normalized_number = member.member_number.casefold().strip()
|
||||
if normalized_number and normalized_number in seen_numbers:
|
||||
errors.append(
|
||||
f"{member_dir.name}/member.json: Mitgliedsnummer {member.member_number} ist doppelt"
|
||||
)
|
||||
elif normalized_number:
|
||||
seen_numbers[normalized_number] = member.member_id
|
||||
except (
|
||||
OSError,
|
||||
ValueError,
|
||||
TypeError,
|
||||
KeyError,
|
||||
json.JSONDecodeError,
|
||||
DateValidationError,
|
||||
) as exc:
|
||||
errors.append(f"{member_dir.name}/member.json: {exc}")
|
||||
return errors
|
||||
|
||||
def list_members(self) -> list[Member]:
|
||||
members: list[Member] = []
|
||||
for directory in self._member_directories():
|
||||
try:
|
||||
members.append(self.get_member(directory.name))
|
||||
except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError):
|
||||
continue
|
||||
return sorted(members, key=lambda item: (item.last_name.casefold(), item.first_name.casefold()))
|
||||
|
||||
def get_member(self, member_id: str) -> Member:
|
||||
path = self._member_path(member_id) / "member.json"
|
||||
if not path.is_file():
|
||||
raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
|
||||
return Member.from_dict(read_json(path))
|
||||
|
||||
def create_member(
|
||||
self,
|
||||
*,
|
||||
first_name: str,
|
||||
last_name: str,
|
||||
email: str = "",
|
||||
birth_date: str = "",
|
||||
member_number: str = "",
|
||||
) -> Member:
|
||||
if not first_name.strip() or not last_name.strip():
|
||||
raise RepositoryError("Vorname und Nachname sind erforderlich.")
|
||||
try:
|
||||
birth_date = normalize_date_input(birth_date, "Geburtsdatum")
|
||||
validate_member_dates(birth_date=birth_date)
|
||||
except DateValidationError as exc:
|
||||
raise RepositoryError(str(exc)) from exc
|
||||
selected_number = member_number.strip()
|
||||
policy = self.get_member_number_policy()
|
||||
if selected_number:
|
||||
self._assert_member_number_available(selected_number)
|
||||
elif policy["mode"] == "manual":
|
||||
raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
|
||||
else:
|
||||
selected_number = self._allocate_member_number(policy["pattern"])
|
||||
member_id = str(uuid4())
|
||||
directory = self._member_path(member_id)
|
||||
directory.mkdir(parents=True, exist_ok=False)
|
||||
(directory / "files").mkdir()
|
||||
member = Member(
|
||||
member_id=member_id,
|
||||
member_number=selected_number,
|
||||
first_name=first_name.strip(),
|
||||
last_name=last_name.strip(),
|
||||
email=email.strip(),
|
||||
birth_date=birth_date,
|
||||
)
|
||||
write_json_atomic(directory / "member.json", member.to_dict())
|
||||
write_json_atomic(directory / "contributions.json", ContributionData().to_dict())
|
||||
self.append_event(
|
||||
member_id,
|
||||
event_type="member_created",
|
||||
summary="Mitgliederakte angelegt",
|
||||
actor_type="user",
|
||||
actor_name="Vorstand",
|
||||
)
|
||||
return member
|
||||
|
||||
def save_member(self, member: Member, *, actor_name: str = "Vorstand") -> None:
|
||||
existing = self.get_member(member.member_id)
|
||||
try:
|
||||
member.birth_date = normalize_date_input(member.birth_date, "Geburtsdatum")
|
||||
member.accepted_at = normalize_date_input(member.accepted_at, "Aufnahmebeschluss")
|
||||
member.membership_started_at = normalize_date_input(member.membership_started_at, "Mitglied seit")
|
||||
validate_member_dates(
|
||||
birth_date=member.birth_date,
|
||||
accepted_at=member.accepted_at,
|
||||
membership_started_at=member.membership_started_at,
|
||||
)
|
||||
except DateValidationError as exc:
|
||||
raise RepositoryError(str(exc)) from exc
|
||||
if member.member_number != existing.member_number:
|
||||
self._assert_member_number_available(member.member_number, exclude_member_id=member.member_id)
|
||||
changes = self._summarize_changes(existing, member)
|
||||
member.updated_at = datetime.now().astimezone().isoformat(timespec="seconds")
|
||||
write_json_atomic(self._member_path(member.member_id) / "member.json", member.to_dict())
|
||||
if changes:
|
||||
self.append_event(
|
||||
member.member_id,
|
||||
event_type="member_data_changed",
|
||||
summary=f"Mitgliedsdaten geändert: {', '.join(changes)}",
|
||||
actor_type="user",
|
||||
actor_name=actor_name,
|
||||
)
|
||||
|
||||
def get_contributions(self, member_id: str) -> ContributionData:
|
||||
path = self._member_path(member_id) / "contributions.json"
|
||||
if not path.exists():
|
||||
return ContributionData()
|
||||
return ContributionData.from_dict(read_json(path))
|
||||
|
||||
def save_contributions(self, member_id: str, data: ContributionData) -> None:
|
||||
self.get_member(member_id)
|
||||
write_json_atomic(self._member_path(member_id) / "contributions.json", data.to_dict())
|
||||
|
||||
def append_event(
|
||||
self,
|
||||
member_id: str,
|
||||
*,
|
||||
event_type: str,
|
||||
summary: str,
|
||||
actor_type: str = "system",
|
||||
actor_name: str = "CCMA",
|
||||
references: dict[str, str] | None = None,
|
||||
data: dict[str, object] | None = None,
|
||||
) -> Event:
|
||||
directory = self._member_path(member_id)
|
||||
if not (directory / "member.json").is_file():
|
||||
raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
|
||||
event = Event(
|
||||
event_id=str(uuid4()),
|
||||
timestamp=datetime.now().astimezone().isoformat(timespec="seconds"),
|
||||
event_type=event_type,
|
||||
summary=summary.strip(),
|
||||
actor_type=actor_type,
|
||||
actor_name=actor_name,
|
||||
references=references or {},
|
||||
data=data or {},
|
||||
)
|
||||
path = directory / "events.jsonl"
|
||||
line = json.dumps(event.to_dict(), ensure_ascii=False, separators=(",", ":")) + "\n"
|
||||
with path.open("a", encoding="utf-8", newline="\n") as handle:
|
||||
handle.write(line)
|
||||
handle.flush()
|
||||
os.fsync(handle.fileno())
|
||||
return event
|
||||
|
||||
def get_events(self, member_id: str) -> list[Event]:
|
||||
path = self._member_path(member_id) / "events.jsonl"
|
||||
if not path.exists():
|
||||
return []
|
||||
events: list[Event] = []
|
||||
with path.open("r", encoding="utf-8") as handle:
|
||||
for line_number, line in enumerate(handle, start=1):
|
||||
if not line.strip():
|
||||
continue
|
||||
try:
|
||||
events.append(Event.from_dict(json.loads(line)))
|
||||
except (ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
|
||||
raise RepositoryError(f"Ungültiges Event in Zeile {line_number}: {exc}") from exc
|
||||
return events
|
||||
|
||||
def search(self, query: str) -> list[Member]:
|
||||
tokens = [_normalize(token) for token in query.split() if token.strip()]
|
||||
if not tokens:
|
||||
return self.list_members()
|
||||
scored: list[tuple[int, Member]] = []
|
||||
for member in self.list_members():
|
||||
fields = [
|
||||
member.member_number,
|
||||
member.first_name,
|
||||
member.last_name,
|
||||
member.display_name,
|
||||
member.email,
|
||||
member.birth_date,
|
||||
_german_date(member.birth_date),
|
||||
]
|
||||
normalized = [_normalize(value) for value in fields if value]
|
||||
if not all(any(token in value for value in normalized) for token in tokens):
|
||||
continue
|
||||
exact = sum(token == value for token in tokens for value in normalized)
|
||||
prefix = sum(value.startswith(token) for token in tokens for value in normalized)
|
||||
scored.append((exact * 100 + prefix * 10, member))
|
||||
scored.sort(key=lambda item: (-item[0], item[1].last_name.casefold(), item[1].first_name.casefold()))
|
||||
return [member for _, member in scored]
|
||||
|
||||
def member_count(self) -> int:
|
||||
return sum(1 for _ in self._member_directories())
|
||||
|
||||
def get_member_number_policy(self) -> dict[str, str]:
|
||||
try:
|
||||
config = read_json(self.root / "repository.json")
|
||||
except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
|
||||
raise RepositoryError(f"Mitgliedsnummernregel konnte nicht gelesen werden: {exc}") from exc
|
||||
policy = config.get("member_number_policy") or {}
|
||||
mode = str(policy.get("mode", "automatic"))
|
||||
if mode not in {"automatic", "manual"}:
|
||||
mode = "automatic"
|
||||
pattern = str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN))
|
||||
validate_member_number_pattern(pattern)
|
||||
return {"mode": mode, "pattern": pattern}
|
||||
|
||||
def save_member_number_policy(self, *, mode: str, pattern: str) -> None:
|
||||
if mode not in {"automatic", "manual"}:
|
||||
raise RepositoryError("Ungültiger Mitgliedsnummernmodus.")
|
||||
validate_member_number_pattern(pattern)
|
||||
config = read_json(self.root / "repository.json")
|
||||
config["member_number_policy"] = {"mode": mode, "pattern": pattern.strip()}
|
||||
config.setdefault("member_number_sequences", {})
|
||||
write_json_atomic(self.root / "repository.json", config)
|
||||
|
||||
def preview_member_number(self, pattern: str | None = None) -> str:
|
||||
selected_pattern = pattern or self.get_member_number_policy()["pattern"]
|
||||
validate_member_number_pattern(selected_pattern)
|
||||
config = read_json(self.root / "repository.json")
|
||||
return self._next_available_member_number(config, selected_pattern)[0]
|
||||
|
||||
def _member_directories(self) -> Iterable[Path]:
|
||||
if not self.members_root.exists():
|
||||
return []
|
||||
return (
|
||||
path for path in self.members_root.iterdir() if path.is_dir() and not path.name.startswith(".")
|
||||
)
|
||||
|
||||
def _member_path(self, member_id: str) -> Path:
|
||||
if not member_id or Path(member_id).name != member_id or member_id in {".", ".."}:
|
||||
raise RepositoryError("Ungültige Mitglieds-ID.")
|
||||
return self.members_root / member_id
|
||||
|
||||
def _allocate_member_number(self, pattern: str) -> str:
|
||||
config = read_json(self.root / "repository.json")
|
||||
member_number, next_value = self._next_available_member_number(config, pattern)
|
||||
sequences = config.get("member_number_sequences")
|
||||
if not isinstance(sequences, dict):
|
||||
sequences = {}
|
||||
config["member_number_sequences"] = sequences
|
||||
sequences[pattern] = next_value
|
||||
write_json_atomic(self.root / "repository.json", config)
|
||||
return member_number
|
||||
|
||||
def _next_available_member_number(self, config: dict, pattern: str) -> tuple[str, int]:
|
||||
sequences = config.get("member_number_sequences")
|
||||
if not isinstance(sequences, dict):
|
||||
sequences = {}
|
||||
try:
|
||||
number = max(1, int(sequences.get(pattern, 1)))
|
||||
except (TypeError, ValueError):
|
||||
number = 1
|
||||
existing = {member.member_number.casefold() for member in self.list_members() if member.member_number}
|
||||
for _attempt in range(1_000_000):
|
||||
candidate = format_member_number(pattern, number)
|
||||
number += 1
|
||||
if candidate.casefold() not in existing:
|
||||
return candidate, number
|
||||
raise RepositoryError("Keine freie Mitgliedsnummer im konfigurierten Nummernbereich gefunden.")
|
||||
|
||||
def _assert_member_number_available(
|
||||
self,
|
||||
member_number: str,
|
||||
*,
|
||||
exclude_member_id: str | None = None,
|
||||
) -> None:
|
||||
normalized = member_number.casefold().strip()
|
||||
if not normalized:
|
||||
raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
|
||||
for member in self.list_members():
|
||||
if member.member_id != exclude_member_id and member.member_number.casefold() == normalized:
|
||||
raise RepositoryError(f"Die Mitgliedsnummer {member_number} ist bereits vergeben.")
|
||||
|
||||
@staticmethod
|
||||
def _summarize_changes(before: Member, after: Member) -> list[str]:
|
||||
labels = {
|
||||
"member_number": "Mitgliedsnummer",
|
||||
"first_name": "Vorname",
|
||||
"last_name": "Nachname",
|
||||
"email": "E-Mail-Adresse",
|
||||
"birth_date": "Geburtsdatum",
|
||||
"status": "Status",
|
||||
"payment_frequency": "Zahlungsweise",
|
||||
"contribution_rule_id": "Beitragsregel",
|
||||
"honorary": "Ehrenmitgliedschaft",
|
||||
"notes": "interne Notiz",
|
||||
}
|
||||
changes: list[str] = []
|
||||
for field, label in labels.items():
|
||||
old_value = getattr(before, field)
|
||||
new_value = getattr(after, field)
|
||||
if old_value == new_value:
|
||||
continue
|
||||
if field == "status":
|
||||
old_label = MEMBERSHIP_STATUS_LABELS.get(str(old_value), str(old_value))
|
||||
new_label = MEMBERSHIP_STATUS_LABELS.get(str(new_value), str(new_value))
|
||||
changes.append(f"Status von {old_label} zu {new_label}")
|
||||
else:
|
||||
changes.append(label)
|
||||
return changes
|
||||
|
||||
|
||||
def _normalize(value: str) -> str:
|
||||
normalized = unicodedata.normalize("NFKD", value.casefold().strip())
|
||||
return "".join(character for character in normalized if not unicodedata.combining(character))
|
||||
|
||||
|
||||
def _german_date(value: str) -> str:
|
||||
try:
|
||||
return date.fromisoformat(value).strftime("%d.%m.%Y")
|
||||
except ValueError:
|
||||
return ""
|
||||
|
||||
|
||||
def validate_member_number_pattern(pattern: str) -> None:
|
||||
selected = pattern.strip()
|
||||
if not selected:
|
||||
raise RepositoryError("Das Mitgliedsnummern-Pattern darf nicht leer sein.")
|
||||
has_number = False
|
||||
try:
|
||||
for _literal, field_name, format_spec, conversion in Formatter().parse(selected):
|
||||
if field_name is None:
|
||||
continue
|
||||
if field_name not in {"number", "year"}:
|
||||
raise RepositoryError(f"Unbekannter Platzhalter: {{{field_name}}}")
|
||||
if conversion:
|
||||
raise RepositoryError("Konvertierungen wie !r sind im Pattern nicht erlaubt.")
|
||||
if "{" in format_spec or "}" in format_spec:
|
||||
raise RepositoryError("Verschachtelte Formatierungen sind nicht erlaubt.")
|
||||
has_number = has_number or field_name == "number"
|
||||
if not has_number:
|
||||
raise RepositoryError("Das Pattern muss den Platzhalter {number} enthalten.")
|
||||
first = format_member_number(selected, 1)
|
||||
second = format_member_number(selected, 2)
|
||||
if first == second:
|
||||
raise RepositoryError("Das Pattern erzeugt keine eindeutigen Mitgliedsnummern.")
|
||||
except RepositoryError:
|
||||
raise
|
||||
except (KeyError, ValueError) as exc:
|
||||
raise RepositoryError(f"Ungültiges Mitgliedsnummern-Pattern: {exc}") from exc
|
||||
|
||||
|
||||
def format_member_number(pattern: str, number: int, *, year: int | None = None) -> str:
|
||||
try:
|
||||
value = pattern.strip().format(number=number, year=year or date.today().year)
|
||||
except (KeyError, ValueError) as exc:
|
||||
raise RepositoryError(f"Mitgliedsnummer konnte nicht formatiert werden: {exc}") from exc
|
||||
if not value or len(value) > 80 or any(character in value for character in "\r\n\t"):
|
||||
raise RepositoryError("Das Pattern erzeugt eine ungültige Mitgliedsnummer.")
|
||||
return value
|
||||
Reference in New Issue
Block a user