feat: add OpenDocument PDF templates

This commit is contained in:
Marcel Peterkau
2026-06-21 22:10:16 +02:00
parent b34135b34a
commit 0622a22794
14 changed files with 942 additions and 14 deletions
+407
View File
@@ -0,0 +1,407 @@
from __future__ import annotations
import hashlib
import os
import re
import shutil
import subprocess
import tempfile
import zipfile
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from xml.etree import ElementTree
from ccma.domain.contributions import (
CLAIM_STATUS_LABELS,
allocated_total,
claim_balance,
claim_items,
claim_status,
claim_total,
money_text,
)
from ccma.domain.dates import format_date_for_display
from ccma.domain.models import MEMBERSHIP_STATUS_LABELS, Member
from ccma.storage.repository import MemberRepository
# File suffixes accepted as OpenDocument templates; callers compare against
# ``path.suffix.casefold()``, so the entries are lower-case.
SUPPORTED_TEMPLATE_SUFFIXES = {".fodt", ".odt", ".ott"}
# Matches ``{{ field.name }}`` placeholders (optional whitespace inside the
# braces); group 1 is the dotted field name. IGNORECASE lets upper-case letters
# match as well, while the value lookup in this module uses the captured name
# unchanged.
PLACEHOLDER_PATTERN = re.compile(r"\{\{\s*([a-z][a-z0-9_.]*)\s*\}\}", re.IGNORECASE)
class DocumentError(RuntimeError):
    """Signal that a template could not be validated, rendered or converted."""
@dataclass(frozen=True, slots=True)
class DocumentTemplate:
name: str
path: Path
relative_path: str
@dataclass(frozen=True, slots=True)
class GeneratedDocument:
path: Path
relative_path: str
sha256: str
class DocumentService:
    """Find OpenDocument templates and render them into per-member PDF files."""

    def __init__(self, repository: MemberRepository):
        """Keep the repository and derive the templates directory from its root."""
        self.repository = repository
        self.templates_root = repository.root / "templates"

    def list_templates(self) -> list[DocumentTemplate]:
        """Return every supported template file below the templates directory.

        The result is ordered by sorted path; the display name is the file stem
        with underscores turned into spaces. A missing directory yields ``[]``.
        """
        root = self.templates_root
        if not root.is_dir():
            return []
        return [
            DocumentTemplate(
                candidate.stem.replace("_", " "),
                candidate,
                candidate.relative_to(root).as_posix(),
            )
            for candidate in sorted(root.rglob("*"))
            if candidate.is_file() and candidate.suffix.casefold() in SUPPORTED_TEMPLATE_SUFFIXES
        ]

    def compatible_templates(
        self,
        *,
        has_claim: bool,
        has_reminder: bool,
    ) -> list[DocumentTemplate]:
        """Return the templates whose placeholders fit the available context.

        A template using ``claim.*`` fields is dropped when ``has_claim`` is
        false; one using ``reminder.*`` fields is dropped when ``has_reminder``
        is false.
        """

        def fits(template: DocumentTemplate) -> bool:
            fields = _template_fields(template.path)
            wants_claim = any(field.startswith("claim.") for field in fields)
            wants_reminder = any(field.startswith("reminder.") for field in fields)
            return (has_claim or not wants_claim) and (has_reminder or not wants_reminder)

        return [template for template in self.list_templates() if fits(template)]

    def generate(
        self,
        template: DocumentTemplate,
        member_id: str,
        *,
        output_name: str,
        claim_id: str | None = None,
        reminder_id: str | None = None,
    ) -> GeneratedDocument:
        """Render ``template`` for a member and store the result as a PDF.

        The PDF is placed in the member's ``files/documents`` directory under a
        sanitised, non-colliding name. Afterwards the document is either
        registered on the reminder (when both ``reminder_id`` and ``claim_id``
        are given) or recorded as a ``document_generated`` event.

        Raises:
            DocumentError: if the template lies outside the templates
                directory, is not a supported file, a reminder is requested
                without its claim, or the reminder cannot be found. Rendering
                and conversion helpers raise the same type.
        """
        source = template.path.resolve()
        # Refuse templates that resolve to a location outside the template root.
        try:
            source.relative_to(self.templates_root.resolve())
        except ValueError as exc:
            raise DocumentError("Das Template liegt nicht im Template-Verzeichnis.") from exc
        if not (source.is_file() and source.suffix.casefold() in SUPPORTED_TEMPLATE_SUFFIXES):
            raise DocumentError("Das ausgewählte OpenDocument-Template ist nicht verfügbar.")

        member = self.repository.get_member(member_id)
        data = claim = reminder = None
        if claim_id:
            data, claim = self.repository.get_claim(member_id, claim_id)
        if reminder_id:
            if not claim_id or data is None:
                raise DocumentError("Eine Mahnung benötigt den Kontext ihrer Forderung.")
            for entry in data.reminders:
                belongs_to_claim = str(entry.get("claim_id", "")) == claim_id
                if belongs_to_claim and str(entry.get("reminder_id", "")) == reminder_id:
                    reminder = entry
                    break
            if reminder is None:
                raise DocumentError("Die ausgewählte Mahnung wurde nicht gefunden.")

        values = _template_values(member, data=data, claim=claim, reminder=reminder)

        files_root = self.repository.members_root / member_id / "files"
        documents_dir = files_root / "documents"
        documents_dir.mkdir(parents=True, exist_ok=True)
        destination = _available_path(documents_dir, _safe_pdf_name(output_name))

        with tempfile.TemporaryDirectory(prefix="ccma-document-") as workdir_name:
            workdir = Path(workdir_name)
            rendered = workdir / f"rendered{source.suffix.casefold()}"
            _render_template(source, rendered, values)
            pdf = _convert_to_pdf(rendered, workdir)
            # Copy next to the destination first, then rename into place.
            staging = destination.with_name(f".{destination.name}.tmp")
            try:
                shutil.copyfile(pdf, staging)
                os.replace(staging, destination)
            finally:
                staging.unlink(missing_ok=True)

        relative_path = destination.relative_to(files_root).as_posix()
        digest = hashlib.sha256(destination.read_bytes()).hexdigest()

        if not (reminder_id and claim_id):
            references = {"document": relative_path}
            if claim_id:
                references["claim_id"] = claim_id
            self.repository.append_event(
                member_id,
                event_type="document_generated",
                summary=f"Dokument erzeugt: {destination.name}",
                actor_type="user",
                actor_name="Vorstand",
                references=references,
                data={"template": template.relative_path, "sha256": digest},
            )
        else:
            self.repository.register_reminder_document(
                member_id,
                claim_id,
                reminder_id,
                relative_path=relative_path,
                sha256=digest,
                template=template.relative_path,
            )
        return GeneratedDocument(path=destination, relative_path=relative_path, sha256=digest)
def _template_values(
    member: Member,
    *,
    data=None,
    claim: dict | None = None,
    reminder: dict | None = None,
) -> dict[str, str]:
    """Build the placeholder-name to text mapping for one document.

    ``document.*`` and ``member.*`` entries are always present. ``claim.*``
    entries are added only when both ``claim`` and ``data`` are given, and
    ``reminder.*`` entries only when ``reminder`` is given.
    """
    today = date.today().isoformat()
    now = datetime.now().astimezone()
    values = {
        "document.date": format_date_for_display(today),
        "document.datetime": now.strftime("%d.%m.%Y %H:%M"),
        "member.id": member.member_id,
        "member.number": member.member_number,
        "member.first_name": member.first_name,
        "member.last_name": member.last_name,
        "member.full_name": member.display_name,
        "member.email": member.email,
        "member.birth_date": format_date_for_display(member.birth_date),
        "member.status": MEMBERSHIP_STATUS_LABELS.get(member.status, member.status),
        "member.accepted_at": format_date_for_display(member.accepted_at),
        "member.started_at": format_date_for_display(member.membership_started_at),
    }

    if claim is not None and data is not None:
        claim_id = str(claim.get("claim_id", ""))
        positions = []
        for item in claim_items(claim):
            positions.append(f"{item.get('description', '')}: {item.get('amount', '0.00')} EUR")
        claim_state = claim_status(data, claim)
        values["claim.id"] = claim_id
        values["claim.title"] = str(claim.get("title", ""))
        values["claim.due_date"] = format_date_for_display(str(claim.get("due_date", "")))
        values["claim.total"] = f"{money_text(claim_total(claim))} EUR"
        values["claim.paid"] = f"{money_text(allocated_total(data, claim_id))} EUR"
        values["claim.balance"] = f"{money_text(claim_balance(data, claim))} EUR"
        values["claim.status"] = CLAIM_STATUS_LABELS.get(claim_state, claim_state)
        values["claim.items"] = "; ".join(positions)

    if reminder is not None:
        # Raw values without a label entry are shown unchanged.
        status_labels = {
            "draft": "Entwurf",
            "generated": "Dokument erzeugt",
            "sent": "Versandt",
            "cancelled": "Verworfen",
        }
        channel_labels = {"email": "E-Mail", "letter": "Brief", "personal": "Persönlich"}
        reminder_state = str(reminder.get("status", "draft"))
        channel = str(reminder.get("channel", ""))
        values["reminder.id"] = str(reminder.get("reminder_id", ""))
        values["reminder.level"] = str(reminder.get("level", ""))
        values["reminder.name"] = str(reminder.get("name", ""))
        values["reminder.status"] = status_labels.get(reminder_state, reminder_state)
        values["reminder.created_at"] = _display_timestamp(str(reminder.get("created_at", "")))
        values["reminder.sent_at"] = _display_timestamp(str(reminder.get("sent_at") or ""))
        values["reminder.payment_deadline"] = format_date_for_display(
            str(reminder.get("payment_deadline") or "")
        )
        values["reminder.payment_deadline_days"] = str(reminder.get("payment_deadline_days", ""))
        values["reminder.fee"] = f"{reminder.get('fee', '0.00')} EUR"
        values["reminder.detail"] = str(reminder.get("detail", ""))
        values["reminder.channel"] = channel_labels.get(channel, channel)

    return values
def _render_template(source: Path, destination: Path, values: dict[str, str]) -> None:
    """Write a copy of ``source`` to ``destination`` with placeholders filled in.

    Package templates (any suffix other than ``.fodt``) are copied entry by
    entry; only ``content.xml`` and ``styles.xml`` are rewritten. Flat
    ``.fodt`` templates are rewritten as a single XML document.

    Raises:
        DocumentError: if the template cannot be read or is not a valid ZIP
            package; placeholder replacement raises the same type.
    """
    if source.suffix.casefold() != ".fodt":
        try:
            with zipfile.ZipFile(source, "r") as package, zipfile.ZipFile(destination, "w") as target:
                for entry in package.infolist():
                    payload = package.read(entry.filename)
                    if entry.filename in {"content.xml", "styles.xml"}:
                        payload = _replace_xml_placeholders(payload, values)
                    # Passing the original ZipInfo keeps the entry's metadata.
                    target.writestr(entry, payload)
        except (OSError, zipfile.BadZipFile) as exc:
            raise DocumentError(f"OpenDocument-Template ist beschädigt: {exc}") from exc
        return

    try:
        raw = source.read_bytes()
    except OSError as exc:
        raise DocumentError(f"Template konnte nicht gelesen werden: {exc}") from exc
    destination.write_bytes(_replace_xml_placeholders(raw, values))
def _template_fields(source: Path) -> set[str]:
    """Collect the placeholder names used in a template.

    Reads the flat XML of a ``.fodt`` file, or ``content.xml`` / ``styles.xml``
    of a package, and scans the text of every paragraph and heading element.
    Unreadable files, broken archives and unparsable XML contribute nothing.
    """
    try:
        if source.suffix.casefold() == ".fodt":
            documents = [source.read_bytes()]
        else:
            with zipfile.ZipFile(source, "r") as package:
                documents = []
                for member_name in ("content.xml", "styles.xml"):
                    if member_name in package.namelist():
                        documents.append(package.read(member_name))
    except (OSError, zipfile.BadZipFile):
        return set()

    found: set[str] = set()
    for document in documents:
        try:
            tree = ElementTree.fromstring(document)
        except ElementTree.ParseError:
            continue
        for node in tree.iter():
            if _local_name(node.tag) not in {"p", "h"}:
                continue
            # Join all text fragments so a placeholder split across child
            # elements is still seen as one token.
            text = "".join(slot[2] for slot in _text_slots(node))
            for match in PLACEHOLDER_PATTERN.finditer(text):
                found.add(match.group(1))
    return found
def _replace_xml_placeholders(content: bytes, values: dict[str, str]) -> bytes:
    """Return ``content`` with every ``{{ name }}`` placeholder substituted.

    Placeholders are searched in the joined text of each paragraph (``p``) and
    heading (``h``) element, so a placeholder that is split across several
    child elements is still found.

    The namespace prefixes declared in the template are registered with
    ElementTree before serialising. Without that, ElementTree writes generated
    prefixes (``ns0``, ``ns1``, ...) instead of the template's own ones such as
    ``office:`` or ``text:``.
    NOTE(review): consumers that look for the conventional ODF prefixes in the
    raw file are the motivation; confirm against the LibreOffice version used.
    ``ElementTree.register_namespace`` updates a process-wide registry.

    Args:
        content: Raw XML bytes of a flat document or a package member.
        values: Mapping from placeholder name to replacement text.

    Returns:
        The serialised XML as UTF-8 bytes including an XML declaration.

    Raises:
        DocumentError: if the XML cannot be parsed, or if it uses placeholder
            names that are missing from ``values``.
    """
    import io  # function-scope import: the module does not import io at file level

    try:
        root = ElementTree.fromstring(content)
        declared = [
            namespace
            for _event, namespace in ElementTree.iterparse(
                io.BytesIO(content), events=("start-ns",)
            )
        ]
    except ElementTree.ParseError as exc:
        raise DocumentError(f"Template-XML ist beschädigt: {exc}") from exc

    for prefix, uri in declared:
        if not prefix:
            # A default namespace is left to ElementTree's own prefix handling.
            continue
        try:
            ElementTree.register_namespace(prefix, uri)
        except ValueError:
            # Reserved prefix form (``ns<digits>``); ElementTree picks its own.
            continue

    unknown: set[str] = set()
    for paragraph in root.iter():
        if _local_name(paragraph.tag) not in {"p", "h"}:
            continue
        slots = _text_slots(paragraph)
        combined = "".join(value for _element, _attribute, value in slots)
        matches = list(PLACEHOLDER_PATTERN.finditer(combined))
        unknown.update(match.group(1) for match in matches if match.group(1) not in values)
        _replace_matches(slots, matches, values)
    if unknown:
        names = ", ".join(sorted(unknown))
        raise DocumentError(f"Unbekannte oder im Kontext nicht verfügbare Platzhalter: {names}")
    return ElementTree.tostring(root, encoding="utf-8", xml_declaration=True)
def _text_slots(element) -> list[tuple[object, str, str]]:
    """List every non-empty text fragment below ``element`` in document order.

    Each entry is ``(node, attribute, text)`` where ``attribute`` is ``"text"``
    or ``"tail"``, i.e. the attribute of ``node`` that holds ``text``. The tail
    of ``element`` itself is not included.
    """

    def walk(node):
        if node.text:
            yield node, "text", node.text
        for child in node:
            yield from walk(child)
            if child.tail:
                yield child, "tail", child.tail

    return list(walk(element))
def _replace_matches(slots, matches, values: dict[str, str]) -> None:
    """Substitute the matched placeholders inside the given text slots in place.

    ``matches`` carry positions in the concatenation of all slot texts. Each
    match whose name is in ``values`` is replaced; when a match spans several
    slots, the replacement goes into the first one, the slots in between are
    emptied and the last one keeps only the text after the placeholder.
    Matches with unknown names are skipped.
    """
    # Character range of every slot within the concatenated text.
    boundaries: list[tuple[int, int]] = []
    cursor = 0
    for _element, _attribute, text in slots:
        boundaries.append((cursor, cursor + len(text)))
        cursor += len(text)

    def store(index: int, text: str) -> None:
        # Update both the XML node and the cached slot text.
        element, attribute, _previous = slots[index]
        setattr(element, attribute, text)
        slots[index] = (element, attribute, text)

    # Work from the last match backwards so offsets of earlier matches, which
    # refer to the original text, stay valid.
    for match in reversed(matches):
        key = match.group(1)
        if key not in values:
            continue
        first, cut_from = _slot_at(boundaries, match.start())
        last, last_char = _slot_at(boundaries, match.end() - 1)
        prefix = slots[first][2][:cut_from]
        suffix = slots[last][2][last_char + 1:]
        if first == last:
            store(first, prefix + values[key] + suffix)
            continue
        store(first, prefix + values[key])
        for index in range(first + 1, last):
            store(index, "")
        store(last, suffix)
def _slot_at(boundaries: list[tuple[int, int]], position: int) -> tuple[int, int]:
    """Locate the slot whose half-open range contains ``position``.

    Returns ``(slot_index, offset_within_slot)`` for the first matching range.

    Raises:
        DocumentError: if no range contains ``position``.
    """
    index = 0
    while index < len(boundaries):
        lower, upper = boundaries[index]
        if lower <= position < upper:
            return index, position - lower
        index += 1
    raise DocumentError("Platzhalter konnte im Template nicht zugeordnet werden.")
def _convert_to_pdf(source: Path, output_directory: Path) -> Path:
    """Convert ``source`` to PDF with a headless office run and return the PDF path.

    The first of ``soffice`` / ``libreoffice`` found on ``PATH`` is used, with
    a user profile directory placed inside ``output_directory``.

    Raises:
        DocumentError: if no executable is found, the process cannot be run or
            exceeds 60 seconds, exits non-zero, or leaves no PDF behind.
    """
    executable = None
    for program in ("soffice", "libreoffice"):
        executable = shutil.which(program)
        if executable:
            break
    if not executable:
        raise DocumentError("LibreOffice/OpenOffice wurde nicht gefunden; PDF-Erzeugung ist nicht möglich.")

    profile_uri = (output_directory / "libreoffice-profile").resolve().as_uri()
    command = [executable, f"-env:UserInstallation={profile_uri}", "--headless"]
    command += ["--convert-to", "pdf"]
    command += ["--outdir", str(output_directory)]
    command.append(str(source))

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=60, check=False)
    except (OSError, subprocess.TimeoutExpired) as exc:
        raise DocumentError(f"PDF-Konvertierung konnte nicht ausgeführt werden: {exc}") from exc

    # The converter names its output after the input file's stem.
    converted = output_directory / f"{source.stem}.pdf"
    if result.returncode == 0 and converted.is_file():
        return converted
    detail = (result.stderr or result.stdout).strip() or "keine PDF-Datei erzeugt"
    raise DocumentError(f"LibreOffice konnte das Dokument nicht konvertieren: {detail}")
def _safe_pdf_name(value: str) -> str:
    """Turn a user-supplied document name into a safe ``*.pdf`` file name.

    Only the last path component is kept, and only a trailing ``.pdf``
    extension (any letter case) is removed. Other dots are preserved, so a
    name such as ``"Mahnung 21.06.2026"`` is not cut off at its last dot.
    Runs of characters outside letters, digits, German umlauts, ``.``, ``_``,
    space and ``-`` collapse into a single ``-``; leading and trailing spaces,
    dots and dashes are trimmed.

    Args:
        value: Requested output name, with or without a ``.pdf`` extension.

    Returns:
        The sanitised file name ending in ``.pdf``. When nothing usable is
        left, ``Dokument-<today's ISO date>.pdf`` is returned.
    """
    name = Path(value.strip()).name
    if name[-4:].lower() == ".pdf":
        name = name[:-4]
    stem = re.sub(r"[^A-Za-z0-9ÄÖÜäöüß._ -]+", "-", name).strip(" .-")
    if not stem:
        stem = f"Dokument-{date.today().isoformat()}"
    return f"{stem}.pdf"
def _available_path(directory: Path, filename: str) -> Path:
    """Return a path inside ``directory`` that does not exist yet.

    ``filename`` is used as is when it is free. Otherwise ``-2``, ``-3``, ...
    is appended to its stem until an unused name is found. The numbered
    candidates keep the suffix of ``filename`` instead of a fixed ``.pdf``, so
    every candidate has the same extension as the first one.

    The check and any later file creation are separate steps; this function
    does not reserve the returned path.
    """
    candidate = directory / filename
    requested = Path(filename)
    stem, suffix = requested.stem, requested.suffix
    counter = 2
    while candidate.exists():
        candidate = directory / f"{stem}-{counter}{suffix}"
        counter += 1
    return candidate
def _display_timestamp(value: str) -> str:
    """Format an ISO 8601 timestamp as ``DD.MM.YYYY HH:MM``.

    A trailing ``Z`` is read as ``+00:00`` so that UTC timestamps are also
    parsed on Python versions whose ``datetime.fromisoformat`` rejects the
    ``Z`` suffix. No time zone conversion takes place; the wall-clock fields
    of the parsed value are printed as they are.

    Args:
        value: Timestamp text; may be empty.

    Returns:
        ``""`` for empty input, the formatted timestamp when parsing succeeds,
        otherwise the first 16 characters of ``value`` unchanged.
    """
    if not value:
        return ""
    candidate = value
    if candidate[-1] in "Zz":
        candidate = f"{candidate[:-1]}+00:00"
    try:
        return datetime.fromisoformat(candidate).strftime("%d.%m.%Y %H:%M")
    except ValueError:
        return value[:16]
def _local_name(tag: str) -> str:
    """Return the part of an ElementTree tag after the ``{namespace}`` prefix.

    Tags without a ``}`` are returned unchanged.
    """
    _namespace, _brace, local = tag.rpartition("}")
    return local