Add asset records, claims, and credit workflows

This commit is contained in:
Marcel Peterkau
2026-06-26 23:03:06 +02:00
parent 30b6d253b2
commit d1dab793a6
11 changed files with 2060 additions and 10 deletions
+441 -2
View File
@@ -13,13 +13,14 @@ from uuid import uuid4
from ccma.domain.contributions import (
claim_balance,
claim_total,
credit_allocated_total,
decimal_value,
materialize_claim_items,
money_text,
payment_allocated_total,
)
from ccma.domain.dates import DateValidationError, normalize_date_input, validate_member_dates
from ccma.domain.models import MEMBERSHIP_STATUS_LABELS, ContributionData, Event, Member
from ccma.domain.models import ASSET_STATUS_LABELS, MEMBERSHIP_STATUS_LABELS, Asset, ContributionData, Event, Member
from ccma.storage.atomic import read_json, write_json_atomic
@@ -114,9 +115,11 @@ class MemberRepository:
def __init__(self, root: Path | str):
self.root = Path(root).expanduser().resolve()
self.members_root = self.root / "members"
self.assets_root = self.root / "assets"
def initialize(self) -> None:
self.members_root.mkdir(parents=True, exist_ok=True)
self.assets_root.mkdir(parents=True, exist_ok=True)
(self.root / "rules").mkdir(parents=True, exist_ok=True)
templates_root = self.root / "templates"
templates_root.mkdir(parents=True, exist_ok=True)
@@ -177,6 +180,30 @@ class MemberRepository:
DateValidationError,
) as exc:
errors.append(f"{member_dir.name}/member.json: {exc}")
for asset_dir in self._asset_directories():
try:
asset = self.get_asset(asset_dir.name)
if asset.asset_id != asset_dir.name:
errors.append(f"{asset_dir.name}/asset.json: asset_id stimmt nicht mit Ordner überein")
if asset.schema_version != 1:
errors.append(
f"{asset_dir.name}/asset.json: nicht unterstützte schema_version {asset.schema_version}"
)
if asset.status not in ASSET_STATUS_LABELS:
errors.append(f"{asset_dir.name}/asset.json: ungültiger Asset-Status")
if asset.current_holder_member_id:
self.get_member(asset.current_holder_member_id)
if asset.status != "issued":
errors.append(
f"{asset_dir.name}/asset.json: zugeordnetes Asset muss Status issued haben"
)
elif asset.status == "issued":
errors.append(f"{asset_dir.name}/asset.json: issued benötigt current_holder_member_id")
self.get_asset_events(asset.asset_id)
except RepositoryError as exc:
errors.append(str(exc))
except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
errors.append(f"{asset_dir.name}/asset.json: {exc}")
return errors
def list_members(self) -> list[Member]:
@@ -313,6 +340,299 @@ class MemberRepository:
actor_name=actor_name,
)
def list_assets(self) -> list[Asset]:
assets: list[Asset] = []
for asset_id in self.list_asset_ids():
try:
assets.append(self.get_asset(asset_id))
except RepositoryError:
continue
return sorted(assets, key=lambda item: (item.label.casefold(), item.inventory_number.casefold()))
def list_asset_ids(self) -> list[str]:
return sorted(directory.name for directory in self._asset_directories())
def get_asset(self, asset_id: str) -> Asset:
path = self._asset_path(asset_id) / "asset.json"
if not path.is_file():
raise RepositoryError(f"Asset nicht gefunden: {asset_id}")
try:
raw = read_json(path)
if not isinstance(raw, dict):
raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
return Asset.from_dict(raw)
except (OSError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
raise RepositoryError(f"{asset_id}/asset.json konnte nicht gelesen werden: {exc}") from exc
def create_asset(
self,
*,
label: str,
category: str = "",
inventory_number: str = "",
serial_number: str = "",
deposit_amount_default: str = "0",
notes: str = "",
) -> Asset:
if not label.strip():
raise RepositoryError("Eine Bezeichnung für das Asset ist erforderlich.")
try:
deposit_amount = decimal_value(deposit_amount_default or "0", "Kaution")
except ValueError as exc:
raise RepositoryError(str(exc)) from exc
if deposit_amount < 0:
raise RepositoryError("Die Kaution darf nicht negativ sein.")
asset_id = str(uuid4())
directory = self._asset_path(asset_id)
directory.mkdir(parents=True, exist_ok=False)
(directory / "files").mkdir()
asset = Asset(
asset_id=asset_id,
label=label.strip(),
category=category.strip(),
inventory_number=inventory_number.strip(),
serial_number=serial_number.strip(),
deposit_amount_default=money_text(deposit_amount),
notes=notes.strip(),
)
write_json_atomic(directory / "asset.json", asset.to_dict())
self.append_asset_event(
asset.asset_id,
event_type="asset_created",
summary="Asset angelegt",
actor_type="user",
actor_name="Vorstand",
)
return asset
def save_asset(self, asset: Asset, *, actor_name: str = "Vorstand") -> None:
existing = self.get_asset(asset.asset_id)
if not asset.label.strip():
raise RepositoryError("Eine Bezeichnung für das Asset ist erforderlich.")
if asset.status not in ASSET_STATUS_LABELS:
raise RepositoryError("Ungültiger Asset-Status.")
try:
deposit_amount = decimal_value(asset.deposit_amount_default or "0", "Kaution")
except ValueError as exc:
raise RepositoryError(str(exc)) from exc
if deposit_amount < 0:
raise RepositoryError("Die Kaution darf nicht negativ sein.")
if (
existing.current_holder_member_id
and money_text(deposit_amount) != str(existing.deposit_amount_default)
):
raise RepositoryError("Die Kaution kann nur geändert werden, wenn das Asset nicht ausgegeben ist.")
asset.label = asset.label.strip()
asset.category = asset.category.strip()
asset.inventory_number = asset.inventory_number.strip()
asset.serial_number = asset.serial_number.strip()
asset.deposit_amount_default = money_text(deposit_amount)
asset.notes = asset.notes.strip()
if asset.current_holder_member_id:
self.get_member(asset.current_holder_member_id)
if asset.status != "issued":
raise RepositoryError("Ein zugeordnetes Asset muss den Status issued haben.")
elif asset.status == "issued":
raise RepositoryError("Status issued benötigt ein zugeordnetes Mitglied.")
changes = self._summarize_asset_changes(existing, asset)
asset.updated_at = datetime.now().astimezone().isoformat(timespec="seconds")
write_json_atomic(self._asset_path(asset.asset_id) / "asset.json", asset.to_dict())
if changes:
self.append_asset_event(
asset.asset_id,
event_type="asset_data_changed",
summary=f"Assetdaten geändert: {', '.join(changes)}",
actor_type="user",
actor_name=actor_name,
)
def assign_asset(self, asset_id: str, member_id: str, *, actor_name: str = "Vorstand") -> Asset:
asset = self.get_asset(asset_id)
member = self.get_member(member_id)
if asset.current_holder_member_id:
raise RepositoryError("Das Asset ist bereits einem Mitglied zugeordnet.")
if asset.status in {"lost", "retired"}:
raise RepositoryError("Verlorene oder ausgemusterte Assets können nicht ausgegeben werden.")
asset.current_holder_member_id = member.member_id
asset.status = "issued"
asset.updated_at = datetime.now().astimezone().isoformat(timespec="seconds")
write_json_atomic(self._asset_path(asset.asset_id) / "asset.json", asset.to_dict())
self.append_asset_event(
asset.asset_id,
event_type="asset_issued",
summary=f"Asset ausgegeben an {member.member_number or member.member_id}",
actor_type="user",
actor_name=actor_name,
references={"member_id": member.member_id},
)
self.append_event(
member.member_id,
event_type="asset_assigned",
summary=f"Asset ausgegeben: {asset.label}",
actor_type="user",
actor_name=actor_name,
references={"asset_id": asset.asset_id},
)
return asset
def return_asset(self, asset_id: str, *, actor_name: str = "Vorstand") -> Asset:
asset = self.get_asset(asset_id)
member_id = asset.current_holder_member_id
if not member_id:
raise RepositoryError("Das Asset ist aktuell keinem Mitglied zugeordnet.")
asset.current_holder_member_id = ""
asset.status = "available"
asset.updated_at = datetime.now().astimezone().isoformat(timespec="seconds")
write_json_atomic(self._asset_path(asset.asset_id) / "asset.json", asset.to_dict())
self.append_asset_event(
asset.asset_id,
event_type="asset_returned",
summary="Asset zurückgenommen",
actor_type="user",
actor_name=actor_name,
references={"member_id": member_id},
)
self.append_event(
member_id,
event_type="asset_returned",
summary=f"Asset zurückgegeben: {asset.label}",
actor_type="user",
actor_name=actor_name,
references={"asset_id": asset.asset_id},
)
return asset
def list_member_assets(self, member_id: str) -> list[Asset]:
self.get_member(member_id)
return [asset for asset in self.list_assets() if asset.current_holder_member_id == member_id]
def create_manual_claim(
self,
member_id: str,
*,
title: str,
amount: str,
due_date: str,
description: str = "",
claim_type: str = "asset_charge",
references: dict[str, str] | None = None,
actor_name: str = "Vorstand",
) -> dict:
self.get_member(member_id)
if not title.strip():
raise RepositoryError("Ein Forderungstitel ist erforderlich.")
try:
normalized_due_date = normalize_date_input(due_date, "Fälligkeitsdatum")
amount_value = decimal_value(amount)
except (DateValidationError, ValueError) as exc:
raise RepositoryError(str(exc)) from exc
if not normalized_due_date:
raise RepositoryError("Ein Fälligkeitsdatum ist erforderlich.")
if amount_value == 0:
raise RepositoryError("Der Betrag darf nicht null sein.")
now = datetime.now().astimezone().isoformat(timespec="seconds")
claim_id = str(uuid4())
item_type = "credit" if amount_value < 0 else "base"
claim = {
"claim_id": claim_id,
"claim_key": f"manual:{claim_type}:{claim_id}",
"title": title.strip(),
"amount": money_text(amount_value),
"due_date": normalized_due_date,
"status": "open",
"created_at": now,
"items": [
{
"item_id": str(uuid4()),
"type": item_type,
"description": description.strip() or title.strip(),
"quantity": "1.00",
"unit_price": money_text(amount_value),
"amount": money_text(amount_value),
"created_at": now,
}
],
"origin": {
"type": "manual",
"subtype": claim_type.strip() or "asset_charge",
"actor": actor_name,
"asset_id": (references or {}).get("asset_id", ""),
},
}
data = self.get_contributions(member_id)
data.claims.append(claim)
self.save_contributions(member_id, data)
event = self.append_event(
member_id,
event_type="claim_created",
summary=f"Forderung angelegt: {claim['title']}",
actor_type="user",
actor_name=actor_name,
references={"claim_id": claim_id, **(references or {})},
data={"amount": claim["amount"], "due_date": claim["due_date"], "claim_type": claim_type},
)
if references and references.get("asset_id"):
self.append_asset_event(
references["asset_id"],
event_type="asset_claim_created",
summary=f"Forderung für Asset angelegt: {claim['title']}",
actor_type="user",
actor_name=actor_name,
references={"member_id": member_id, "claim_id": claim_id},
data={"amount": claim["amount"], "due_date": claim["due_date"], "claim_type": claim_type},
)
return {"claim": claim, "event": event}
def append_asset_event(
self,
asset_id: str,
*,
event_type: str,
summary: str,
actor_type: str = "system",
actor_name: str = "CCMA",
references: dict[str, str] | None = None,
data: dict[str, object] | None = None,
) -> Event:
directory = self._asset_path(asset_id)
if not (directory / "asset.json").is_file():
raise RepositoryError(f"Asset nicht gefunden: {asset_id}")
event = Event(
event_id=str(uuid4()),
timestamp=datetime.now().astimezone().isoformat(timespec="seconds"),
event_type=event_type,
summary=summary.strip(),
actor_type=actor_type,
actor_name=actor_name,
references=references or {},
data=data or {},
)
path = directory / "events.jsonl"
line = json.dumps(event.to_dict(), ensure_ascii=False, separators=(",", ":")) + "\n"
with path.open("a", encoding="utf-8", newline="\n") as handle:
handle.write(line)
handle.flush()
os.fsync(handle.fileno())
return event
def get_asset_events(self, asset_id: str) -> list[Event]:
path = self._asset_path(asset_id) / "events.jsonl"
if not path.exists():
return []
events: list[Event] = []
with path.open("r", encoding="utf-8") as handle:
for line_number, line in enumerate(handle, start=1):
if not line.strip():
continue
try:
raw = json.loads(line)
if not isinstance(raw, dict):
raise TypeError("Event muss ein JSON-Objekt sein")
events.append(Event.from_dict(raw))
except (AttributeError, ValueError, TypeError, KeyError, json.JSONDecodeError) as exc:
raise RepositoryError(f"Ungültiges Event in Zeile {line_number}: {exc}") from exc
return events
def get_contributions(self, member_id: str) -> ContributionData:
path = self._member_path(member_id) / "contributions.json"
if not path.exists():
@@ -321,7 +641,7 @@ class MemberRepository:
raw = read_json(path)
if not isinstance(raw, dict):
raise TypeError("Wurzelelement muss ein JSON-Objekt sein")
for field_name in ("claims", "payments", "allocations", "reminders"):
for field_name in ("claims", "payments", "credits", "allocations", "reminders"):
if field_name in raw and not isinstance(raw[field_name], list):
raise TypeError(f"{field_name} muss eine JSON-Liste sein")
if field_name in raw and any(not isinstance(item, dict) for item in raw[field_name]):
@@ -476,6 +796,92 @@ class MemberRepository:
)
return allocation
def record_credit(
self,
member_id: str,
claim_id: str,
*,
credit_date: str,
amount: str,
allocation_amount: str,
reference: str = "",
) -> dict:
try:
normalized_date = normalize_date_input(credit_date, "Gutschriftsdatum")
selected_amount = decimal_value(amount, "Gutschrift")
selected_allocation = decimal_value(allocation_amount, "Zuordnung")
except (DateValidationError, ValueError) as exc:
raise RepositoryError(str(exc)) from exc
if not normalized_date:
raise RepositoryError("Gutschriftsdatum ist erforderlich.")
if selected_amount <= 0:
raise RepositoryError("Der Gutschriftsbetrag muss größer als null sein.")
if selected_allocation <= 0 or selected_allocation > selected_amount:
raise RepositoryError(
"Die Zuordnung muss größer als null und höchstens so hoch wie die Gutschrift sein."
)
data, claim = self.get_claim(member_id, claim_id)
if claim_total(claim) >= 0:
raise RepositoryError("Gutschriften können nur negativen Forderungen zugeordnet werden.")
credit = {
"credit_id": str(uuid4()),
"date": normalized_date,
"amount": money_text(selected_amount),
"method": "payout",
"reference": reference.strip(),
"created_at": datetime.now().astimezone().isoformat(timespec="seconds"),
}
allocation = {
"allocation_id": str(uuid4()),
"credit_id": credit["credit_id"],
"claim_id": claim_id,
"amount": money_text(selected_allocation),
}
data.credits.append(credit)
data.allocations.append(allocation)
self.save_contributions(member_id, data)
self.append_event(
member_id,
event_type="credit_recorded",
summary=f"Gutschrift / Auszahlung erfasst: {credit['amount']} EUR",
references={"claim_id": claim_id, "credit_id": str(credit["credit_id"])},
data={"allocation_amount": allocation["amount"]},
)
return credit
def allocate_credit(self, member_id: str, claim_id: str, *, credit_id: str, amount: str) -> dict:
data, claim = self.get_claim(member_id, claim_id)
if claim_total(claim) >= 0:
raise RepositoryError("Gutschriften können nur negativen Forderungen zugeordnet werden.")
credit = next(
(item for item in data.credits if str(item.get("credit_id", "")) == credit_id),
None,
)
if credit is None:
raise RepositoryError("Gutschrift nicht gefunden.")
try:
selected_amount = decimal_value(amount, "Zuordnung")
available = decimal_value(credit.get("amount", "0")) - credit_allocated_total(data, credit_id)
except ValueError as exc:
raise RepositoryError(str(exc)) from exc
if selected_amount <= 0 or selected_amount > available:
raise RepositoryError(f"Es sind nur {money_text(available)} EUR dieser Gutschrift verfügbar.")
allocation = {
"allocation_id": str(uuid4()),
"credit_id": credit_id,
"claim_id": claim_id,
"amount": money_text(selected_amount),
}
data.allocations.append(allocation)
self.save_contributions(member_id, data)
self.append_event(
member_id,
event_type="credit_allocated",
summary=f"Gutschrift zugeordnet: {allocation['amount']} EUR",
references={"claim_id": claim_id, "credit_id": credit_id},
)
return allocation
def create_reminder_draft(
self,
member_id: str,
@@ -793,6 +1199,9 @@ class MemberRepository:
def member_count(self) -> int:
return sum(1 for _ in self._member_directories())
def asset_count(self) -> int:
return sum(1 for _ in self._asset_directories())
def get_configuration(self) -> dict:
try:
configuration = read_json(self.root / "repository.json")
@@ -849,11 +1258,23 @@ class MemberRepository:
path for path in self.members_root.iterdir() if path.is_dir() and not path.name.startswith(".")
)
def _asset_directories(self) -> Iterable[Path]:
if not self.assets_root.exists():
return []
return (
path for path in self.assets_root.iterdir() if path.is_dir() and not path.name.startswith(".")
)
def _member_path(self, member_id: str) -> Path:
if not member_id or Path(member_id).name != member_id or member_id in {".", ".."}:
raise RepositoryError("Ungültige Mitglieds-ID.")
return self.members_root / member_id
def _asset_path(self, asset_id: str) -> Path:
if not asset_id or Path(asset_id).name != asset_id or asset_id in {".", ".."}:
raise RepositoryError("Ungültige Asset-ID.")
return self.assets_root / asset_id
def _allocate_member_number(self, pattern: str) -> str:
config = read_json(self.root / "repository.json")
member_number, next_value = self._next_available_member_number(config, pattern)
@@ -936,6 +1357,24 @@ class MemberRepository:
changes.append(label)
return changes
@staticmethod
def _summarize_asset_changes(before: Asset, after: Asset) -> list[str]:
labels = {
"label": "Bezeichnung",
"category": "Kategorie",
"inventory_number": "Inventarnummer",
"serial_number": "Seriennummer",
"status": "Status",
"current_holder_member_id": "Zuordnung",
"deposit_amount_default": "Kaution",
"notes": "Notiz",
}
changes: list[str] = []
for field, label in labels.items():
if getattr(before, field) != getattr(after, field):
changes.append(label)
return changes
def normalize_iban(value: str) -> str:
return "".join(value.split()).upper()