Files
CCMA/src/ccma/storage/repository.py
T
2026-06-21 22:33:24 +02:00

1002 lines
42 KiB
Python

from __future__ import annotations
import json
import os
import shutil
import unicodedata
from collections.abc import Iterable
from datetime import date, datetime, timedelta
from pathlib import Path
from string import Formatter
from uuid import uuid4
from ccma.domain.contributions import (
claim_balance,
claim_total,
decimal_value,
materialize_claim_items,
money_text,
payment_allocated_total,
)
from ccma.domain.dates import DateValidationError, normalize_date_input, validate_member_dates
from ccma.domain.models import MEMBERSHIP_STATUS_LABELS, ContributionData, Event, Member
from ccma.storage.atomic import read_json, write_json_atomic
class RepositoryError(RuntimeError):
    """Error raised by the repository layer in this module."""
# Format pattern used for automatically assigned member numbers.
DEFAULT_MEMBER_NUMBER_PATTERN = "CCMA-{number:04d}"

# Content written to ``repository.json`` when a repository is initialized
# and no configuration file exists yet.
DEFAULT_CONFIGURATION = {
    "schema_version": 1,
    # Master data of the organization; mostly empty until filled in.
    "organization": {
        "name": "Chaos Computer Club Mannheim e.V.",
        "street": "",
        "postal_code": "",
        "city": "Mannheim",
        "country": "Deutschland",
        "email": "",
        "phone": "",
        "website": "",
        "iban": "",
        "bic": "",
        "creditor_id": "",
    },
    "member_number_policy": {
        "mode": "automatic",
        "pattern": DEFAULT_MEMBER_NUMBER_PATTERN,
    },
    # Next counter value per pattern, maintained when numbers are allocated.
    "member_number_sequences": {},
    "reminder_policy": {
        "grace_days_after_due": 7,
        "levels": [
            {
                "level": 1,
                "name": "Zahlungserinnerung",
                "fee": "0.00",
                "payment_deadline_days": 14,
            },
            {
                "level": 2,
                "name": "Erste Mahnung",
                "fee": "5.00",
                "payment_deadline_days": 14,
            },
            {
                "level": 3,
                "name": "Zweite Mahnung",
                "fee": "5.00",
                "payment_deadline_days": 14,
            },
        ],
    },
    "contribution_rules": [
        {
            "rule_id": "standard-2022",
            "name": "Regulärer Beitrag ab 2022",
            "valid_from": "2022-01-01",
            "valid_until": None,
            "annual_amount": "150.00",
            "admission_fee": "10.00",
            "annual_due": "01-31",
            "semiannual_due": ["01-31", "07-31"],
            "entry_proration": {"mode": "monthly", "started_month": "included"},
            "first_payment_due_days_after_acceptance": 28,
            "issue_days_before_due": 30,
            "reminder_fee": "5.00",
            "failed_debit_fee": "5.00",
        }
    ],
}
class MemberRepository:
    """File-based store for member records below a single root directory."""

    def __init__(self, root: Path | str):
        """Resolve *root* to an absolute path and derive the members directory.

        Args:
            root: Repository root; ``~`` is expanded before resolving.
        """
        base = Path(root).expanduser().resolve()
        self.root = base
        self.members_root = base / "members"
def initialize(self) -> None:
    """Create the on-disk layout of the repository where it is missing.

    Ensures the ``members``, ``rules`` and ``templates`` directories exist,
    copies bundled template files that are not present yet, and writes
    ``repository.json`` with ``DEFAULT_CONFIGURATION`` if it does not exist.
    Existing files are never overwritten.
    """
    template_target = self.root / "templates"
    for directory in (self.members_root, self.root / "rules", template_target):
        directory.mkdir(parents=True, exist_ok=True)

    bundled = Path(__file__).resolve().parent.parent / "assets" / "templates"
    if bundled.is_dir():
        for entry in bundled.iterdir():
            # The bundled "Forderung.fodt" is installed under a different name.
            target_name = entry.name
            if target_name == "Forderung.fodt":
                target_name = "Forderung mit Positionen.fodt"
            target = template_target / target_name
            if not entry.is_file() or target.exists():
                continue
            shutil.copyfile(entry, target)

    settings_file = self.root / "repository.json"
    if not settings_file.exists():
        write_json_atomic(settings_file, DEFAULT_CONFIGURATION)
def validate(self) -> list[str]:
    """Check the repository configuration and every member record.

    Problems are collected instead of raised, so one broken record does not
    stop the scan.

    Returns:
        A list of problem descriptions; empty when nothing was found.
    """
    errors: list[str] = []
    try:
        config = read_json(self.root / "repository.json")
        if not isinstance(config, dict):
            # Without this guard ``config.get`` would raise an AttributeError
            # that the handler below does not catch.
            raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
        if int(config.get("schema_version", 0)) != 1:
            errors.append("repository.json: nicht unterstützte schema_version")
        policy = config.get("member_number_policy") or {}
        if not isinstance(policy, dict):
            raise TypeError("member_number_policy muss ein JSON-Objekt sein")
        if str(policy.get("mode", "automatic")) not in {"automatic", "manual"}:
            errors.append("repository.json: ungültiger Mitgliedsnummernmodus")
        validate_member_number_pattern(str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN)))
    except (OSError, ValueError, TypeError, json.JSONDecodeError, RepositoryError) as exc:
        errors.append(f"repository.json: {exc}")

    # Maps the normalized member number to the first member_id that used it.
    seen_numbers: dict[str, str] = {}
    for member_dir in self._member_directories():
        try:
            # preflight_member_record already raises RepositoryError when the
            # stored member_id differs from the directory name, so that check
            # is not repeated here.
            member, _contributions = self.preflight_member_record(member_dir.name)
            validate_member_dates(
                birth_date=member.birth_date,
                accepted_at=member.accepted_at,
                membership_started_at=member.membership_started_at,
            )
            normalized_number = member.member_number.casefold().strip()
            if normalized_number and normalized_number in seen_numbers:
                errors.append(
                    f"{member_dir.name}/member.json: Mitgliedsnummer {member.member_number} ist doppelt"
                )
            elif normalized_number:
                seen_numbers[normalized_number] = member.member_id
        except RepositoryError as exc:
            errors.append(str(exc))
        except (
            OSError,
            ValueError,
            TypeError,
            KeyError,
            json.JSONDecodeError,
            DateValidationError,
        ) as exc:
            errors.append(f"{member_dir.name}/member.json: {exc}")
    return errors
def list_members(self) -> list[Member]:
    """Return all readable members ordered by last name, then first name.

    Ordering is case-insensitive. Records for which ``get_member`` raises
    ``RepositoryError`` are skipped.
    """
    loaded: list[Member] = []
    for identifier in self.list_member_ids():
        try:
            record = self.get_member(identifier)
        except RepositoryError:
            continue
        loaded.append(record)
    loaded.sort(key=lambda entry: (entry.last_name.casefold(), entry.first_name.casefold()))
    return loaded
def list_member_ids(self) -> list[str]:
    """Return the names of all member directories in ascending order."""
    names = [entry.name for entry in self._member_directories()]
    names.sort()
    return names
def get_member(self, member_id: str) -> Member:
    """Load ``member.json`` for *member_id* and build a ``Member`` from it.

    Raises:
        RepositoryError: If the file does not exist, or if reading, the
            structural checks, or ``Member.from_dict`` fail.
    """
    record_file = self._member_path(member_id) / "member.json"
    if not record_file.is_file():
        raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
    # Sections that, when present, must be JSON objects.
    object_sections = ("person", "address", "banking", "membership", "contribution_profile")
    try:
        payload = read_json(record_file)
        if not isinstance(payload, dict):
            raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
        for section in object_sections:
            if section not in payload:
                continue
            if not isinstance(payload[section], dict):
                raise TypeError(f"{section} muss ein JSON-Objekt sein")
        return Member.from_dict(payload)
    except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
        raise RepositoryError(f"{member_id}/member.json konnte nicht gelesen werden: {exc}") from exc
def preflight_member_record(self, member_id: str) -> tuple[Member, ContributionData]:
    """Load a member record and check that all of its files are readable.

    Returns:
        The member and its contribution data.

    Raises:
        RepositoryError: If the stored ``member_id`` differs from
            *member_id*, the schema version is not 1, or one of the files
            cannot be read.
    """
    member = self.get_member(member_id)
    folder_matches = member.member_id == member_id
    if not folder_matches:
        raise RepositoryError(f"{member_id}/member.json: member_id stimmt nicht mit Ordner überein")
    if not member.schema_version == 1:
        raise RepositoryError(
            f"{member_id}/member.json: nicht unterstützte schema_version {member.schema_version}"
        )
    contributions = self.get_contributions(member_id)
    try:
        # Only parsed to prove the event log is readable; the result is unused.
        self.get_events(member_id)
    except (OSError, UnicodeError) as exc:
        raise RepositoryError(f"{member_id}/events.jsonl konnte nicht gelesen werden: {exc}") from exc
    return member, contributions
def create_member(
    self,
    *,
    first_name: str,
    last_name: str,
    email: str = "",
    phone: str = "",
    birth_date: str = "",
    member_number: str = "",
) -> Member:
    """Create a new member record on disk and return it.

    A given *member_number* must be unused. Without one, a number is
    allocated from the configured pattern unless the policy mode is
    ``manual``, in which case a number is required.

    Raises:
        RepositoryError: If a name is blank, the birth date is invalid, or
            no acceptable member number is available.
    """
    given_name = first_name.strip()
    family_name = last_name.strip()
    if not (given_name and family_name):
        raise RepositoryError("Vorname und Nachname sind erforderlich.")
    try:
        birth_date = normalize_date_input(birth_date, "Geburtsdatum")
        validate_member_dates(birth_date=birth_date)
    except DateValidationError as exc:
        raise RepositoryError(str(exc)) from exc

    number = member_number.strip()
    policy = self.get_member_number_policy()
    if number:
        self._assert_member_number_available(number)
    else:
        if policy["mode"] == "manual":
            raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
        number = self._allocate_member_number(policy["pattern"])

    member_id = str(uuid4())
    record_dir = self._member_path(member_id)
    record_dir.mkdir(parents=True, exist_ok=False)
    (record_dir / "files").mkdir()
    member = Member(
        member_id=member_id,
        member_number=number,
        first_name=given_name,
        last_name=family_name,
        email=email.strip(),
        phone=phone.strip(),
        birth_date=birth_date,
    )
    write_json_atomic(record_dir / "member.json", member.to_dict())
    write_json_atomic(record_dir / "contributions.json", ContributionData().to_dict())
    self.append_event(
        member_id,
        event_type="member_created",
        summary="Mitgliederakte angelegt",
        actor_type="user",
        actor_name="Vorstand",
    )
    return member
def save_member(self, member: Member, *, actor_name: str = "Vorstand") -> None:
    """Normalize, validate and persist *member*, logging what changed.

    The date fields, IBAN and BIC of *member* are normalized in place
    before validation. An event is appended only when at least one tracked
    field differs from the stored record.

    Raises:
        RepositoryError: If the member does not exist yet, a date, IBAN or
            BIC is invalid, an active mandate is incomplete, or the new
            member number is already taken.
    """
    existing = self.get_member(member.member_id)
    # (attribute, label passed to the date normalizer) in processing order.
    date_fields = (
        ("birth_date", "Geburtsdatum"),
        ("accepted_at", "Aufnahmebeschluss"),
        ("membership_started_at", "Mitglied seit"),
        ("mandate_signed_at", "Mandat erteilt am"),
        ("mandate_revoked_at", "Mandat widerrufen am"),
    )
    try:
        for attribute, label in date_fields:
            setattr(member, attribute, normalize_date_input(getattr(member, attribute), label))
        validate_member_dates(
            birth_date=member.birth_date,
            accepted_at=member.accepted_at,
            membership_started_at=member.membership_started_at,
        )
    except DateValidationError as exc:
        raise RepositoryError(str(exc)) from exc

    member.iban = normalize_iban(member.iban)
    member.bic = member.bic.replace(" ", "").upper()
    validate_iban(member.iban)
    validate_bic(member.bic)

    if member.mandate_active:
        mandate_complete = (
            member.iban and member.mandate_reference.strip() and member.mandate_signed_at
        )
        if not mandate_complete:
            raise RepositoryError(
                "Ein aktives Lastschriftmandat benötigt IBAN, Mandatsreferenz und Erteilungsdatum."
            )

    number_changed = member.member_number != existing.member_number
    if number_changed:
        self._assert_member_number_available(member.member_number, exclude_member_id=member.member_id)

    changes = self._summarize_changes(existing, member)
    member.updated_at = datetime.now().astimezone().isoformat(timespec="seconds")
    write_json_atomic(self._member_path(member.member_id) / "member.json", member.to_dict())
    if not changes:
        return
    self.append_event(
        member.member_id,
        event_type="member_data_changed",
        summary=f"Mitgliedsdaten geändert: {', '.join(changes)}",
        actor_type="user",
        actor_name=actor_name,
    )
def get_contributions(self, member_id: str) -> ContributionData:
    """Load ``contributions.json`` for *member_id*.

    Returns:
        The parsed contribution data, or an empty ``ContributionData`` when
        the file does not exist.

    Raises:
        RepositoryError: If the file cannot be read, has an unexpected
            structure, or ``ContributionData.from_dict`` rejects it.
    """
    path = self._member_path(member_id) / "contributions.json"
    if not path.exists():
        return ContributionData()
    try:
        raw = read_json(path)
        if not isinstance(raw, dict):
            raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
        for field_name in ("claims", "payments", "allocations", "reminders"):
            if field_name not in raw:
                continue
            entries = raw[field_name]
            if not isinstance(entries, list):
                raise TypeError(f"{field_name} muss eine JSON-Liste sein")
            if any(not isinstance(item, dict) for item in entries):
                raise TypeError(f"Alle Einträge in {field_name} müssen JSON-Objekte sein")
        return ContributionData.from_dict(raw)
    # KeyError is wrapped as well, matching get_member: callers of this method
    # only handle RepositoryError.
    except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
        raise RepositoryError(
            f"{member_id}/contributions.json konnte nicht gelesen werden: {exc}"
        ) from exc
def save_contributions(self, member_id: str, data: ContributionData) -> None:
    """Write *data* to the member's ``contributions.json``.

    Raises:
        RepositoryError: If the member cannot be loaded.
    """
    # Called only for its check: get_member raises for unknown or unreadable members.
    self.get_member(member_id)
    target = self._member_path(member_id) / "contributions.json"
    write_json_atomic(target, data.to_dict())
def get_claim(self, member_id: str, claim_id: str) -> tuple[ContributionData, dict]:
    """Return the member's contribution data together with one of its claims.

    The returned claim is the dict stored inside the returned data, so
    changes to it are saved when the data is saved.

    Raises:
        RepositoryError: If no claim has the given *claim_id*.
    """
    data = self.get_contributions(member_id)
    for candidate in data.claims:
        if str(candidate.get("claim_id", "")) == claim_id:
            return data, candidate
    raise RepositoryError(f"Forderung nicht gefunden: {claim_id}")
def add_claim_item(
    self,
    member_id: str,
    claim_id: str,
    *,
    description: str,
    quantity: str,
    unit_price: str,
    item_type: str = "correction",
) -> dict:
    """Append a line item to a claim and update the claim's total amount.

    Returns:
        The item that was added.

    Raises:
        RepositoryError: If the description is blank, a number cannot be
            parsed, the quantity is zero, the claim is unknown, or the claim
            is cancelled.
    """
    label = description.strip()
    if not label:
        raise RepositoryError("Eine Beschreibung ist erforderlich.")
    try:
        count = decimal_value(quantity, "Menge")
        price = decimal_value(unit_price, "Einzelpreis")
    except ValueError as exc:
        raise RepositoryError(str(exc)) from exc
    if count == 0:
        raise RepositoryError("Die Menge darf nicht null sein.")

    data, claim = self.get_claim(member_id, claim_id)
    claim_status = str(claim.get("status", ""))
    if claim_status == "cancelled":
        raise RepositoryError("Eine stornierte Forderung kann nicht verändert werden.")

    line_total = count * price
    item = {
        "item_id": str(uuid4()),
        "type": item_type.strip() or "correction",
        "description": label,
        "quantity": money_text(count),
        "unit_price": money_text(price),
        "amount": money_text(line_total),
        "created_at": datetime.now().astimezone().isoformat(timespec="seconds"),
    }
    items = materialize_claim_items(claim)
    items.append(item)
    # Recompute the stored total after the new item was added.
    claim["amount"] = money_text(claim_total(claim))
    self.save_contributions(member_id, data)
    self.append_event(
        member_id,
        event_type="claim_item_added",
        summary=f"Forderungsposition ergänzt: {item['description']} ({item['amount']} EUR)",
        references={"claim_id": claim_id, "item_id": str(item["item_id"])},
    )
    return item
def record_payment(
    self,
    member_id: str,
    claim_id: str,
    *,
    payment_date: str,
    amount: str,
    allocation_amount: str,
    gnucash_transaction_id: str = "",
    reference: str = "",
) -> dict:
    """Record an incoming payment and allocate part or all of it to a claim.

    Returns:
        The payment that was stored.

    Raises:
        RepositoryError: If the date or an amount is invalid, the allocation
            is not within ``0 < allocation <= amount``, the claim is unknown
            or cancelled, or the GnuCash transaction id is already in use.
    """
    try:
        normalized_date = normalize_date_input(payment_date, "Zahlungsdatum")
        selected_amount = decimal_value(amount)
        selected_allocation = decimal_value(allocation_amount, "Zuordnung")
    except (DateValidationError, ValueError) as exc:
        raise RepositoryError(str(exc)) from exc
    if not normalized_date:
        raise RepositoryError("Zahlungsdatum ist erforderlich.")
    if selected_amount <= 0:
        raise RepositoryError("Der Zahlungsbetrag muss größer als null sein.")
    if selected_allocation <= 0 or selected_allocation > selected_amount:
        raise RepositoryError(
            "Die Zuordnung muss größer als null und höchstens so hoch wie die Zahlung sein."
        )
    # Look up the claim before the repository-wide GnuCash scan: the lookup is
    # cheap and fails fast for unknown claims.
    data, claim = self.get_claim(member_id, claim_id)
    # cancel_claim refuses claims that have allocations, so a cancelled claim
    # must not receive new ones either.
    if str(claim.get("status", "")) == "cancelled":
        raise RepositoryError("Einer stornierten Forderung kann keine Zahlung zugeordnet werden.")
    gnucash_id = gnucash_transaction_id.strip()
    if gnucash_id:
        self._assert_gnucash_id_available(gnucash_id)
    payment = {
        "payment_id": str(uuid4()),
        "date": normalized_date,
        "amount": money_text(selected_amount),
        "method": "bank_transfer",
        "gnucash_transaction_id": gnucash_id,
        "reference": reference.strip(),
        "created_at": datetime.now().astimezone().isoformat(timespec="seconds"),
    }
    allocation = {
        "allocation_id": str(uuid4()),
        "payment_id": payment["payment_id"],
        "claim_id": claim_id,
        "amount": money_text(selected_allocation),
    }
    data.payments.append(payment)
    data.allocations.append(allocation)
    self.save_contributions(member_id, data)
    self.append_event(
        member_id,
        event_type="payment_recorded",
        summary=f"Zahlung eingegangen: {payment['amount']} EUR",
        references={"claim_id": claim_id, "payment_id": str(payment["payment_id"])},
        data={"allocation_amount": allocation["amount"]},
    )
    return payment
def allocate_payment(self, member_id: str, claim_id: str, *, payment_id: str, amount: str) -> dict:
    """Allocate an unallocated part of an existing payment to a claim.

    Returns:
        The allocation that was stored.

    Raises:
        RepositoryError: If the claim is unknown or cancelled, the payment
            is unknown, an amount cannot be parsed, or *amount* is not
            within ``0 < amount <= unallocated remainder``.
    """
    data, claim = self.get_claim(member_id, claim_id)
    # cancel_claim refuses claims that have allocations, so a cancelled claim
    # must not receive new ones either.
    if str(claim.get("status", "")) == "cancelled":
        raise RepositoryError("Einer stornierten Forderung kann keine Zahlung zugeordnet werden.")
    payment = next(
        (item for item in data.payments if str(item.get("payment_id", "")) == payment_id),
        None,
    )
    if payment is None:
        raise RepositoryError("Zahlung nicht gefunden.")
    try:
        selected_amount = decimal_value(amount, "Zuordnung")
        # Part of the payment that no allocation has consumed yet.
        available = decimal_value(payment.get("amount", "0")) - payment_allocated_total(data, payment_id)
    except ValueError as exc:
        raise RepositoryError(str(exc)) from exc
    if selected_amount <= 0 or selected_amount > available:
        raise RepositoryError(f"Es sind nur {money_text(available)} EUR dieser Zahlung verfügbar.")
    allocation = {
        "allocation_id": str(uuid4()),
        "payment_id": payment_id,
        "claim_id": claim_id,
        "amount": money_text(selected_amount),
    }
    data.allocations.append(allocation)
    self.save_contributions(member_id, data)
    self.append_event(
        member_id,
        event_type="payment_allocated",
        summary=f"Zahlung zugeordnet: {allocation['amount']} EUR",
        references={"claim_id": claim_id, "payment_id": payment_id},
    )
    return allocation
def create_reminder_draft(
    self,
    member_id: str,
    claim_id: str,
    *,
    level: int,
    name: str,
    payment_deadline_days: int,
    detail: str = "",
    fee: str = "0",
    channel: str = "email",
) -> dict:
    """Create a reminder in status ``draft`` for a claim with an open balance.

    Returns:
        The reminder that was stored.

    Raises:
        RepositoryError: If an argument is out of range, the claim is
            unknown, cancelled, on dunning hold or has no open balance, the
            level already exists as draft/generated/sent, or the previous
            level has not been sent.
    """
    if level < 1:
        raise RepositoryError("Die Mahnstufe muss mindestens 1 sein.")
    try:
        reminder_fee = decimal_value(fee, "Mahngebühr")
    except ValueError as exc:
        raise RepositoryError(str(exc)) from exc
    if reminder_fee < 0:
        raise RepositoryError("Die Mahngebühr darf nicht negativ sein.")
    if not 1 <= payment_deadline_days <= 365:
        raise RepositoryError("Die Zahlungsfrist muss zwischen 1 und 365 Tagen liegen.")

    data, claim = self.get_claim(member_id, claim_id)
    if str(claim.get("status", "")) == "cancelled":
        raise RepositoryError("Eine stornierte Forderung kann nicht gemahnt werden.")
    if _dunning_hold_is_active(claim):
        raise RepositoryError("Für diese Forderung ist eine Mahnsperre aktiv.")
    if claim_balance(data, claim) <= 0:
        raise RepositoryError("Die Forderung hat keinen offenen Betrag.")

    own_reminders = [
        entry for entry in data.reminders if str(entry.get("claim_id", "")) == claim_id
    ]
    # Statuses in which an existing reminder of the same level blocks a new one.
    blocking_statuses = {"draft", "generated", "sent"}
    for entry in own_reminders:
        same_level = int(entry.get("level", 0)) == level
        if same_level and str(entry.get("status", "draft")) in blocking_statuses:
            raise RepositoryError(f"Mahnstufe {level} existiert bereits.")
    if level > 1:
        previous_sent = any(
            int(entry.get("level", 0)) == level - 1 and str(entry.get("status", "")) == "sent"
            for entry in own_reminders
        )
        if not previous_sent:
            raise RepositoryError(f"Mahnstufe {level - 1} wurde noch nicht versandt.")

    created_at = datetime.now().astimezone().isoformat(timespec="seconds")
    reminder = {
        "reminder_id": str(uuid4()),
        "reminder_key": f"{claim_id}:level-{level}",
        "claim_id": claim_id,
        "level": level,
        "name": name.strip() or f"Mahnung Stufe {level}",
        "status": "draft",
        "detail": detail.strip(),
        "channel": channel.strip() or "email",
        "created_at": created_at,
        "generated_at": None,
        "sent_at": None,
        "payment_deadline_days": payment_deadline_days,
        "payment_deadline": None,
        "balance_snapshot": money_text(claim_balance(data, claim)),
        "fee": money_text(reminder_fee),
        "fee_item_id": None,
        "document": {"path": "", "sha256": ""},
    }
    data.reminders.append(reminder)
    self.save_contributions(member_id, data)
    self.append_event(
        member_id,
        event_type="reminder_draft_created",
        summary=f"Mahnungsentwurf erstellt: {reminder['name']}",
        references={"claim_id": claim_id, "reminder_id": str(reminder["reminder_id"])},
        data={"fee": reminder["fee"], "balance_snapshot": reminder["balance_snapshot"]},
    )
    return reminder
def mark_reminder_sent(self, member_id: str, claim_id: str, reminder_id: str) -> dict:
    """Mark a draft or generated reminder as sent.

    When the reminder carries a positive fee that has not been booked yet,
    a fee item is added to the claim and the claim total is updated. The
    payment deadline is set to today plus the reminder's deadline days.

    Returns:
        The updated reminder.

    Raises:
        RepositoryError: If the claim or reminder is unknown, the reminder
            is not in status draft/generated, the claim has no open balance,
            or the stored fee or deadline days cannot be interpreted.
    """
    data, claim = self.get_claim(member_id, claim_id)
    reminder = self._find_reminder(data, claim_id, reminder_id)
    if str(reminder.get("status", "")) not in {"draft", "generated"}:
        raise RepositoryError("Nur ein Entwurf kann als versandt markiert werden.")
    if claim_balance(data, claim) <= 0:
        raise RepositoryError("Die Forderung hat keinen offenen Betrag mehr.")
    # Parse the stored values up front so bad data surfaces as RepositoryError
    # before anything is modified.
    try:
        selected_fee = decimal_value(reminder.get("fee", "0"), "Mahngebühr")
        deadline_days = int(reminder.get("payment_deadline_days", 14))
    except (TypeError, ValueError) as exc:
        raise RepositoryError(f"Die Mahnung enthält ungültige Werte: {exc}") from exc
    now = datetime.now().astimezone()
    if selected_fee > 0 and not reminder.get("fee_item_id"):
        item = {
            "item_id": str(uuid4()),
            "type": "fee",
            "description": f"Mahngebühr Stufe {reminder.get('level', '')}",
            "quantity": "1.00",
            "unit_price": money_text(selected_fee),
            "amount": money_text(selected_fee),
            "created_at": now.isoformat(timespec="seconds"),
        }
        materialize_claim_items(claim).append(item)
        claim["amount"] = money_text(claim_total(claim))
        # Remember the fee item so the fee is not booked a second time.
        reminder["fee_item_id"] = item["item_id"]
    reminder["status"] = "sent"
    reminder["sent_at"] = now.isoformat(timespec="seconds")
    reminder["payment_deadline"] = (now.date() + timedelta(days=deadline_days)).isoformat()
    self.save_contributions(member_id, data)
    self.append_event(
        member_id,
        event_type="reminder_sent",
        summary=f"Mahnung als versandt markiert: {reminder.get('name', '')}",
        references={"claim_id": claim_id, "reminder_id": str(reminder["reminder_id"])},
        data={"fee": money_text(selected_fee), "payment_deadline": reminder["payment_deadline"]},
    )
    return reminder
def cancel_reminder(self, member_id: str, claim_id: str, reminder_id: str) -> None:
    """Set a reminder that has not been sent to status ``cancelled``.

    Raises:
        RepositoryError: If the claim or reminder is unknown, or the
            reminder has status ``sent``.
    """
    data, _claim = self.get_claim(member_id, claim_id)
    reminder = self._find_reminder(data, claim_id, reminder_id)
    already_sent = str(reminder.get("status", "")) == "sent"
    if already_sent:
        raise RepositoryError("Eine bereits versandte Mahnung kann nicht verworfen werden.")
    cancelled_at = datetime.now().astimezone().isoformat(timespec="seconds")
    reminder.update({"status": "cancelled", "cancelled_at": cancelled_at})
    self.save_contributions(member_id, data)
    self.append_event(
        member_id,
        event_type="reminder_cancelled",
        summary=f"Mahnungsentwurf verworfen: {reminder.get('name', '')}",
        references={"claim_id": claim_id, "reminder_id": reminder_id},
    )
def register_reminder_document(
    self,
    member_id: str,
    claim_id: str,
    reminder_id: str,
    *,
    relative_path: str,
    sha256: str,
    template: str,
) -> None:
    """Attach metadata of a generated document to a reminder.

    A reminder in status ``draft`` moves to ``generated``; other statuses
    are left as they are.

    Raises:
        RepositoryError: If *relative_path* is absolute or contains ``..``,
            or the claim or reminder is unknown.
    """
    document_path = Path(relative_path)
    escapes_folder = document_path.is_absolute() or ".." in document_path.parts
    if escapes_folder:
        raise RepositoryError("Der Dokumentpfad muss innerhalb des Mitglieder-Dateiordners liegen.")

    data, _claim = self.get_claim(member_id, claim_id)
    reminder = self._find_reminder(data, claim_id, reminder_id)
    generated_at = datetime.now().astimezone().isoformat(timespec="seconds")
    reminder["document"] = {
        "path": relative_path,
        "sha256": sha256,
        "template": template,
        "generated_at": generated_at,
    }
    if str(reminder.get("status", "draft")) == "draft":
        reminder["status"] = "generated"
        reminder["generated_at"] = generated_at
    self.save_contributions(member_id, data)

    event_references = {
        "claim_id": claim_id,
        "reminder_id": reminder_id,
        "document": relative_path,
    }
    self.append_event(
        member_id,
        event_type="reminder_document_generated",
        summary=f"Mahndokument erzeugt: {document_path.name}",
        actor_type="user",
        actor_name="Vorstand",
        references=event_references,
        data={"template": template, "sha256": sha256},
    )
def set_dunning_hold(
    self,
    member_id: str,
    claim_id: str,
    *,
    active: bool,
    reason: str = "",
    until: str = "",
) -> None:
    """Replace the ``dunning_hold`` entry of a claim and log the change.

    Raises:
        RepositoryError: If the claim is unknown or *until* is not a valid
            date input.
    """
    data, claim = self.get_claim(member_id, claim_id)
    try:
        hold_until = normalize_date_input(until, "Mahnsperre bis")
    except DateValidationError as exc:
        raise RepositoryError(str(exc)) from exc

    cleaned_reason = reason.strip()
    changed_at = datetime.now().astimezone().isoformat(timespec="seconds")
    claim["dunning_hold"] = {
        "active": active,
        "reason": cleaned_reason,
        # A falsy normalized date is stored as None.
        "until": hold_until or None,
        "updated_at": changed_at,
    }
    self.save_contributions(member_id, data)

    if active:
        summary = "Mahnsperre gesetzt"
    else:
        summary = "Mahnsperre aufgehoben"
    self.append_event(
        member_id,
        event_type="dunning_hold_changed",
        summary=summary,
        references={"claim_id": claim_id},
        data={"reason": cleaned_reason, "until": hold_until},
    )
@staticmethod
def _find_reminder(data: ContributionData, claim_id: str, reminder_id: str) -> dict:
    """Return the first reminder in *data* matching both ids.

    Raises:
        RepositoryError: If no reminder matches.
    """
    for entry in data.reminders:
        if str(entry.get("claim_id", "")) != claim_id:
            continue
        if str(entry.get("reminder_id", "")) == reminder_id:
            return entry
    raise RepositoryError("Mahnung nicht gefunden.")
def cancel_claim(self, member_id: str, claim_id: str) -> None:
    """Set a claim to status ``cancelled`` if its balance equals its total.

    Raises:
        RepositoryError: If the claim is unknown or its balance differs
            from its total.
    """
    data, claim = self.get_claim(member_id, claim_id)
    untouched = claim_balance(data, claim) == claim_total(claim)
    if not untouched:
        raise RepositoryError("Eine Forderung mit Zahlungszuordnungen kann nicht storniert werden.")
    cancelled_at = datetime.now().astimezone().isoformat(timespec="seconds")
    claim["status"] = "cancelled"
    claim["cancelled_at"] = cancelled_at
    self.save_contributions(member_id, data)
    title = claim.get("title", claim_id)
    self.append_event(
        member_id,
        event_type="claim_cancelled",
        summary=f"Forderung storniert: {title}",
        references={"claim_id": claim_id},
    )
def _assert_gnucash_id_available(self, transaction_id: str) -> None:
    """Raise if any member already has a payment with this GnuCash id.

    The comparison is case-insensitive. Members whose contribution data
    cannot be read are skipped.

    Raises:
        RepositoryError: If the id is already used by a stored payment.
    """
    selected = transaction_id.casefold()
    # Only contributions.json is needed, so iterate the directory ids instead
    # of loading and sorting every member.json. This also covers payments of
    # members whose member.json is unreadable.
    for member_id in self.list_member_ids():
        try:
            payments = self.get_contributions(member_id).payments
        except RepositoryError:
            continue
        if any(
            str(payment.get("gnucash_transaction_id", "")).casefold() == selected
            for payment in payments
        ):
            raise RepositoryError(f"GnuCash-ID bereits verwendet: {transaction_id}")
def append_event(
    self,
    member_id: str,
    *,
    event_type: str,
    summary: str,
    actor_type: str = "system",
    actor_name: str = "CCMA",
    references: dict[str, str] | None = None,
    data: dict[str, object] | None = None,
) -> Event:
    """Append one event as a JSON line to the member's ``events.jsonl``.

    The line is flushed and fsynced before the method returns.

    Returns:
        The event that was written.

    Raises:
        RepositoryError: If the member has no ``member.json``.
    """
    record_dir = self._member_path(member_id)
    if not (record_dir / "member.json").is_file():
        raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
    event = Event(
        event_id=str(uuid4()),
        timestamp=datetime.now().astimezone().isoformat(timespec="seconds"),
        event_type=event_type,
        summary=summary.strip(),
        actor_type=actor_type,
        actor_name=actor_name,
        references=references if references else {},
        data=data if data else {},
    )
    log_file = record_dir / "events.jsonl"
    # Compact separators keep each event on a single short line.
    serialized = json.dumps(event.to_dict(), ensure_ascii=False, separators=(",", ":"))
    with log_file.open("a", encoding="utf-8", newline="\n") as log:
        log.write(serialized + "\n")
        log.flush()
        os.fsync(log.fileno())
    return event
def get_events(self, member_id: str) -> list[Event]:
    """Read all events from the member's ``events.jsonl`` in file order.

    Blank lines are skipped. A missing file yields an empty list.

    Raises:
        RepositoryError: If a line is not valid JSON, is not a JSON object,
            or cannot be converted by ``Event.from_dict``. The message names
            the member's file and the line number.
    """
    path = self._member_path(member_id) / "events.jsonl"
    if not path.exists():
        return []
    events: list[Event] = []
    with path.open("r", encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                raw = json.loads(line)
                if not isinstance(raw, dict):
                    raise TypeError("Event muss ein JSON-Objekt sein")
                events.append(Event.from_dict(raw))
            except (AttributeError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
                # Name the file so that collected validation messages can be
                # traced back to a member.
                raise RepositoryError(
                    f"{member_id}/events.jsonl: Ungültiges Event in Zeile {line_number}: {exc}"
                ) from exc
    return events
def search(self, query: str) -> list[Member]:
    """Return the members matching every whitespace-separated token of *query*.

    A token matches when it is a substring of at least one normalized field.
    Results are ordered by score (exact field matches count 100, prefix
    matches 10), then by last and first name. An empty query returns all
    members.
    """
    needles = [_normalize(part) for part in query.split() if part.strip()]
    if not needles:
        return self.list_members()

    ranked: list[tuple[int, Member]] = []
    for member in self.list_members():
        raw_values = (
            member.member_number,
            member.first_name,
            member.last_name,
            member.display_name,
            member.email,
            member.phone,
            member.street,
            member.postal_code,
            member.city,
            member.birth_date,
            _german_date(member.birth_date),
        )
        haystack = [_normalize(text) for text in raw_values if text]
        for needle in needles:
            if not any(needle in text for text in haystack):
                break
        else:
            exact_hits = sum(1 for needle in needles for text in haystack if needle == text)
            prefix_hits = sum(1 for needle in needles for text in haystack if text.startswith(needle))
            ranked.append((exact_hits * 100 + prefix_hits * 10, member))

    def ordering(entry: tuple[int, Member]) -> tuple[int, str, str]:
        score, member = entry
        return (-score, member.last_name.casefold(), member.first_name.casefold())

    ranked.sort(key=ordering)
    return [member for _score, member in ranked]
def member_count(self) -> int:
    """Return the number of member directories."""
    total = 0
    for _directory in self._member_directories():
        total += 1
    return total
def get_configuration(self) -> dict:
    """Load ``repository.json`` and return it as a dict.

    Raises:
        RepositoryError: If the file cannot be read or parsed, or its root
            is not a JSON object.
    """
    source = self.root / "repository.json"
    try:
        loaded = read_json(source)
    except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
        raise RepositoryError(f"repository.json konnte nicht gelesen werden: {exc}") from exc
    if isinstance(loaded, dict):
        return loaded
    raise RepositoryError("repository.json enthält keine gültige Konfiguration.")
def get_member_number_policy(self) -> dict[str, str]:
    """Return the member number policy as ``{"mode": ..., "pattern": ...}``.

    A missing policy falls back to mode ``automatic`` and the default
    pattern; an unknown mode is replaced by ``automatic``.

    Raises:
        RepositoryError: If ``repository.json`` cannot be read, it or its
            policy entry is not a JSON object, or the pattern is invalid.
    """
    try:
        config = read_json(self.root / "repository.json")
    except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
        raise RepositoryError(f"Mitgliedsnummernregel konnte nicht gelesen werden: {exc}") from exc
    # Guard the shape explicitly; otherwise ``.get`` raises AttributeError.
    if not isinstance(config, dict):
        raise RepositoryError("repository.json enthält keine gültige Konfiguration.")
    policy = config.get("member_number_policy") or {}
    if not isinstance(policy, dict):
        raise RepositoryError(
            "Mitgliedsnummernregel konnte nicht gelesen werden: "
            "member_number_policy muss ein JSON-Objekt sein"
        )
    mode = str(policy.get("mode", "automatic"))
    if mode not in {"automatic", "manual"}:
        mode = "automatic"
    pattern = str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN))
    validate_member_number_pattern(pattern)
    return {"mode": mode, "pattern": pattern}
def save_member_number_policy(self, *, mode: str, pattern: str) -> None:
    """Store the member number policy in ``repository.json``.

    The pattern is stored with surrounding whitespace removed. An existing
    ``member_number_sequences`` entry is kept; a missing one is created.

    Raises:
        RepositoryError: If *mode* is unknown, *pattern* is invalid, or the
            configuration cannot be read.
    """
    if mode not in {"automatic", "manual"}:
        raise RepositoryError("Ungültiger Mitgliedsnummernmodus.")
    validate_member_number_pattern(pattern)
    # get_configuration wraps read errors in RepositoryError and guarantees a
    # dict, as save_organization relies on as well.
    config = self.get_configuration()
    config["member_number_policy"] = {"mode": mode, "pattern": pattern.strip()}
    config.setdefault("member_number_sequences", {})
    write_json_atomic(self.root / "repository.json", config)
def save_organization(self, values: dict[str, str]) -> None:
    """Validate and store the organization section of ``repository.json``.

    All values are converted to stripped strings; IBAN and BIC are
    normalized before validation.

    Raises:
        RepositoryError: If IBAN or BIC is invalid, the name is empty, or
            the configuration cannot be read.
    """
    cleaned: dict[str, str] = {}
    for key, value in values.items():
        cleaned[key] = str(value).strip()
    iban = normalize_iban(cleaned.get("iban", ""))
    cleaned["iban"] = iban
    bic = cleaned.get("bic", "").replace(" ", "").upper()
    cleaned["bic"] = bic
    validate_iban(iban)
    validate_bic(bic)
    if not cleaned.get("name"):
        raise RepositoryError("Der Vereinsname ist erforderlich.")
    config = self.get_configuration()
    config["organization"] = cleaned
    write_json_atomic(self.root / "repository.json", config)
def preview_member_number(self, pattern: str | None = None) -> str:
    """Return the member number that would be assigned next, without reserving it.

    Args:
        pattern: Pattern to preview; a falsy value selects the configured one.

    Raises:
        RepositoryError: If the pattern is invalid, the configuration cannot
            be read, or no free number is found.
    """
    selected_pattern = pattern or self.get_member_number_policy()["pattern"]
    validate_member_number_pattern(selected_pattern)
    # get_configuration wraps read errors in RepositoryError and guarantees a dict.
    config = self.get_configuration()
    member_number, _next_value = self._next_available_member_number(config, selected_pattern)
    return member_number
def _member_directories(self) -> Iterable[Path]:
    """Return the directories below ``members_root`` whose name does not start with a dot.

    Returns an empty list when ``members_root`` does not exist.
    """
    base = self.members_root
    if base.exists():
        return (
            entry
            for entry in base.iterdir()
            if entry.is_dir() and not entry.name.startswith(".")
        )
    return []
def _member_path(self, member_id: str) -> Path:
    """Return the directory for *member_id* below ``members_root``.

    Raises:
        RepositoryError: If *member_id* is empty, is ``.`` or ``..``, or is
            not a single path component.
    """
    is_single_component = bool(member_id) and Path(member_id).name == member_id
    if not is_single_component or member_id in {".", ".."}:
        raise RepositoryError("Ungültige Mitglieds-ID.")
    return self.members_root / member_id
def _allocate_member_number(self, pattern: str) -> str:
    """Reserve the next free member number for *pattern* and return it.

    The counter stored under *pattern* in ``member_number_sequences`` is
    advanced and ``repository.json`` is rewritten.

    Raises:
        RepositoryError: If the configuration cannot be read or no free
            number is found.
    """
    # get_configuration wraps read errors in RepositoryError and guarantees a dict.
    config = self.get_configuration()
    member_number, next_value = self._next_available_member_number(config, pattern)
    sequences = config.get("member_number_sequences")
    if not isinstance(sequences, dict):
        # Replace a missing or malformed entry so the counter can be stored.
        sequences = {}
        config["member_number_sequences"] = sequences
    sequences[pattern] = next_value
    write_json_atomic(self.root / "repository.json", config)
    return member_number
def _next_available_member_number(self, config: dict, pattern: str) -> tuple[str, int]:
    """Find the first unused member number for *pattern*.

    Starts at the counter stored for *pattern* in *config* (at least 1) and
    skips numbers already used by a member, compared case-insensitively.

    Returns:
        The free member number and the counter value to store next.

    Raises:
        RepositoryError: If 1,000,000 consecutive candidates are all taken.
    """
    stored = config.get("member_number_sequences")
    counters = stored if isinstance(stored, dict) else {}
    try:
        counter = max(1, int(counters.get(pattern, 1)))
    except (TypeError, ValueError):
        counter = 1

    taken: set[str] = set()
    for member in self.list_members():
        if member.member_number:
            taken.add(member.member_number.casefold())

    attempts = 0
    while attempts < 1_000_000:
        candidate = format_member_number(pattern, counter)
        counter += 1
        if candidate.casefold() not in taken:
            return candidate, counter
        attempts += 1
    raise RepositoryError("Keine freie Mitgliedsnummer im konfigurierten Nummernbereich gefunden.")
def _assert_member_number_available(
    self,
    member_number: str,
    *,
    exclude_member_id: str | None = None,
) -> None:
    """Raise if *member_number* is blank or already used by another member.

    Numbers are compared case-insensitively and without surrounding
    whitespace.

    Args:
        member_number: Number to check.
        exclude_member_id: Member whose own number is ignored.

    Raises:
        RepositoryError: If the number is blank or already taken.
    """
    normalized = member_number.casefold().strip()
    if not normalized:
        raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
    for member in self.list_members():
        if member.member_id == exclude_member_id:
            continue
        # Strip the stored value too, matching the duplicate check in validate().
        if member.member_number.casefold().strip() == normalized:
            raise RepositoryError(f"Die Mitgliedsnummer {member_number} ist bereits vergeben.")
@staticmethod
def _summarize_changes(before: Member, after: Member) -> list[str]:
    """Return display labels for the tracked fields that differ.

    A status change is reported as "Status von <old> zu <new>", using
    ``MEMBERSHIP_STATUS_LABELS`` where a label exists.
    """
    # (attribute, display label) in reporting order.
    tracked = (
        ("member_number", "Mitgliedsnummer"),
        ("first_name", "Vorname"),
        ("last_name", "Nachname"),
        ("email", "E-Mail-Adresse"),
        ("phone", "Telefonnummer"),
        ("birth_date", "Geburtsdatum"),
        ("status", "Status"),
        ("payment_frequency", "Zahlungsweise"),
        ("contribution_rule_id", "Beitragsregel"),
        ("honorary", "Ehrenmitgliedschaft"),
        ("notes", "interne Notiz"),
        ("street", "Anschrift"),
        ("address_addition", "Adresszusatz"),
        ("postal_code", "Postleitzahl"),
        ("city", "Ort"),
        ("country", "Land"),
        ("account_holder", "Kontoinhaber"),
        ("iban", "IBAN"),
        ("bic", "BIC"),
        ("mandate_reference", "SEPA-Mandatsreferenz"),
        ("mandate_signed_at", "Mandat erteilt am"),
        ("mandate_active", "Lastschriftmandat aktiv"),
        ("mandate_revoked_at", "Mandat widerrufen am"),
    )
    summary: list[str] = []
    for attribute, caption in tracked:
        previous = getattr(before, attribute)
        current = getattr(after, attribute)
        if previous == current:
            continue
        if attribute != "status":
            summary.append(caption)
            continue
        old_label = MEMBERSHIP_STATUS_LABELS.get(str(previous), str(previous))
        new_label = MEMBERSHIP_STATUS_LABELS.get(str(current), str(current))
        summary.append(f"Status von {old_label} zu {new_label}")
    return summary
def normalize_iban(value: str) -> str:
    """Return *value* with all whitespace removed and letters upper-cased."""
    compact = "".join(value.split())
    return compact.upper()
def validate_iban(value: str) -> None:
    """Check the format and mod-97 checksum of an IBAN; an empty value is accepted.

    Expects the compact form without spaces.

    Raises:
        RepositoryError: If the value is not 15-34 ASCII alphanumeric
            characters made of two letters, two check digits and the account
            part, or if the checksum does not match.
    """
    if not value:
        return
    # isascii() comes first: isalpha()/isalnum()/isdigit() also accept
    # non-ASCII characters such as "Ä" or "²", which would corrupt the
    # checksum or make int() raise ValueError.
    well_formed = (
        value.isascii()
        and 15 <= len(value) <= 34
        and value[:2].isalpha()
        and value[2:4].isdigit()
        and value[4:].isalnum()
    )
    if not well_formed:
        raise RepositoryError("Die IBAN hat kein gültiges Format.")
    # Move country code and check digits to the end, then map A-Z to 10-35.
    # int(character, 36) does that mapping for either letter case.
    rearranged = value[4:] + value[:4]
    numeric = "".join(str(int(character, 36)) for character in rearranged)
    if int(numeric) % 97 != 1:
        raise RepositoryError("Die IBAN-Prüfsumme ist ungültig.")
def validate_bic(value: str) -> None:
    """Accept an empty BIC or one of 8 or 11 alphanumeric characters.

    Raises:
        RepositoryError: If a non-empty value has another length or contains
            non-alphanumeric characters.
    """
    if not value:
        return
    if len(value) in {8, 11} and value.isalnum():
        return
    raise RepositoryError("Der BIC muss 8 oder 11 Zeichen enthalten.")
def _normalize(value: str) -> str:
    """Return *value* case-folded, stripped and without combining marks.

    The text is decomposed with NFKD so that accents become separate
    combining characters, which are then dropped.
    """
    decomposed = unicodedata.normalize("NFKD", value.casefold().strip())
    kept = [symbol for symbol in decomposed if not unicodedata.combining(symbol)]
    return "".join(kept)
def _german_date(value: str) -> str:
    """Convert an ISO date string to ``DD.MM.YYYY``; return ``""`` if it cannot be parsed."""
    try:
        parsed = date.fromisoformat(value)
        return parsed.strftime("%d.%m.%Y")
    except ValueError:
        return ""
def _dunning_hold_is_active(claim: dict) -> bool:
    """Tell whether the claim's ``dunning_hold`` entry currently blocks reminders.

    A hold is active when its ``active`` flag is truthy and its ``until``
    value is missing, unparseable, or a date not before today.
    """
    hold = claim.get("dunning_hold") or {}
    if not hold.get("active"):
        return False
    until = hold.get("until")
    if not until:
        return True
    try:
        end_date = date.fromisoformat(str(hold["until"]))
    except ValueError:
        # An unreadable end date keeps the hold in place.
        return True
    return end_date >= date.today()
def validate_member_number_pattern(pattern: str) -> None:
    """Check that *pattern* is a usable member number format string.

    Only the placeholders ``{number}`` and ``{year}`` are allowed,
    ``{number}`` is required, conversions and nested format specs are
    rejected, and the numbers 1 and 2 must produce different results.

    Raises:
        RepositoryError: If any of these conditions is violated or the
            pattern cannot be parsed or formatted.
    """
    candidate = pattern.strip()
    if not candidate:
        raise RepositoryError("Das Mitgliedsnummern-Pattern darf nicht leer sein.")
    allowed_fields = {"number", "year"}
    number_seen = False
    try:
        for _text, name, spec, conversion in Formatter().parse(candidate):
            if name is None:
                # Literal text without a placeholder.
                continue
            if name not in allowed_fields:
                raise RepositoryError(f"Unbekannter Platzhalter: {{{name}}}")
            if conversion:
                raise RepositoryError("Konvertierungen wie !r sind im Pattern nicht erlaubt.")
            if "{" in spec or "}" in spec:
                raise RepositoryError("Verschachtelte Formatierungen sind nicht erlaubt.")
            if name == "number":
                number_seen = True
        if not number_seen:
            raise RepositoryError("Das Pattern muss den Platzhalter {number} enthalten.")
        sample_one = format_member_number(candidate, 1)
        sample_two = format_member_number(candidate, 2)
        if sample_one == sample_two:
            raise RepositoryError("Das Pattern erzeugt keine eindeutigen Mitgliedsnummern.")
    except RepositoryError:
        raise
    except (KeyError, ValueError) as exc:
        raise RepositoryError(f"Ungültiges Mitgliedsnummern-Pattern: {exc}") from exc
def format_member_number(pattern: str, number: int, *, year: int | None = None) -> str:
    """Format a member number from *pattern*.

    Args:
        pattern: Format string using ``{number}`` and optionally ``{year}``;
            surrounding whitespace is ignored.
        number: Value for ``{number}``.
        year: Value for ``{year}``; the current year when ``None``.

    Returns:
        The formatted member number.

    Raises:
        RepositoryError: If the pattern cannot be formatted, or the result
            is empty, longer than 80 characters, or contains CR, LF or TAB.
    """
    # Compare against None so that an explicit year is never replaced.
    selected_year = date.today().year if year is None else year
    try:
        value = pattern.strip().format(number=number, year=selected_year)
    # str.format raises IndexError for positional fields ("{}"), AttributeError
    # for "{number.x}" and TypeError for "{number[0]}"; wrap those as well.
    except (AttributeError, IndexError, KeyError, TypeError, ValueError) as exc:
        raise RepositoryError(f"Mitgliedsnummer konnte nicht formatiert werden: {exc}") from exc
    if not value or len(value) > 80 or any(character in value for character in "\r\n\t"):
        raise RepositoryError("Das Pattern erzeugt eine ungültige Mitgliedsnummer.")
    return value