# Source listing: CCMA/src/ccma/storage/repository.py
# Last modified 2026-06-21 18:20:55 +02:00 (716 lines, 30 KiB, Python)

from __future__ import annotations
import json
import os
import unicodedata
from collections.abc import Iterable
from datetime import date, datetime
from pathlib import Path
from string import Formatter
from uuid import uuid4
from ccma.domain.contributions import (
claim_balance,
claim_total,
decimal_value,
materialize_claim_items,
money_text,
payment_allocated_total,
)
from ccma.domain.dates import DateValidationError, normalize_date_input, validate_member_dates
from ccma.domain.models import MEMBERSHIP_STATUS_LABELS, ContributionData, Event, Member
from ccma.storage.atomic import read_json, write_json_atomic
class RepositoryError(RuntimeError):
    """Raised when a repository operation fails or its input is rejected.

    The messages are the user-facing (German) texts produced by this module.
    """
# Pattern used for automatically assigned member numbers unless configured otherwise.
DEFAULT_MEMBER_NUMBER_PATTERN = "CCMA-{number:04d}"

# The single contribution rule contained in the default configuration.
# Amounts are stored as decimal strings, due dates as "MM-DD".
_DEFAULT_CONTRIBUTION_RULE = {
    "rule_id": "standard-2022",
    "name": "Regulärer Beitrag ab 2022",
    "valid_from": "2022-01-01",
    "valid_until": None,
    "annual_amount": "150.00",
    "admission_fee": "10.00",
    "annual_due": "01-31",
    "semiannual_due": ["01-31", "07-31"],
    "entry_proration": {"mode": "monthly", "started_month": "included"},
    "first_payment_due_days_after_acceptance": 28,
    "issue_days_before_due": 30,
    "reminder_fee": "5.00",
    "failed_debit_fee": "5.00",
}

# Configuration written to repository.json when that file does not exist yet.
DEFAULT_CONFIGURATION = {
    "schema_version": 1,
    "organization": "Chaos Computer Club Mannheim e.V.",
    "member_number_policy": {
        "mode": "automatic",
        "pattern": DEFAULT_MEMBER_NUMBER_PATTERN,
    },
    "member_number_sequences": {},
    "contribution_rules": [_DEFAULT_CONTRIBUTION_RULE],
}
class MemberRepository:
    """File-based store for member records, contributions and event logs.

    Files used below ``root``: ``repository.json`` (configuration), a ``rules``
    directory, and one directory per member under ``members/<member_id>/``
    holding ``member.json``, ``contributions.json``, ``events.jsonl`` and a
    ``files`` directory.  Failures are reported as :class:`RepositoryError`
    with German, user-facing messages.
    """

    def __init__(self, root: Path | str):
        """Bind the repository to *root* (``~`` expanded, path resolved)."""
        self.root = Path(root).expanduser().resolve()
        self.members_root = self.root / "members"

    def initialize(self) -> None:
        """Create the directory skeleton and a default ``repository.json`` if missing."""
        self.members_root.mkdir(parents=True, exist_ok=True)
        (self.root / "rules").mkdir(parents=True, exist_ok=True)
        config_path = self.root / "repository.json"
        if not config_path.exists():
            write_json_atomic(config_path, DEFAULT_CONFIGURATION)

    def validate(self) -> list[str]:
        """Check the configuration and every member directory.

        Returns:
            A list of human-readable problem descriptions; empty when nothing
            was found.  Problems are collected instead of raised.
        """
        errors: list[str] = []
        try:
            config = read_json(self.root / "repository.json")
            # A non-object root or policy would otherwise fail with an
            # AttributeError on ``.get`` that the handler below does not cover.
            if not isinstance(config, dict):
                raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
            if int(config.get("schema_version", 0)) != 1:
                errors.append("repository.json: nicht unterstützte schema_version")
            policy = config.get("member_number_policy") or {}
            if not isinstance(policy, dict):
                raise TypeError("member_number_policy muss ein JSON-Objekt sein")
            if str(policy.get("mode", "automatic")) not in {"automatic", "manual"}:
                errors.append("repository.json: ungültiger Mitgliedsnummernmodus")
            validate_member_number_pattern(str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN)))
        except (OSError, ValueError, TypeError, json.JSONDecodeError, RepositoryError) as exc:
            errors.append(f"repository.json: {exc}")

        # Maps normalized member number -> member_id of the first record using it.
        seen_numbers: dict[str, str] = {}
        for member_dir in self._member_directories():
            try:
                member, _contributions = self.preflight_member_record(member_dir.name)
                validate_member_dates(
                    birth_date=member.birth_date,
                    accepted_at=member.accepted_at,
                    membership_started_at=member.membership_started_at,
                )
                if member.member_id != member_dir.name:
                    errors.append(f"{member_dir.name}/member.json: member_id stimmt nicht mit Ordner überein")
                normalized_number = member.member_number.casefold().strip()
                if normalized_number and normalized_number in seen_numbers:
                    errors.append(
                        f"{member_dir.name}/member.json: Mitgliedsnummer {member.member_number} ist doppelt"
                    )
                elif normalized_number:
                    seen_numbers[normalized_number] = member.member_id
            except RepositoryError as exc:
                errors.append(str(exc))
            except (
                OSError,
                ValueError,
                TypeError,
                KeyError,
                json.JSONDecodeError,
                DateValidationError,
            ) as exc:
                errors.append(f"{member_dir.name}/member.json: {exc}")
        return errors

    def list_members(self) -> list[Member]:
        """Return all readable members sorted by last name, then first name.

        Records that cannot be loaded are skipped on purpose so one broken
        file does not hide the others; ``validate`` reports them.
        """
        members: list[Member] = []
        for member_id in self.list_member_ids():
            try:
                members.append(self.get_member(member_id))
            except RepositoryError:
                continue
        return sorted(members, key=lambda item: (item.last_name.casefold(), item.first_name.casefold()))

    def list_member_ids(self) -> list[str]:
        """Return the sorted names of all member directories."""
        return sorted(directory.name for directory in self._member_directories())

    def get_member(self, member_id: str) -> Member:
        """Load ``member.json`` for *member_id*.

        Raises:
            RepositoryError: If the ID is invalid, the file is missing, or
                its content cannot be read or converted.
        """
        path = self._member_path(member_id) / "member.json"
        if not path.is_file():
            raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
        try:
            raw = read_json(path)
            if not isinstance(raw, dict):
                raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
            for section_name in ("person", "membership", "contribution_profile"):
                if section_name in raw and not isinstance(raw[section_name], dict):
                    raise TypeError(f"{section_name} muss ein JSON-Objekt sein")
            return Member.from_dict(raw)
        except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
            raise RepositoryError(f"{member_id}/member.json konnte nicht gelesen werden: {exc}") from exc

    def preflight_member_record(self, member_id: str) -> tuple[Member, ContributionData]:
        """Load a member record completely and verify that it is consistent.

        Reads ``member.json``, ``contributions.json`` and ``events.jsonl``.

        Returns:
            The loaded member and its contribution data.

        Raises:
            RepositoryError: If any of the files is unreadable, the stored
                ``member_id`` differs from the directory name, or the schema
                version is not 1.
        """
        member = self.get_member(member_id)
        if member.member_id != member_id:
            raise RepositoryError(f"{member_id}/member.json: member_id stimmt nicht mit Ordner überein")
        if member.schema_version != 1:
            raise RepositoryError(
                f"{member_id}/member.json: nicht unterstützte schema_version {member.schema_version}"
            )
        contributions = self.get_contributions(member_id)
        try:
            self.get_events(member_id)
        except RepositoryError as exc:
            # get_events only names the line number; add the file so the
            # message is attributable when it shows up in validate().
            raise RepositoryError(f"{member_id}/events.jsonl: {exc}") from exc
        except (OSError, UnicodeError) as exc:
            raise RepositoryError(f"{member_id}/events.jsonl konnte nicht gelesen werden: {exc}") from exc
        return member, contributions

    def create_member(
        self,
        *,
        first_name: str,
        last_name: str,
        email: str = "",
        birth_date: str = "",
        member_number: str = "",
    ) -> Member:
        """Create a new member directory with its initial files.

        Args:
            first_name: Required, surrounding whitespace is removed.
            last_name: Required, surrounding whitespace is removed.
            email: Optional e-mail address.
            birth_date: Optional date, normalized via ``normalize_date_input``.
            member_number: Explicit number; when empty a number is allocated
                automatically unless the policy mode is ``manual``.

        Returns:
            The newly stored member.

        Raises:
            RepositoryError: For missing names, invalid dates, a number that
                is already taken, or a missing number in manual mode.
        """
        if not first_name.strip() or not last_name.strip():
            raise RepositoryError("Vorname und Nachname sind erforderlich.")
        try:
            birth_date = normalize_date_input(birth_date, "Geburtsdatum")
            validate_member_dates(birth_date=birth_date)
        except DateValidationError as exc:
            raise RepositoryError(str(exc)) from exc

        selected_number = member_number.strip()
        policy = self.get_member_number_policy()
        if selected_number:
            self._assert_member_number_available(selected_number)
        elif policy["mode"] == "manual":
            raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
        else:
            selected_number = self._allocate_member_number(policy["pattern"])

        member_id = str(uuid4())
        directory = self._member_path(member_id)
        directory.mkdir(parents=True, exist_ok=False)
        (directory / "files").mkdir()
        member = Member(
            member_id=member_id,
            member_number=selected_number,
            first_name=first_name.strip(),
            last_name=last_name.strip(),
            email=email.strip(),
            birth_date=birth_date,
        )
        write_json_atomic(directory / "member.json", member.to_dict())
        write_json_atomic(directory / "contributions.json", ContributionData().to_dict())
        self.append_event(
            member_id,
            event_type="member_created",
            summary="Mitgliederakte angelegt",
            actor_type="user",
            actor_name="Vorstand",
        )
        return member

    def save_member(self, member: Member, *, actor_name: str = "Vorstand") -> None:
        """Persist *member* and log which fields changed.

        The date fields of *member* are normalized in place and
        ``updated_at`` is set to the current local time before writing.

        Raises:
            RepositoryError: If the member does not exist, a date is invalid,
                or a changed member number is already in use.
        """
        existing = self.get_member(member.member_id)
        try:
            member.birth_date = normalize_date_input(member.birth_date, "Geburtsdatum")
            member.accepted_at = normalize_date_input(member.accepted_at, "Aufnahmebeschluss")
            member.membership_started_at = normalize_date_input(member.membership_started_at, "Mitglied seit")
            validate_member_dates(
                birth_date=member.birth_date,
                accepted_at=member.accepted_at,
                membership_started_at=member.membership_started_at,
            )
        except DateValidationError as exc:
            raise RepositoryError(str(exc)) from exc
        if member.member_number != existing.member_number:
            self._assert_member_number_available(member.member_number, exclude_member_id=member.member_id)
        changes = self._summarize_changes(existing, member)
        member.updated_at = datetime.now().astimezone().isoformat(timespec="seconds")
        write_json_atomic(self._member_path(member.member_id) / "member.json", member.to_dict())
        if changes:
            self.append_event(
                member.member_id,
                event_type="member_data_changed",
                summary=f"Mitgliedsdaten geändert: {', '.join(changes)}",
                actor_type="user",
                actor_name=actor_name,
            )

    def get_contributions(self, member_id: str) -> ContributionData:
        """Load ``contributions.json``; a missing file yields empty data.

        Raises:
            RepositoryError: If the file cannot be read or has the wrong shape.
        """
        path = self._member_path(member_id) / "contributions.json"
        if not path.exists():
            return ContributionData()
        try:
            raw = read_json(path)
            if not isinstance(raw, dict):
                raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
            for field_name in ("claims", "payments", "allocations", "reminders"):
                if field_name in raw and not isinstance(raw[field_name], list):
                    raise TypeError(f"{field_name} muss eine JSON-Liste sein")
                if field_name in raw and any(not isinstance(item, dict) for item in raw[field_name]):
                    raise TypeError(f"Alle Einträge in {field_name} müssen JSON-Objekte sein")
            return ContributionData.from_dict(raw)
        # KeyError is wrapped as well, matching get_member and get_events, so
        # callers that only handle RepositoryError are not surprised.
        except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
            raise RepositoryError(
                f"{member_id}/contributions.json konnte nicht gelesen werden: {exc}"
            ) from exc

    def save_contributions(self, member_id: str, data: ContributionData) -> None:
        """Write *data* to the member's ``contributions.json``.

        Raises:
            RepositoryError: If the member does not exist or is unreadable.
        """
        self.get_member(member_id)  # existence/readability check only
        write_json_atomic(self._member_path(member_id) / "contributions.json", data.to_dict())

    def get_claim(self, member_id: str, claim_id: str) -> tuple[ContributionData, dict]:
        """Return the contribution data and the claim dict with *claim_id*.

        The returned claim is the object contained in ``data.claims``, so
        modifying it and saving ``data`` persists the change.

        Raises:
            RepositoryError: If no claim with that ID exists.
        """
        data = self.get_contributions(member_id)
        claim = next(
            (item for item in data.claims if str(item.get("claim_id", "")) == claim_id),
            None,
        )
        if claim is None:
            raise RepositoryError(f"Forderung nicht gefunden: {claim_id}")
        return data, claim

    def add_claim_item(
        self,
        member_id: str,
        claim_id: str,
        *,
        description: str,
        quantity: str,
        unit_price: str,
        item_type: str = "correction",
    ) -> dict:
        """Append a line item to a claim and update the claim amount.

        Returns:
            The stored item dict.

        Raises:
            RepositoryError: For an empty description, unparsable numbers, a
                zero quantity, an unknown claim, or a cancelled claim.
        """
        if not description.strip():
            raise RepositoryError("Eine Beschreibung ist erforderlich.")
        try:
            selected_quantity = decimal_value(quantity, "Menge")
            selected_unit_price = decimal_value(unit_price, "Einzelpreis")
        except ValueError as exc:
            raise RepositoryError(str(exc)) from exc
        if selected_quantity == 0:
            raise RepositoryError("Die Menge darf nicht null sein.")
        data, claim = self.get_claim(member_id, claim_id)
        if str(claim.get("status", "")) == "cancelled":
            raise RepositoryError("Eine stornierte Forderung kann nicht verändert werden.")
        amount = selected_quantity * selected_unit_price
        item = {
            "item_id": str(uuid4()),
            "type": item_type.strip() or "correction",
            "description": description.strip(),
            "quantity": money_text(selected_quantity),
            "unit_price": money_text(selected_unit_price),
            "amount": money_text(amount),
            "created_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        }
        materialize_claim_items(claim).append(item)
        claim["amount"] = money_text(claim_total(claim))
        self.save_contributions(member_id, data)
        self.append_event(
            member_id,
            event_type="claim_item_added",
            summary=f"Forderungsposition ergänzt: {item['description']} ({item['amount']} EUR)",
            references={"claim_id": claim_id, "item_id": str(item["item_id"])},
        )
        return item

    def record_payment(
        self,
        member_id: str,
        claim_id: str,
        *,
        payment_date: str,
        amount: str,
        allocation_amount: str,
        gnucash_transaction_id: str = "",
        reference: str = "",
    ) -> dict:
        """Record a payment and allocate (part of) it to a claim.

        Returns:
            The stored payment dict.

        Raises:
            RepositoryError: For a missing or invalid date, unparsable or
                non-positive amounts, an allocation larger than the payment,
                a GnuCash ID that is already used, or an unknown claim.
        """
        try:
            normalized_date = normalize_date_input(payment_date, "Zahlungsdatum")
            selected_amount = decimal_value(amount)
            selected_allocation = decimal_value(allocation_amount, "Zuordnung")
        except (DateValidationError, ValueError) as exc:
            raise RepositoryError(str(exc)) from exc
        if not normalized_date:
            raise RepositoryError("Zahlungsdatum ist erforderlich.")
        if selected_amount <= 0:
            raise RepositoryError("Der Zahlungsbetrag muss größer als null sein.")
        if selected_allocation <= 0 or selected_allocation > selected_amount:
            raise RepositoryError(
                "Die Zuordnung muss größer als null und höchstens so hoch wie die Zahlung sein."
            )
        gnucash_id = gnucash_transaction_id.strip()
        if gnucash_id:
            self._assert_gnucash_id_available(gnucash_id)
        data, claim = self.get_claim(member_id, claim_id)
        payment = {
            "payment_id": str(uuid4()),
            "date": normalized_date,
            "amount": money_text(selected_amount),
            "method": "bank_transfer",
            "gnucash_transaction_id": gnucash_id,
            "reference": reference.strip(),
            "created_at": datetime.now().astimezone().isoformat(timespec="seconds"),
        }
        allocation = {
            "allocation_id": str(uuid4()),
            "payment_id": payment["payment_id"],
            "claim_id": claim_id,
            "amount": money_text(selected_allocation),
        }
        data.payments.append(payment)
        data.allocations.append(allocation)
        self.save_contributions(member_id, data)
        self.append_event(
            member_id,
            event_type="payment_recorded",
            summary=f"Zahlung eingegangen: {payment['amount']} EUR",
            references={"claim_id": claim_id, "payment_id": str(payment["payment_id"])},
            data={"allocation_amount": allocation["amount"]},
        )
        return payment

    def allocate_payment(self, member_id: str, claim_id: str, *, payment_id: str, amount: str) -> dict:
        """Allocate part of an existing payment to a claim.

        Returns:
            The stored allocation dict.

        Raises:
            RepositoryError: If the claim or payment is unknown, the amount
                cannot be parsed, or it is non-positive or exceeds what is
                still unallocated on the payment.
        """
        data, _claim = self.get_claim(member_id, claim_id)
        payment = next(
            (item for item in data.payments if str(item.get("payment_id", "")) == payment_id),
            None,
        )
        if payment is None:
            raise RepositoryError("Zahlung nicht gefunden.")
        try:
            selected_amount = decimal_value(amount, "Zuordnung")
            available = decimal_value(payment.get("amount", "0")) - payment_allocated_total(data, payment_id)
        except ValueError as exc:
            raise RepositoryError(str(exc)) from exc
        if selected_amount <= 0 or selected_amount > available:
            raise RepositoryError(f"Es sind nur {money_text(available)} EUR dieser Zahlung verfügbar.")
        allocation = {
            "allocation_id": str(uuid4()),
            "payment_id": payment_id,
            "claim_id": claim_id,
            "amount": money_text(selected_amount),
        }
        data.allocations.append(allocation)
        self.save_contributions(member_id, data)
        self.append_event(
            member_id,
            event_type="payment_allocated",
            summary=f"Zahlung zugeordnet: {allocation['amount']} EUR",
            references={"claim_id": claim_id, "payment_id": payment_id},
        )
        return allocation

    def add_reminder(
        self,
        member_id: str,
        claim_id: str,
        *,
        level: int,
        detail: str = "",
        fee: str = "0",
    ) -> dict:
        """Record a reminder for a claim, optionally adding a fee item.

        A positive *fee* is appended to the claim as an item of type
        ``"fee"`` and referenced from the reminder via ``fee_item_id``.

        Returns:
            The stored reminder dict.

        Raises:
            RepositoryError: For a level below 1, an unparsable or negative
                fee, an unknown claim, or a cancelled claim.
        """
        if level < 1:
            raise RepositoryError("Die Mahnstufe muss mindestens 1 sein.")
        try:
            selected_fee = decimal_value(fee, "Mahngebühr")
        except ValueError as exc:
            raise RepositoryError(str(exc)) from exc
        if selected_fee < 0:
            raise RepositoryError("Die Mahngebühr darf nicht negativ sein.")
        data, claim = self.get_claim(member_id, claim_id)
        if str(claim.get("status", "")) == "cancelled":
            raise RepositoryError("Eine stornierte Forderung kann nicht gemahnt werden.")
        reminder = {
            "reminder_id": str(uuid4()),
            "claim_id": claim_id,
            "level": level,
            "detail": detail.strip(),
            "created_at": datetime.now().astimezone().isoformat(timespec="seconds"),
            "fee_item_id": None,
        }
        if selected_fee > 0:
            item = {
                "item_id": str(uuid4()),
                "type": "fee",
                "description": f"Mahngebühr Stufe {level}",
                "quantity": "1.00",
                "unit_price": money_text(selected_fee),
                "amount": money_text(selected_fee),
                "created_at": reminder["created_at"],
            }
            materialize_claim_items(claim).append(item)
            claim["amount"] = money_text(claim_total(claim))
            reminder["fee_item_id"] = item["item_id"]
        data.reminders.append(reminder)
        self.save_contributions(member_id, data)
        self.append_event(
            member_id,
            event_type="reminder_created",
            summary=f"Mahnung Stufe {level} erfasst",
            references={"claim_id": claim_id, "reminder_id": str(reminder["reminder_id"])},
            data={"fee": money_text(selected_fee)},
        )
        return reminder

    def cancel_claim(self, member_id: str, claim_id: str) -> None:
        """Mark a claim as cancelled and log the cancellation.

        Raises:
            RepositoryError: If the claim is unknown or its balance differs
                from its total.
        """
        data, claim = self.get_claim(member_id, claim_id)
        # A balance different from the total is treated as "payments have
        # been allocated", which blocks cancellation.
        if claim_balance(data, claim) != claim_total(claim):
            raise RepositoryError("Eine Forderung mit Zahlungszuordnungen kann nicht storniert werden.")
        claim["status"] = "cancelled"
        claim["cancelled_at"] = datetime.now().astimezone().isoformat(timespec="seconds")
        self.save_contributions(member_id, data)
        self.append_event(
            member_id,
            event_type="claim_cancelled",
            summary=f"Forderung storniert: {claim.get('title', claim_id)}",
            references={"claim_id": claim_id},
        )

    def _assert_gnucash_id_available(self, transaction_id: str) -> None:
        """Raise if any readable member already has a payment with this GnuCash ID.

        The comparison is case-insensitive; members whose contributions
        cannot be read are skipped.
        """
        selected = transaction_id.casefold()
        for member in self.list_members():
            try:
                payments = self.get_contributions(member.member_id).payments
            except RepositoryError:
                continue
            if any(
                str(payment.get("gnucash_transaction_id", "")).casefold() == selected for payment in payments
            ):
                raise RepositoryError(f"GnuCash-ID bereits verwendet: {transaction_id}")

    def append_event(
        self,
        member_id: str,
        *,
        event_type: str,
        summary: str,
        actor_type: str = "system",
        actor_name: str = "CCMA",
        references: dict[str, str] | None = None,
        data: dict[str, object] | None = None,
    ) -> Event:
        """Append one event as a JSON line to the member's ``events.jsonl``.

        The line is flushed and fsynced before returning.

        Returns:
            The event that was written.

        Raises:
            RepositoryError: If the member has no ``member.json``.
        """
        directory = self._member_path(member_id)
        if not (directory / "member.json").is_file():
            raise RepositoryError(f"Mitglied nicht gefunden: {member_id}")
        event = Event(
            event_id=str(uuid4()),
            timestamp=datetime.now().astimezone().isoformat(timespec="seconds"),
            event_type=event_type,
            summary=summary.strip(),
            actor_type=actor_type,
            actor_name=actor_name,
            references=references or {},
            data=data or {},
        )
        path = directory / "events.jsonl"
        line = json.dumps(event.to_dict(), ensure_ascii=False, separators=(",", ":")) + "\n"
        with path.open("a", encoding="utf-8", newline="\n") as handle:
            handle.write(line)
            handle.flush()
            os.fsync(handle.fileno())
        return event

    def get_events(self, member_id: str) -> list[Event]:
        """Read all events of a member in file order; blank lines are ignored.

        Raises:
            RepositoryError: If a line is not a valid event object.
        """
        path = self._member_path(member_id) / "events.jsonl"
        if not path.exists():
            return []
        events: list[Event] = []
        with path.open("r", encoding="utf-8") as handle:
            for line_number, line in enumerate(handle, start=1):
                if not line.strip():
                    continue
                try:
                    raw = json.loads(line)
                    if not isinstance(raw, dict):
                        raise TypeError("Event muss ein JSON-Objekt sein")
                    events.append(Event.from_dict(raw))
                except (AttributeError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
                    raise RepositoryError(f"Ungültiges Event in Zeile {line_number}: {exc}") from exc
        return events

    def search(self, query: str) -> list[Member]:
        """Return members matching every whitespace-separated token of *query*.

        Matching is a normalized substring test against number, names,
        display name, e-mail and birth date (ISO and ``DD.MM.YYYY``).
        Results are ordered by exact matches, then prefix matches, then name.
        An empty query returns all members.
        """
        # Tokens that normalize to "" (e.g. only combining marks) would match
        # every field and distort the ranking, so they are dropped.
        tokens = [token for token in (_normalize(part) for part in query.split()) if token]
        if not tokens:
            return self.list_members()
        scored: list[tuple[int, Member]] = []
        for member in self.list_members():
            fields = [
                member.member_number,
                member.first_name,
                member.last_name,
                member.display_name,
                member.email,
                member.birth_date,
                _german_date(member.birth_date),
            ]
            normalized = [_normalize(value) for value in fields if value]
            if not all(any(token in value for value in normalized) for token in tokens):
                continue
            exact = sum(token == value for token in tokens for value in normalized)
            prefix = sum(value.startswith(token) for token in tokens for value in normalized)
            scored.append((exact * 100 + prefix * 10, member))
        scored.sort(key=lambda item: (-item[0], item[1].last_name.casefold(), item[1].first_name.casefold()))
        return [member for _, member in scored]

    def member_count(self) -> int:
        """Return the number of member directories (readable or not)."""
        return sum(1 for _ in self._member_directories())

    def get_configuration(self) -> dict:
        """Load ``repository.json`` and ensure it is a JSON object.

        Raises:
            RepositoryError: If the file cannot be read or is not an object.
        """
        try:
            configuration = read_json(self.root / "repository.json")
        except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
            raise RepositoryError(f"repository.json konnte nicht gelesen werden: {exc}") from exc
        if not isinstance(configuration, dict):
            raise RepositoryError("repository.json enthält keine gültige Konfiguration.")
        return configuration

    def get_member_number_policy(self) -> dict[str, str]:
        """Return the member number policy as ``{"mode": ..., "pattern": ...}``.

        An unknown mode falls back to ``"automatic"`` and a missing pattern
        to the default pattern.

        Raises:
            RepositoryError: If the configuration cannot be read, is not a
                JSON object, or the pattern is invalid.
        """
        try:
            config = read_json(self.root / "repository.json")
        except (OSError, ValueError, TypeError, json.JSONDecodeError) as exc:
            raise RepositoryError(f"Mitgliedsnummernregel konnte nicht gelesen werden: {exc}") from exc
        if not isinstance(config, dict):
            raise RepositoryError("repository.json enthält keine gültige Konfiguration.")
        policy = config.get("member_number_policy")
        if not isinstance(policy, dict):
            # Missing or malformed policy: use the defaults instead of
            # failing with an AttributeError on ``.get``.
            policy = {}
        mode = str(policy.get("mode", "automatic"))
        if mode not in {"automatic", "manual"}:
            mode = "automatic"
        pattern = str(policy.get("pattern", DEFAULT_MEMBER_NUMBER_PATTERN))
        validate_member_number_pattern(pattern)
        return {"mode": mode, "pattern": pattern}

    def save_member_number_policy(self, *, mode: str, pattern: str) -> None:
        """Store a new member number policy in ``repository.json``.

        Raises:
            RepositoryError: For an unknown mode, an invalid pattern, or an
                unreadable configuration.
        """
        if mode not in {"automatic", "manual"}:
            raise RepositoryError("Ungültiger Mitgliedsnummernmodus.")
        validate_member_number_pattern(pattern)
        config = self.get_configuration()
        config["member_number_policy"] = {"mode": mode, "pattern": pattern.strip()}
        config.setdefault("member_number_sequences", {})
        write_json_atomic(self.root / "repository.json", config)

    def preview_member_number(self, pattern: str | None = None) -> str:
        """Return the number that would be assigned next, without reserving it.

        Args:
            pattern: Pattern to preview; defaults to the configured pattern.
        """
        selected_pattern = pattern or self.get_member_number_policy()["pattern"]
        validate_member_number_pattern(selected_pattern)
        config = self.get_configuration()
        return self._next_available_member_number(config, selected_pattern)[0]

    def _member_directories(self) -> Iterable[Path]:
        """Iterate over member directories, skipping files and dot-directories."""
        if not self.members_root.exists():
            return []
        return (
            path for path in self.members_root.iterdir() if path.is_dir() and not path.name.startswith(".")
        )

    def _member_path(self, member_id: str) -> Path:
        """Return the directory for *member_id*.

        Raises:
            RepositoryError: If the ID is empty, ``.``/``..``, or not a plain
                single path component (guards against path traversal).
        """
        if not member_id or Path(member_id).name != member_id or member_id in {".", ".."}:
            raise RepositoryError("Ungültige Mitglieds-ID.")
        return self.members_root / member_id

    def _allocate_member_number(self, pattern: str) -> str:
        """Reserve the next free number for *pattern* and persist the counter."""
        config = self.get_configuration()
        member_number, next_value = self._next_available_member_number(config, pattern)
        sequences = config.get("member_number_sequences")
        if not isinstance(sequences, dict):
            sequences = {}
            config["member_number_sequences"] = sequences
        sequences[pattern] = next_value
        write_json_atomic(self.root / "repository.json", config)
        return member_number

    def _next_available_member_number(self, config: dict, pattern: str) -> tuple[str, int]:
        """Find the first unused number at or after the stored counter.

        Returns:
            ``(member_number, next_counter_value)``.

        Raises:
            RepositoryError: If no free number is found within 1,000,000 tries.
        """
        sequences = config.get("member_number_sequences")
        if not isinstance(sequences, dict):
            sequences = {}
        try:
            number = max(1, int(sequences.get(pattern, 1)))
        except (TypeError, ValueError):
            number = 1
        # Normalized exactly like _assert_member_number_available so both
        # checks agree on what counts as "already taken".
        existing = {
            member.member_number.casefold().strip() for member in self.list_members() if member.member_number
        }
        for _attempt in range(1_000_000):
            candidate = format_member_number(pattern, number)
            number += 1
            if candidate.casefold().strip() not in existing:
                return candidate, number
        raise RepositoryError("Keine freie Mitgliedsnummer im konfigurierten Nummernbereich gefunden.")

    def _assert_member_number_available(
        self,
        member_number: str,
        *,
        exclude_member_id: str | None = None,
    ) -> None:
        """Raise if *member_number* is empty or used by another member.

        The comparison ignores case and surrounding whitespace on both sides.

        Args:
            member_number: Number to check.
            exclude_member_id: Member that may keep this number (the one
                being edited).
        """
        normalized = member_number.casefold().strip()
        if not normalized:
            raise RepositoryError("Eine Mitgliedsnummer ist erforderlich.")
        for member in self.list_members():
            if member.member_id == exclude_member_id:
                continue
            # The stored value is stripped too; comparing the stripped input
            # with an unstripped stored number would miss such duplicates.
            if member.member_number.casefold().strip() == normalized:
                raise RepositoryError(f"Die Mitgliedsnummer {member_number} ist bereits vergeben.")

    @staticmethod
    def _summarize_changes(before: Member, after: Member) -> list[str]:
        """List German labels of the tracked fields that differ.

        A status change is rendered as "Status von <old> zu <new>" using
        ``MEMBERSHIP_STATUS_LABELS``; other fields contribute their label only.
        """
        labels = {
            "member_number": "Mitgliedsnummer",
            "first_name": "Vorname",
            "last_name": "Nachname",
            "email": "E-Mail-Adresse",
            "birth_date": "Geburtsdatum",
            "status": "Status",
            "payment_frequency": "Zahlungsweise",
            "contribution_rule_id": "Beitragsregel",
            "honorary": "Ehrenmitgliedschaft",
            "notes": "interne Notiz",
        }
        changes: list[str] = []
        for field, label in labels.items():
            old_value = getattr(before, field)
            new_value = getattr(after, field)
            if old_value == new_value:
                continue
            if field == "status":
                old_label = MEMBERSHIP_STATUS_LABELS.get(str(old_value), str(old_value))
                new_label = MEMBERSHIP_STATUS_LABELS.get(str(new_value), str(new_value))
                changes.append(f"Status von {old_label} zu {new_label}")
            else:
                changes.append(label)
        return changes
def _normalize(value: str) -> str:
normalized = unicodedata.normalize("NFKD", value.casefold().strip())
return "".join(character for character in normalized if not unicodedata.combining(character))
def _german_date(value: str) -> str:
try:
return date.fromisoformat(value).strftime("%d.%m.%Y")
except ValueError:
return ""
def validate_member_number_pattern(pattern: str) -> None:
    """Check that *pattern* is a usable member number format string.

    A valid pattern is non-empty, uses only the placeholders ``{number}`` and
    ``{year}``, contains ``{number}`` at least once, uses no conversions
    (``!r`` etc.) or nested format specs, and yields different results for
    the numbers 1 and 2.

    Raises:
        RepositoryError: If any of these conditions is violated or the
            pattern cannot be parsed or formatted.
    """
    selected = pattern.strip()
    if not selected:
        raise RepositoryError("Das Mitgliedsnummern-Pattern darf nicht leer sein.")
    has_number = False
    try:
        for _literal, field_name, format_spec, conversion in Formatter().parse(selected):
            if field_name is None:
                # Pure literal segment without a placeholder.
                continue
            if field_name not in {"number", "year"}:
                raise RepositoryError(f"Unbekannter Platzhalter: {{{field_name}}}")
            if conversion:
                raise RepositoryError("Konvertierungen wie !r sind im Pattern nicht erlaubt.")
            if "{" in format_spec or "}" in format_spec:
                raise RepositoryError("Verschachtelte Formatierungen sind nicht erlaubt.")
            has_number = has_number or field_name == "number"
        if not has_number:
            raise RepositoryError("Das Pattern muss den Platzhalter {number} enthalten.")
        # Formatting two different numbers proves that the format spec is
        # valid and that the number actually shows up in the result.
        first = format_member_number(selected, 1)
        second = format_member_number(selected, 2)
        if first == second:
            raise RepositoryError("Das Pattern erzeugt keine eindeutigen Mitgliedsnummern.")
    except RepositoryError:
        raise
    except (KeyError, ValueError) as exc:
        raise RepositoryError(f"Ungültiges Mitgliedsnummern-Pattern: {exc}") from exc
def format_member_number(pattern: str, number: int, *, year: int | None = None) -> str:
    """Render a member number from *pattern*.

    Args:
        pattern: ``str.format`` template using ``{number}`` and optionally
            ``{year}``; surrounding whitespace is ignored.
        number: Running number to insert.
        year: Year to insert; the current year when ``None``.

    Returns:
        The formatted member number.

    Raises:
        RepositoryError: If formatting fails or the result is empty, longer
            than 80 characters, or contains CR, LF or TAB.
    """
    # ``is None`` instead of ``or`` so an explicit year of 0 is honored.
    selected_year = date.today().year if year is None else year
    try:
        value = pattern.strip().format(number=number, year=selected_year)
    except (KeyError, ValueError, IndexError, AttributeError, TypeError) as exc:
        # IndexError: positional "{}" / "{0}"; AttributeError and TypeError:
        # attribute or index access such as "{number.x}" or "{number[0]}".
        raise RepositoryError(f"Mitgliedsnummer konnte nicht formatiert werden: {exc}") from exc
    if not value or len(value) > 80 or any(character in value for character in "\r\n\t"):
        raise RepositoryError("Das Pattern erzeugt eine ungültige Mitgliedsnummer.")
    return value