"""Tests for the file-based member/asset repository and the housekeeper.

Each test builds a fresh ``MemberRepository`` below pytest's ``tmp_path`` and
exercises it through its public methods; several tests additionally inspect or
tamper with the JSON files the repository writes.
"""

import json
from decimal import Decimal

import pytest

from ccma.domain.contributions import claim_balance
from ccma.services.housekeeper import Housekeeper
from ccma.storage.repository import (
    MemberRepository,
    RepositoryError,
    format_member_number,
    validate_member_number_pattern,
)


def test_repository_creates_transparent_member_record(tmp_path) -> None:
    """Creating a member writes the expected files and a readable member.json."""
    repository = MemberRepository(tmp_path / "store")
    repository.initialize()
    assert (repository.root / "templates").is_dir()

    member = repository.create_member(
        first_name="Ada",
        last_name="Lovelace",
        nickname="Enchantress",
        email="ada@example.org",
        birth_date="1990-12-10",
        member_number="0042",
    )

    member_dir = repository.members_root / member.member_id
    assert (member_dir / "member.json").is_file()
    assert (member_dir / "contributions.json").is_file()
    assert (member_dir / "events.jsonl").is_file()
    assert (member_dir / "files").is_dir()
    assert repository.validate() == []

    raw = json.loads((member_dir / "member.json").read_text(encoding="utf-8"))
    assert raw["person"]["first_name"] == "Ada"
    assert raw["person"]["nickname"] == "Enchantress"
    assert raw["schema_version"] == 1
    assert raw["content_hash"]


def test_search_matches_name_email_number_and_german_birth_date(tmp_path) -> None:
    """Search finds the member by name, nickname, e-mail part, number and DD.MM.YYYY date."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(
        first_name="Jörg",
        last_name="Müller",
        nickname="Jogi",
        email="joerg.mueller@example.org",
        birth_date="1990-04-23",
        member_number="C3-007",
    )

    # The first query spells the name without umlauts on purpose.
    for query in ("Jorg Muller", "Jogi", "mueller@example.org", "C3-007", "23.04.1990"):
        assert [result.member_id for result in repository.search(query)] == [member.member_id]


def test_events_are_appended_and_changes_do_not_leak_values(tmp_path) -> None:
    """Events accumulate in order and the change summary names the field, not its values."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(
        first_name="Alice", last_name="Example", email="old@example.org"
    )

    member.email = "new@example.org"
    repository.save_member(member)
    repository.append_event(
        member.member_id,
        event_type="board_comment",
        summary="Telefonisch erreicht",
        actor_type="user",
        actor_name="Vorstand",
    )

    events = repository.get_events(member.member_id)
    assert [event.event_type for event in events] == [
        "member_created",
        "member_data_changed",
        "board_comment",
    ]
    assert "E-Mail-Adresse" in events[1].summary
    # Neither the old nor the new address may show up in the audit text.
    assert "old@example.org" not in events[1].summary
    assert "new@example.org" not in events[1].summary


def test_status_change_audit_contains_old_and_new_status(tmp_path) -> None:
    """Saving a status change produces an event summary with both status labels."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Status", last_name="Test")

    member.status = "active"
    repository.save_member(member)

    event = repository.get_events(member.member_id)[-1]
    assert event.summary == "Mitgliedsdaten geändert: Status von ANTRAG zu AKTIV"


def test_repository_accepts_local_date_input_and_rejects_invalid_dates(tmp_path) -> None:
    """A DD.MM.YYYY birth date is stored as ISO; an impossible date raises RepositoryError."""
    repository = MemberRepository(tmp_path)
    repository.initialize()

    member = repository.create_member(first_name="Local", last_name="Date", birth_date="31.12.2000")

    assert member.birth_date == "2000-12-31"
    assert repository.get_member(member.member_id).birth_date == "2000-12-31"
    with pytest.raises(RepositoryError, match="gültiges Datum"):
        repository.create_member(first_name="Invalid", last_name="Date", birth_date="31.02.2000")


def test_repository_reports_empty_contributions_file(tmp_path) -> None:
    """An emptied contributions.json raises on read and is listed by validate()."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Empty", last_name="Contributions")
    path = repository.members_root / member.member_id / "contributions.json"
    path.write_text("", encoding="utf-8")

    with pytest.raises(RepositoryError, match="contributions.json konnte nicht gelesen"):
        repository.get_contributions(member.member_id)

    assert any("contributions.json" in error for error in repository.validate())


def test_repository_rejects_structurally_invalid_member_json(tmp_path) -> None:
    """Preflight raises when the ``person`` entry of member.json is not a JSON object."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Invalid", last_name="Structure")
    path = repository.members_root / member.member_id / "member.json"
    raw = json.loads(path.read_text(encoding="utf-8"))
    raw["person"] = []
    path.write_text(json.dumps(raw), encoding="utf-8")

    with pytest.raises(RepositoryError, match="person muss ein JSON-Objekt sein"):
        repository.preflight_member_record(member.member_id)


def test_member_path_rejects_traversal(tmp_path) -> None:
    """A member id containing ``../`` is rejected with RepositoryError."""
    repository = MemberRepository(tmp_path)
    repository.initialize()

    with pytest.raises(RepositoryError):
        repository.get_member("../outside")


def test_automatic_member_numbers_are_sequential_and_preview_does_not_consume(tmp_path) -> None:
    """Previewing twice yields the same number; created members get consecutive numbers."""
    repository = MemberRepository(tmp_path)
    repository.initialize()

    assert repository.preview_member_number() == "CCMA-0001"
    assert repository.preview_member_number() == "CCMA-0001"

    first = repository.create_member(first_name="First", last_name="Member")
    second = repository.create_member(first_name="Second", last_name="Member")

    assert first.member_number == "CCMA-0001"
    assert second.member_number == "CCMA-0002"
    assert repository.preview_member_number() == "CCMA-0003"


def test_custom_pattern_and_manual_mode(tmp_path) -> None:
    """A custom pattern drives automatic numbers; manual mode requires a unique number."""
    repository = MemberRepository(tmp_path)
    repository.initialize()

    repository.save_member_number_policy(mode="automatic", pattern="MA-{year}-{number:03d}")
    automatic = repository.create_member(first_name="Auto", last_name="Member")
    # The year part depends on the current date, so only prefix and suffix are pinned.
    assert automatic.member_number.startswith("MA-")
    assert automatic.member_number.endswith("-001")

    repository.save_member_number_policy(mode="manual", pattern="MA-{number:03d}")
    with pytest.raises(RepositoryError, match="erforderlich"):
        repository.create_member(first_name="Missing", last_name="Number")

    manual = repository.create_member(
        first_name="Manual", last_name="Member", member_number="SPECIAL-7"
    )
    assert manual.member_number == "SPECIAL-7"

    # Same number in different letter case must still count as a duplicate.
    with pytest.raises(RepositoryError, match="bereits vergeben"):
        repository.create_member(
            first_name="Duplicate", last_name="Member", member_number="special-7"
        )


@pytest.mark.parametrize("pattern", ["", "CCMA-{year}", "{unknown}-{number}", "{number!r}"])
def test_invalid_member_number_patterns_are_rejected(pattern) -> None:
    """Each parametrized pattern makes validate_member_number_pattern raise."""
    with pytest.raises(RepositoryError):
        validate_member_number_pattern(pattern)


def test_member_number_formatter_supports_padding_and_year() -> None:
    """The formatter fills ``{year}`` and applies the zero-padding format spec."""
    assert format_member_number("CCMA-{year}-{number:05d}", 42, year=2026) == "CCMA-2026-00042"


def test_member_address_and_sepa_data_are_structured_and_audited_without_values(tmp_path) -> None:
    """Address/banking data is normalized, stored in sections and audited without values."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Bank", last_name="Member", phone="0621 12345")

    member.street = "Teststraße 23"
    member.postal_code = "68159"
    member.city = "Mannheim"
    member.account_holder = "Bank Member"
    member.iban = "DE89 3704 0044 0532 0130 00"
    member.bic = "COBADEFFXXX"
    member.mandate_reference = "CCMA-M-42"
    member.mandate_signed_at = "21.06.2026"
    member.mandate_active = True
    repository.save_member(member)

    loaded = repository.get_member(member.member_id)
    assert loaded.iban == "DE89370400440532013000"
    assert loaded.mandate_signed_at == "2026-06-21"

    # Read explicitly as UTF-8, like every other file access in this module:
    # the record contains non-ASCII text and the locale default may differ.
    member_file = repository.members_root / member.member_id / "member.json"
    raw = json.loads(member_file.read_text(encoding="utf-8"))
    assert raw["address"]["city"] == "Mannheim"
    assert raw["banking"]["mandate_active"] is True

    event = repository.get_events(member.member_id)[-1]
    assert "IBAN" in event.summary
    assert loaded.iban not in event.summary
    assert loaded.street not in event.summary


def test_active_sepa_mandate_requires_valid_complete_data(tmp_path) -> None:
    """Saving fails for an invalid IBAN and for an active mandate set with only an IBAN."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Invalid", last_name="Mandate")

    member.iban = "DE001234"
    with pytest.raises(RepositoryError, match="IBAN"):
        repository.save_member(member)

    member.iban = "DE89370400440532013000"
    member.mandate_active = True
    with pytest.raises(RepositoryError, match="aktives Lastschriftmandat"):
        repository.save_member(member)


def test_organization_sender_data_is_stored_centrally(tmp_path) -> None:
    """Saved organization data is returned under the configuration's ``organization`` key."""
    repository = MemberRepository(tmp_path)
    repository.initialize()

    repository.save_organization(
        {
            "name": "Chaos Computer Club Mannheim e.V.",
            "street": "Testweg 1",
            "postal_code": "68159",
            "city": "Mannheim",
            "country": "Deutschland",
            "email": "vorstand@example.org",
            "phone": "",
            "website": "https://example.org",
            "iban": "DE89370400440532013000",
            "bic": "COBADEFFXXX",
            "creditor_id": "DE98ZZZ09999999999",
        }
    )

    organization = repository.get_configuration()["organization"]
    assert organization["street"] == "Testweg 1"
    assert organization["iban"] == "DE89370400440532013000"


def test_repository_creates_asset_record_and_events(tmp_path) -> None:
    """Creating an asset writes its files and stores the deposit with two decimals."""
    repository = MemberRepository(tmp_path / "store")
    repository.initialize()

    asset = repository.create_asset(
        label="Clubraumschlüssel A12",
        category="key",
        inventory_number="KEY-A12",
        deposit_amount_default="25",
    )

    asset_dir = repository.assets_root / asset.asset_id
    assert (asset_dir / "asset.json").is_file()
    assert (asset_dir / "events.jsonl").is_file()
    assert (asset_dir / "files").is_dir()

    loaded = repository.get_asset(asset.asset_id)
    assert loaded.label == "Clubraumschlüssel A12"
    assert loaded.deposit_amount_default == "25.00"


def test_asset_can_be_assigned_and_returned_to_single_member(tmp_path) -> None:
    """An issued asset cannot be assigned to a second member until it is returned."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Ada", last_name="Lovelace")
    other = repository.create_member(first_name="Grace", last_name="Hopper")
    asset = repository.create_asset(label="Transponder 01")

    repository.assign_asset(asset.asset_id, member.member_id)

    assigned = repository.get_asset(asset.asset_id)
    assert assigned.status == "issued"
    assert assigned.current_holder_member_id == member.member_id
    assert [item.asset_id for item in repository.list_member_assets(member.member_id)] == [
        asset.asset_id
    ]

    with pytest.raises(RepositoryError, match="bereits einem Mitglied zugeordnet"):
        repository.assign_asset(asset.asset_id, other.member_id)

    repository.return_asset(asset.asset_id)

    returned = repository.get_asset(asset.asset_id)
    assert returned.status == "available"
    assert returned.current_holder_member_id == ""
    assert repository.list_member_assets(member.member_id) == []


def test_asset_assignment_is_audited_on_asset_and_member(tmp_path) -> None:
    """Assigning and returning an asset leaves events on both the asset and the member."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Key", last_name="Holder", member_number="0042")
    asset = repository.create_asset(label="Clubraumschlüssel")

    repository.assign_asset(asset.asset_id, member.member_id)
    repository.return_asset(asset.asset_id)

    asset_events = [event.event_type for event in repository.get_asset_events(asset.asset_id)]
    member_events = [event.event_type for event in repository.get_events(member.member_id)]
    assert asset_events == ["asset_created", "asset_issued", "asset_returned"]
    assert "asset_assigned" in member_events
    assert "asset_returned" in member_events


def test_asset_deposit_cannot_change_while_issued(tmp_path) -> None:
    """Saving a changed default deposit on an issued asset raises RepositoryError."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Ada", last_name="Lovelace")
    asset = repository.create_asset(label="Clubraumschlüssel", deposit_amount_default="25")
    repository.assign_asset(asset.asset_id, member.member_id)

    issued = repository.get_asset(asset.asset_id)
    issued.deposit_amount_default = "35"

    with pytest.raises(RepositoryError, match="Kaution kann nur geändert werden"):
        repository.save_asset(issued)


def test_asset_deposit_can_change_when_not_issued(tmp_path) -> None:
    """The default deposit of an unassigned asset can be changed and is normalized."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    asset = repository.create_asset(label="Clubraumschlüssel", deposit_amount_default="25")

    asset.deposit_amount_default = "35"
    repository.save_asset(asset)

    updated = repository.get_asset(asset.asset_id)
    assert updated.deposit_amount_default == "35.00"


def test_manual_asset_claim_is_linked_to_member_and_asset(tmp_path) -> None:
    """A manual claim with an asset reference is stored on the member and logged on the asset."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Ada", last_name="Lovelace")
    asset = repository.create_asset(label="Clubraumschlüssel")
    repository.assign_asset(asset.asset_id, member.member_id)

    result = repository.create_manual_claim(
        member.member_id,
        title="Kaution Clubraumschlüssel",
        amount="25.00",
        due_date="2026-06-26",
        description="Kaution für Schlüssel",
        claim_type="asset_deposit",
        references={"asset_id": asset.asset_id},
    )

    claim = result["claim"]
    loaded_claim = repository.get_contributions(member.member_id).claims[0]
    assert claim["claim_id"] == loaded_claim["claim_id"]
    assert loaded_claim["origin"]["asset_id"] == asset.asset_id
    assert repository.get_asset_events(asset.asset_id)[-1].event_type == "asset_claim_created"


def test_negative_claim_can_be_settled_with_credit(tmp_path) -> None:
    """A -25.00 claim is balanced to zero by recording a 25.00 credit against it."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Ada", last_name="Lovelace")
    asset = repository.create_asset(label="Clubraumschlüssel")
    repository.assign_asset(asset.asset_id, member.member_id)
    claim = repository.create_manual_claim(
        member.member_id,
        title="Kautionsrückzahlung",
        amount="-25.00",
        due_date="2026-06-26",
        claim_type="asset_refund",
        references={"asset_id": asset.asset_id},
    )["claim"]

    repository.record_credit(
        member.member_id,
        str(claim["claim_id"]),
        credit_date="2026-06-26",
        amount="25.00",
        allocation_amount="25.00",
        reference="Bar ausgezahlt",
    )

    data, loaded_claim = repository.get_claim(member.member_id, str(claim["claim_id"]))
    assert claim_balance(data, loaded_claim) == Decimal("0.00")
    assert data.credits[0]["amount"] == "25.00"


def test_member_hash_warning_does_not_block_reading(tmp_path) -> None:
    """A hand-edited member.json still loads, and a hash warning is reported for it."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Ada", last_name="Lovelace")
    path = repository.members_root / member.member_id / "member.json"
    # Edit the file behind the repository's back without updating its stored hash.
    raw = json.loads(path.read_text(encoding="utf-8"))
    raw["person"]["first_name"] = "Eve"
    path.write_text(json.dumps(raw, ensure_ascii=False, indent=2), encoding="utf-8")

    loaded = repository.get_member(member.member_id)
    warnings = repository.member_hash_warnings(member.member_id)

    assert loaded.first_name == "Eve"
    assert warnings
    assert "Hash fehlt oder stimmt nicht" in warnings[0]


def test_refresh_member_record_hashes_clears_hash_warning(tmp_path) -> None:
    """Refreshing the record hashes removes the warning caused by a manual edit."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Ada", last_name="Lovelace")
    path = repository.members_root / member.member_id / "member.json"
    raw = json.loads(path.read_text(encoding="utf-8"))
    raw["person"]["first_name"] = "Eve"
    path.write_text(json.dumps(raw, ensure_ascii=False, indent=2), encoding="utf-8")
    assert repository.member_hash_warnings(member.member_id)

    repository.refresh_member_record_hashes(member.member_id)

    assert repository.member_hash_warnings(member.member_id) == []


def test_housekeeper_reports_json_hash_mismatch(tmp_path) -> None:
    """The housekeeper run yields a ``json_hash_mismatch`` finding for the edited member."""
    repository = MemberRepository(tmp_path)
    repository.initialize()
    member = repository.create_member(first_name="Ada", last_name="Lovelace")
    path = repository.members_root / member.member_id / "member.json"
    raw = json.loads(path.read_text(encoding="utf-8"))
    raw["person"]["last_name"] = "Example"
    path.write_text(json.dumps(raw, ensure_ascii=False, indent=2), encoding="utf-8")

    findings = Housekeeper(repository).run()

    assert any(
        finding.code == "json_hash_mismatch" and finding.member_id == member.member_id
        for finding in findings
    )