215 lines
7.7 KiB
Rust
215 lines
7.7 KiB
Rust
use std::collections::HashSet;
|
|
|
|
use dd3_core::{Clock, Radio, ReceiverConfig, ReceiverPipeline, SenderConfig, SenderStateMachine, StatusSink};
|
|
use dd3_protocol::{encode_frame, MsgKind};
|
|
use dd3_sim::{
|
|
Endpoint, FakeClock, FakeRadioConfig, MockPublisher, MockStatusSink, MockStorage, RadioEndpoint,
|
|
ScenarioRunner,
|
|
};
|
|
|
|
#[derive(Default)]
|
|
struct NoopStatus;
|
|
impl StatusSink for NoopStatus {}
|
|
|
|
fn make_sync_ack(batch_id: u16, epoch: u32) -> Vec<u8> {
|
|
let payload = dd3_protocol::encode_ack_down_payload(dd3_protocol::AckDownPayload {
|
|
time_valid: true,
|
|
batch_id,
|
|
epoch_utc: epoch,
|
|
});
|
|
encode_frame(MsgKind::AckDown, 0xBEEF, &payload)
|
|
}
|
|
|
|
/// A sender ticking alone on the bus must not acquire time and must neither
/// build nor queue any batch.
#[test]
fn no_receiver_present_stays_unsynced_and_sync_only() {
    let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default());
    let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone());

    let sender_cfg = SenderConfig {
        short_id: 0xF19C,
        sender_id: 1,
        device_id: String::from("dd3-F19C"),
    };
    let mut sender = SenderStateMachine::new(sender_cfg, 0);
    let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true);
    let mut status = NoopStatus::default();

    // Advance the fake clock 1 ms at a time for 90 000 steps. No receiver
    // endpoint is ever attached to the bus, so nothing can answer the sender.
    let mut steps_left = 90_000u64;
    while steps_left > 0 {
        clock.advance_ms(1);
        sender.tick(&clock, &mut sender_radio, &mut status);
        steps_left -= 1;
    }

    let stats = sender.stats();
    assert!(!sender.is_time_acquired());
    assert_eq!(0, stats.build_count);
    assert_eq!(0, stats.queue_depth);
}
|
|
|
|
/// Full sender + receiver scenario: time is acquired during an initial
/// window, after which state messages and CSV lines show up on the receiver
/// side.
#[test]
fn receiver_bootstrap_unlocks_sampling_and_batches() {
    let mut runner = ScenarioRunner::new(FakeRadioConfig::default());

    // Phase 1: the sender starts unsynced; the sync request / ACK exchange is
    // expected to complete within this window.
    let bootstrap_window = 20_000;
    runner.tick(bootstrap_window);
    assert!(runner.sender.is_time_acquired());

    // Phase 2: once unlocked, regular sampling and batch delivery should
    // reach both the publisher and the storage mocks.
    let steady_state_window = 35_000;
    runner.tick(steady_state_window);
    assert!(!runner.publisher.state_messages.is_empty());
    assert!(!runner.storage.csv_lines.is_empty());
}
|
|
|
|
/// On a lossy, duplicating, delaying link the sender must still get at least
/// one batch acknowledged, and no (topic, payload) pair may be published
/// twice.
#[test]
fn packet_loss_eventually_progresses_without_duplicate_commit() {
    // Fixed seed keeps the fake radio's behaviour reproducible between runs
    // (assuming the fake radio derives all randomness from `seed`).
    let lossy_link = FakeRadioConfig {
        loss_pct: 20,
        duplicate_pct: 10,
        max_delay_ms: 80,
        seed: 0xBADC0DE,
    };
    let mut runner = ScenarioRunner::new(lossy_link);

    runner.tick(180_000);

    let sender_stats = runner.sender.stats();
    assert!(sender_stats.last_acked_batch_id > 0);
    assert!(!runner.publisher.state_messages.is_empty());

    // Every published message is reduced to a "topic|payload" key; a repeated
    // key means the same state was committed more than once.
    let mut seen_keys = HashSet::new();
    for message in &runner.publisher.state_messages {
        let (topic, payload) = message;
        let is_new = seen_keys.insert(format!("{topic}|{payload}"));
        assert!(is_new, "duplicate committed state payload detected");
    }
}
|
|
|
|
/// An ACK carrying a batch id the sender is not waiting for must leave the
/// pending-ACK state untouched and must not advance `last_acked_batch_id`.
#[test]
fn ack_mismatch_is_ignored_and_inflight_unchanged() {
    let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default());
    let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone());
    // Receiver-side endpoint used only to inject hand-crafted frames.
    let mut inject_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone());

    let sender_cfg = SenderConfig {
        short_id: 0xF19C,
        sender_id: 1,
        device_id: String::from("dd3-F19C"),
    };
    let mut sender = SenderStateMachine::new(sender_cfg, 0);
    let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true);
    let mut status = MockStatusSink::default();

    // Jump far enough ahead that a single tick emits the sync request, which
    // leaves the sender with an in-flight batch (id 1) awaiting its ACK.
    clock.advance_ms(15_000);
    sender.tick(&clock, &mut sender_radio, &mut status);
    assert!(sender.stats().ack_pending);

    // Inject an ACK for a batch id that is not in flight.
    let mismatched_ack = make_sync_ack(999, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 5);
    assert!(inject_radio.send_frame(&mismatched_ack));

    sender.tick(&clock, &mut sender_radio, &mut status);

    let after = sender.stats();
    assert!(after.ack_pending, "wrong ack must not clear inflight");
    assert_eq!(0, after.last_acked_batch_id);
}
|
|
|
|
/// After a single bootstrap ACK the sender runs with nobody acknowledging
/// further batches; its queue depth must never exceed `BATCH_QUEUE_DEPTH`.
#[test]
fn backpressure_queue_depth_stays_bounded() {
    let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default());
    let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone());
    // Receiver-side endpoint used only to inject the bootstrap ACK.
    let mut inject_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone());

    let sender_cfg = SenderConfig {
        short_id: 0xF19C,
        sender_id: 1,
        device_id: String::from("dd3-F19C"),
    };
    let mut sender = SenderStateMachine::new(sender_cfg, 0);
    let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true);
    let mut status = MockStatusSink::default();

    // Bootstrap: let the sender emit its sync request, then answer it with a
    // matching ACK so it switches to normal operation.
    clock.advance_ms(15_000);
    sender.tick(&clock, &mut sender_radio, &mut status);
    let bootstrap_ack = make_sync_ack(1, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 1);
    assert!(inject_radio.send_frame(&bootstrap_ack));
    sender.tick(&clock, &mut sender_radio, &mut status);
    assert!(sender.is_time_acquired());

    // Run 600 000 one-millisecond ticks with no further ACKs being injected.
    let mut steps_left = 600_000u64;
    while steps_left > 0 {
        clock.advance_ms(1);
        sender.tick(&clock, &mut sender_radio, &mut status);
        steps_left -= 1;
    }

    assert!(sender.stats().queue_depth as usize <= dd3_core::BATCH_QUEUE_DEPTH);
}
|
|
|
|
/// Delivers the same chunked batch twice and checks receiver de-duplication.
///
/// The fixture payload is split into `BATCH_CHUNK_PAYLOAD`-sized chunks. Each
/// chunk is framed as `BatchUp` behind a 6-byte header: batch id (LE u16),
/// chunk index (u8), chunk count (u8), total payload length (LE u16).
///
/// The frames are built once and transmitted twice, with one receiver tick
/// after each transmission. The replay must bump `rx_batches_total` to 2 and
/// `rx_batches_duplicate` to 1 while leaving the number of published state
/// messages and stored CSV lines unchanged.
#[test]
fn duplicate_batch_updates_counters_but_suppresses_publish_and_log() {
    let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default());
    let mut tx_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone());
    let mut rx_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone());

    let mut receiver = ReceiverPipeline::new(ReceiverConfig {
        short_id: 0xBEEF,
        device_id: "dd3-BEEF".to_string(),
        expected_sender_ids: vec![0xF19C],
    });

    // The clock is only read in this test, never advanced.
    let clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 100, true);
    let mut pubsub = MockPublisher::default();
    let mut storage = MockStorage::default();
    let mut status = MockStatusSink::default();

    let payload = include_bytes!("../../../fixtures/protocol/payload_v3/sparse_5.bin");
    let batch_id = 42u16;
    let chunk_size = dd3_core::BATCH_CHUNK_PAYLOAD;

    // The header stores the chunk count in a u8 and the total length in a
    // u16. Fail loudly if the fixture ever outgrows those fields instead of
    // silently truncating with `as`.
    let chunk_total = payload.chunks(chunk_size).len();
    assert!(
        chunk_total <= usize::from(u8::MAX),
        "fixture needs more chunks than the u8 chunk-count field can hold"
    );
    assert!(
        payload.len() <= usize::from(u16::MAX),
        "fixture is longer than the u16 total-length field can hold"
    );
    let chunk_total = chunk_total as u8;
    let total_len = payload.len() as u16;

    // Build every frame exactly once so the replay is byte-identical to the
    // first delivery.
    let frames: Vec<_> = payload
        .chunks(chunk_size)
        .enumerate()
        .map(|(idx, part)| {
            let mut pkt_payload = Vec::with_capacity(6 + part.len());
            pkt_payload.extend_from_slice(&batch_id.to_le_bytes());
            // idx < chunk_total <= u8::MAX, so this cast cannot truncate.
            pkt_payload.push(idx as u8);
            pkt_payload.push(chunk_total);
            pkt_payload.extend_from_slice(&total_len.to_le_bytes());
            pkt_payload.extend_from_slice(part);
            encode_frame(MsgKind::BatchUp, 0xF19C, &pkt_payload)
        })
        .collect();

    // Pushes all chunks of the batch onto the bus from the sender endpoint.
    let mut transmit_batch = || {
        for frame in &frames {
            assert!(tx_radio.send_frame(frame));
        }
    };

    // First delivery: record how much was published and stored.
    transmit_batch();
    receiver.tick(&clock, &mut rx_radio, &mut pubsub, &mut storage, &mut status);
    let first_state = pubsub.state_messages.len();
    let first_csv = storage.csv_lines.len();

    // Replay of the very same batch.
    transmit_batch();
    receiver.tick(&clock, &mut rx_radio, &mut pubsub, &mut storage, &mut status);

    let statuses = receiver.sender_statuses();
    assert_eq!(2, statuses[0].rx_batches_total);
    assert_eq!(1, statuses[0].rx_batches_duplicate);

    assert_eq!(first_state, pubsub.state_messages.len(), "duplicate should not republish state");
    assert_eq!(first_csv, storage.csv_lines.len(), "duplicate should not duplicate csv log");
}
|