use std::collections::HashSet; use dd3_core::{Clock, Radio, ReceiverConfig, ReceiverPipeline, SenderConfig, SenderStateMachine, StatusSink}; use dd3_protocol::{encode_frame, MsgKind}; use dd3_sim::{ Endpoint, FakeClock, FakeRadioConfig, MockPublisher, MockStatusSink, MockStorage, RadioEndpoint, ScenarioRunner, }; #[derive(Default)] struct NoopStatus; impl StatusSink for NoopStatus {} fn make_sync_ack(batch_id: u16, epoch: u32) -> Vec { let payload = dd3_protocol::encode_ack_down_payload(dd3_protocol::AckDownPayload { time_valid: true, batch_id, epoch_utc: epoch, }); encode_frame(MsgKind::AckDown, 0xBEEF, &payload) } #[test] fn no_receiver_present_stays_unsynced_and_sync_only() { let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); let mut sender = SenderStateMachine::new( SenderConfig { short_id: 0xF19C, sender_id: 1, device_id: "dd3-F19C".to_string(), }, 0, ); let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true); let mut status = NoopStatus::default(); for _ in 0..90_000u64 { clock.advance_ms(1); sender.tick(&clock, &mut sender_radio, &mut status); } let stats = sender.stats(); assert!(!sender.is_time_acquired()); assert_eq!(0, stats.build_count); assert_eq!(0, stats.queue_depth); } #[test] fn receiver_bootstrap_unlocks_sampling_and_batches() { let mut runner = ScenarioRunner::new(FakeRadioConfig::default()); // Initial unsynced period should result in sync request + ACK bootstrap. runner.tick(20_000); assert!(runner.sender.is_time_acquired()); // After unlock, sender should sample and send normal batches. runner.tick(35_000); assert!(!runner.publisher.state_messages.is_empty()); assert!(!runner.storage.csv_lines.is_empty()); } #[test] fn packet_loss_eventually_progresses_without_duplicate_commit() { let mut runner = ScenarioRunner::new(FakeRadioConfig { loss_pct: 20, duplicate_pct: 10, max_delay_ms: 80, seed: 0xBADC0DE, }); runner.tick(180_000); let sender_stats = runner.sender.stats(); assert!(sender_stats.last_acked_batch_id > 0); assert!(!runner.publisher.state_messages.is_empty()); let mut uniq = HashSet::new(); for (topic, payload) in &runner.publisher.state_messages { let key = format!("{topic}|{payload}"); assert!(uniq.insert(key), "duplicate committed state payload detected"); } } #[test] fn ack_mismatch_is_ignored_and_inflight_unchanged() { let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); let mut inject_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone()); let mut sender = SenderStateMachine::new( SenderConfig { short_id: 0xF19C, sender_id: 1, device_id: "dd3-F19C".to_string(), }, 0, ); let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true); let mut status = MockStatusSink::default(); // Trigger sync-request send so we have inflight batch_id=1 and ack pending. clock.advance_ms(15_000); sender.tick(&clock, &mut sender_radio, &mut status); assert!(sender.stats().ack_pending); let bad_ack = make_sync_ack(999, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 5); assert!(inject_radio.send_frame(&bad_ack)); sender.tick(&clock, &mut sender_radio, &mut status); let stats = sender.stats(); assert!(stats.ack_pending, "wrong ack must not clear inflight"); assert_eq!(0, stats.last_acked_batch_id); } #[test] fn backpressure_queue_depth_stays_bounded() { let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); let mut inject_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone()); let mut sender = SenderStateMachine::new( SenderConfig { short_id: 0xF19C, sender_id: 1, device_id: "dd3-F19C".to_string(), }, 0, ); let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true); let mut status = MockStatusSink::default(); // Bootstrap one valid ACK so sender enters normal mode. clock.advance_ms(15_000); sender.tick(&clock, &mut sender_radio, &mut status); let ack = make_sync_ack(1, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 1); assert!(inject_radio.send_frame(&ack)); sender.tick(&clock, &mut sender_radio, &mut status); assert!(sender.is_time_acquired()); for _ in 0..600_000u64 { clock.advance_ms(1); sender.tick(&clock, &mut sender_radio, &mut status); } assert!(sender.stats().queue_depth as usize <= dd3_core::BATCH_QUEUE_DEPTH); } #[test] fn duplicate_batch_updates_counters_but_suppresses_publish_and_log() { let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); let mut tx_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); let mut rx_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone()); let mut receiver = ReceiverPipeline::new(ReceiverConfig { short_id: 0xBEEF, device_id: "dd3-BEEF".to_string(), expected_sender_ids: vec![0xF19C], }); let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 100, true); let mut pubsub = MockPublisher::default(); let mut storage = MockStorage::default(); let mut status = MockStatusSink::default(); let payload = include_bytes!("../../../fixtures/protocol/payload_v3/sparse_5.bin"); let batch_id = 42u16; let total_len = payload.len() as u16; let chunk_size = dd3_core::BATCH_CHUNK_PAYLOAD; let chunks = ((payload.len() + chunk_size - 1) / chunk_size) as u8; { let mut offset = 0usize; for idx in 0..chunks { let part_len = (payload.len() - offset).min(chunk_size); let mut pkt_payload = Vec::new(); pkt_payload.extend_from_slice(&batch_id.to_le_bytes()); pkt_payload.push(idx); pkt_payload.push(chunks); pkt_payload.extend_from_slice(&total_len.to_le_bytes()); pkt_payload.extend_from_slice(&payload[offset..offset + part_len]); offset += part_len; let frame = encode_frame(MsgKind::BatchUp, 0xF19C, &pkt_payload); assert!(tx_radio.send_frame(&frame)); } receiver.tick(&clock, &mut rx_radio, &mut pubsub, &mut storage, &mut status); } let first_state = pubsub.state_messages.len(); let first_csv = storage.csv_lines.len(); { let mut offset = 0usize; for idx in 0..chunks { let part_len = (payload.len() - offset).min(chunk_size); let mut pkt_payload = Vec::new(); pkt_payload.extend_from_slice(&batch_id.to_le_bytes()); pkt_payload.push(idx); pkt_payload.push(chunks); pkt_payload.extend_from_slice(&total_len.to_le_bytes()); pkt_payload.extend_from_slice(&payload[offset..offset + part_len]); offset += part_len; let frame = encode_frame(MsgKind::BatchUp, 0xF19C, &pkt_payload); assert!(tx_radio.send_frame(&frame)); } receiver.tick(&clock, &mut rx_radio, &mut pubsub, &mut storage, &mut status); } let statuses = receiver.sender_statuses(); assert_eq!(2, statuses[0].rx_batches_total); assert_eq!(1, statuses[0].rx_batches_duplicate); assert_eq!(first_state, pubsub.state_messages.len(), "duplicate should not republish state"); assert_eq!(first_csv, storage.csv_lines.len(), "duplicate should not duplicate csv log"); }