Bootstrap DD3 Rust port workspace with host-first compatibility tests

This commit is contained in:
2026-02-21 00:59:03 +01:00
parent d3f9a2e62d
commit d0212f4e38
63 changed files with 3914 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
# Manifest for dd3_core: the host-first sender/receiver state machines.
# Depends only on sibling workspace crates (wire protocol + host contracts).
[package]
name = "dd3_core"
version = "0.1.0"
edition = "2021"

[dependencies]
# Frame/batch encoding, reassembly and ACK payload codecs.
dd3_protocol = { path = "../dd3_protocol" }
# CSV line / MQTT JSON / Home Assistant discovery formatting.
dd3_contracts = { path = "../dd3_contracts" }

View File

@@ -0,0 +1,15 @@
//! Tuning constants shared by the sender and receiver state machines.

/// Epoch floor used to reject unsynced clocks: any UTC timestamp below this
/// is treated as "time not acquired yet".
pub const MIN_ACCEPTED_EPOCH_UTC: u32 = 1_769_904_000;
/// Minimum spacing between sync-request batches while time is unacquired.
pub const SYNC_REQUEST_INTERVAL_MS: u32 = 15_000;
/// Cadence at which the sender generates meter samples.
pub const METER_SAMPLE_INTERVAL_MS: u32 = 1_000;
/// Cadence at which accumulated samples are flushed into a batch and sent.
pub const METER_SEND_INTERVAL_MS: u32 = 30_000;
/// Number of re-sends attempted after an ACK timeout before giving up.
pub const BATCH_MAX_RETRIES: u8 = 2;
/// Maximum queued (unsent) batches; oldest is dropped when full.
pub const BATCH_QUEUE_DEPTH: usize = 10;
/// How many times the receiver repeats each downlink ACK frame.
pub const ACK_REPEAT_COUNT: u8 = 3;
/// Delay between repeated ACK frames (currently unused here; kept for tuning).
pub const ACK_REPEAT_DELAY_MS: u32 = 200;
/// Samples per batch; also the width of the batch presence bitmask window.
pub const METER_BATCH_MAX_SAMPLES: usize = 30;
/// Per-chunk header: batch_id (2) + chunk_index (1) + chunk_count (1) + total_len (2).
pub const BATCH_HEADER_SIZE: usize = 6;
/// Usable payload bytes per chunk after the chunk header.
pub const BATCH_CHUNK_PAYLOAD: usize = dd3_protocol::LORA_MAX_PAYLOAD - BATCH_HEADER_SIZE;
/// Upper bound on a reassembled (compressed) batch, and the reassembly buffer size.
pub const BATCH_MAX_COMPRESSED: usize = 4096;
/// Extra slack added on top of estimated chunk airtime for RX timeouts.
pub const BATCH_RX_MARGIN_MS: u32 = 800;

View File

@@ -0,0 +1,11 @@
//! dd3_core: host-portable sender/receiver state machines for the DD3 meter
//! link, plus the shared constants, hardware traits and data types they use.
pub mod constants;
pub mod receiver;
pub mod sender;
pub mod traits;
pub mod types;
// Flat re-exports so consumers can use `dd3_core::SenderStateMachine` etc.
pub use constants::*;
pub use receiver::{ReceiverConfig, ReceiverPipeline, ReceiverStats, SenderStatus};
pub use sender::{SenderConfig, SenderPhase, SenderStateMachine, SenderStats};
pub use traits::{Clock, Publisher, Radio, StatusSink, Storage};
pub use types::{FaultCounters, MeterSample};

View File

@@ -0,0 +1,470 @@
use std::collections::BTreeMap;
use dd3_contracts::{
build_ha_discovery_payload, build_ha_discovery_topic, format_csv_line, mqtt_state_json,
CsvLineInput, HaDiscoverySpec, MqttState,
};
use dd3_protocol::{
decode_batch_v3, decode_frame, encode_ack_down_payload, encode_frame, push_chunk, MsgKind,
ReassemblyState, ReassemblyStatus,
};
use crate::constants::*;
use crate::traits::{Clock, Publisher, Radio, StatusSink, Storage};
use crate::types::{FaultCounters, MeterSample};
#[derive(Debug, Clone, PartialEq, Eq)]
/// Static configuration for a receiver pipeline instance.
pub struct ReceiverConfig {
    /// This receiver's own short id, used as the source id on ACK frames.
    pub short_id: u16,
    /// Device id of the receiver itself (used for its own discovery bundle).
    pub device_id: String,
    /// Short ids of senders we accept; index in this list + 1 is the
    /// expected in-batch `sender_id`.
    pub expected_sender_ids: Vec<u16>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
/// Snapshot of receiver-side fault counters for inspection/tests.
pub struct ReceiverStats {
    /// Count of decode/validation failures (frames, batches, sample windows).
    pub receiver_decode_fail: u32,
    /// Count of failed ACK transmissions.
    pub receiver_lora_tx_fail: u32,
    /// Reason code of the most recent rejected RX frame (0 = none).
    pub last_rx_reject: u8,
    /// Whether the receiver's own HA discovery bundle has been published.
    pub receiver_discovery_sent: bool,
}
#[derive(Debug, Clone, PartialEq)]
/// Per-sender bookkeeping kept by the receiver: last data seen and RX counters.
pub struct SenderStatus {
    /// The sender's LoRa short id.
    pub short_id: u16,
    /// Derived device id in the `dd3-XXXX` (upper-hex) convention.
    pub device_id: String,
    /// Last sample extracted from the most recent accepted batch, if any.
    pub last_data: Option<MeterSample>,
    /// UTC timestamp of the last accepted sample (0 until first data).
    pub last_update_ts_utc: u32,
    /// Total batches received (including duplicates).
    pub rx_batches_total: u32,
    /// Batches rejected as duplicates (same batch id as the previous one).
    pub rx_batches_duplicate: u32,
    /// UTC time the last duplicate was observed.
    pub rx_last_duplicate_ts_utc: u32,
    /// True once at least one batch with samples has been fully processed.
    pub has_data: bool,
}
impl SenderStatus {
    /// Builds an empty status slot for the given sender short id.
    ///
    /// The device id is derived from the short id using the project-wide
    /// `dd3-XXXX` upper-hex naming convention; all counters start at zero
    /// and no data is recorded yet.
    fn new(short_id: u16) -> Self {
        let device_id = format!("dd3-{:04X}", short_id);
        Self {
            short_id,
            device_id,
            last_data: None,
            last_update_ts_utc: 0,
            rx_batches_total: 0,
            rx_batches_duplicate: 0,
            rx_last_duplicate_ts_utc: 0,
            has_data: false,
        }
    }
}
/// Receiver-side pipeline: reassembles chunked batch uplinks, validates and
/// decodes them, ACKs each batch, and fans samples out to storage/MQTT sinks.
pub struct ReceiverPipeline {
    config: ReceiverConfig,
    /// One status entry per entry in `config.expected_sender_ids` (same order).
    sender_statuses: Vec<SenderStatus>,
    /// Last accepted batch id per sender, for duplicate suppression.
    last_batch_id_rx: Vec<u16>,
    receiver_faults: FaultCounters,
    /// Last receiver error code (2 = decode, 3 = LoRa TX).
    receiver_last_error: u8,
    last_rx_reject: u8,
    /// Chunk-reassembly state keyed by the sender's short id.
    reassembly: BTreeMap<u16, ReassemblyState>,
    /// Per-sender reassembly scratch buffer (BATCH_MAX_COMPRESSED bytes).
    reassembly_buffer: BTreeMap<u16, Vec<u8>>,
    /// Per-sender flag: HA discovery bundle already published.
    sender_discovery_sent: Vec<bool>,
    /// Flag: discovery bundle for the receiver's own device published.
    receiver_discovery_sent: bool,
}
impl ReceiverPipeline {
    /// Builds a pipeline with one status slot, duplicate-tracking slot and
    /// discovery flag per expected sender, all starting empty.
    pub fn new(config: ReceiverConfig) -> Self {
        let sender_statuses = config
            .expected_sender_ids
            .iter()
            .copied()
            .map(SenderStatus::new)
            .collect::<Vec<_>>();
        let sender_count = sender_statuses.len();
        Self {
            config,
            sender_statuses,
            last_batch_id_rx: vec![0; sender_count],
            receiver_faults: FaultCounters::default(),
            receiver_last_error: 0,
            last_rx_reject: 0,
            reassembly: BTreeMap::new(),
            reassembly_buffer: BTreeMap::new(),
            sender_discovery_sent: vec![false; sender_count],
            receiver_discovery_sent: false,
        }
    }

    /// Snapshot of the receiver-side fault counters.
    pub fn stats(&self) -> ReceiverStats {
        ReceiverStats {
            receiver_decode_fail: self.receiver_faults.decode_fail,
            receiver_lora_tx_fail: self.receiver_faults.lora_tx_fail,
            last_rx_reject: self.last_rx_reject,
            receiver_discovery_sent: self.receiver_discovery_sent,
        }
    }

    /// Read-only view of the per-sender status entries.
    pub fn sender_statuses(&self) -> &[SenderStatus] {
        &self.sender_statuses
    }

    /// Records a decode failure (saturating) and sets error code 2 = decode.
    fn note_fault_decode(&mut self) {
        self.receiver_faults.decode_fail = self.receiver_faults.decode_fail.saturating_add(1);
        self.receiver_last_error = 2;
    }

    /// Maps a LoRa short id to its index in `expected_sender_ids`, if known.
    fn sender_idx_from_short_id(&self, short_id: u16) -> Option<usize> {
        self.config
            .expected_sender_ids
            .iter()
            .position(|expected| *expected == short_id)
    }

    /// Inverse mapping: 1-based batch `sender_id` -> configured short id.
    /// sender_id 0 is reserved/invalid.
    fn short_id_from_sender_id(&self, sender_id: u16) -> Option<u16> {
        if sender_id == 0 {
            return None;
        }
        self.config
            .expected_sender_ids
            .get(sender_id as usize - 1)
            .copied()
    }

    /// Estimates how long to wait for all chunks of a batch before the
    /// reassembly is considered stale: per-chunk airtime (linear model:
    /// 120 ms base + 6 ms/byte) times chunk count, plus margin, floored at
    /// 10 s. Degenerate inputs also get the 10 s floor.
    fn compute_batch_rx_timeout_ms(total_len: u16, chunk_count: u8) -> u32 {
        if total_len == 0 || chunk_count == 0 {
            return 10_000;
        }
        let max_chunk_payload = (total_len as usize).min(BATCH_CHUNK_PAYLOAD);
        let payload_len = BATCH_HEADER_SIZE + max_chunk_payload;
        // 3-byte frame header + payload + 2-byte CRC (mirrors the sender's model).
        let packet_len = 3 + payload_len + 2;
        let per_chunk_toa_ms = 120 + (packet_len as u32) * 6;
        (chunk_count as u32)
            .saturating_mul(per_chunk_toa_ms)
            .saturating_add(BATCH_RX_MARGIN_MS)
            .max(10_000)
    }

    /// Sends the downlink ACK for `batch_id`, repeated ACK_REPEAT_COUNT times
    /// for redundancy. The ACK carries the receiver's UTC time only when the
    /// clock is synced and above the accepted epoch floor; TX failures bump
    /// the lora_tx_fail counter (error code 3) but do not abort the repeats.
    fn send_batch_ack(&mut self, batch_id: u16, clock: &impl Clock, radio: &mut impl Radio) {
        let epoch = clock.now_utc();
        let time_valid = clock.is_time_synced() && epoch >= MIN_ACCEPTED_EPOCH_UTC;
        let ack_payload = encode_ack_down_payload(dd3_protocol::AckDownPayload {
            time_valid,
            batch_id,
            epoch_utc: if time_valid { epoch } else { 0 },
        });
        let frame = encode_frame(MsgKind::AckDown, self.config.short_id, &ack_payload);
        // Guard against a zero repeat constant: always send at least once.
        let repeats = if ACK_REPEAT_COUNT == 0 { 1 } else { ACK_REPEAT_COUNT };
        for _ in 0..repeats {
            if !radio.send_frame(&frame) {
                self.receiver_faults.lora_tx_fail = self.receiver_faults.lora_tx_fail.saturating_add(1);
                self.receiver_last_error = 3;
            }
        }
    }

    /// Feeds one chunked batch packet into the per-sender reassembler.
    ///
    /// Returns `Ok(Some((batch_id, decoded)))` once the final chunk completes
    /// a batch and it decodes, `Ok(None)` while in progress / on short or
    /// reset-worthy input, and `Err(())` if a completed batch fails to decode.
    fn process_batch_packet(
        &mut self,
        pkt_short_id: u16,
        payload: &[u8],
        now_ms: u64,
    ) -> Result<Option<(u16, dd3_protocol::BatchInputV3)>, ()> {
        if payload.len() < BATCH_HEADER_SIZE {
            return Ok(None);
        }
        // Chunk header layout: batch_id(LE u16), chunk_index, chunk_count, total_len(LE u16).
        let batch_id = u16::from_le_bytes([payload[0], payload[1]]);
        let chunk_index = payload[2];
        let chunk_count = payload[3];
        let total_len = u16::from_le_bytes([payload[4], payload[5]]);
        let chunk_data = &payload[BATCH_HEADER_SIZE..];
        let state = self.reassembly.entry(pkt_short_id).or_default();
        // Scratch buffer is allocated lazily per sender and reused afterwards.
        let buffer = self
            .reassembly_buffer
            .entry(pkt_short_id)
            .or_insert_with(|| vec![0u8; BATCH_MAX_COMPRESSED]);
        let status = push_chunk(
            state,
            batch_id,
            chunk_index,
            chunk_count,
            total_len,
            chunk_data,
            now_ms as u32,
            Self::compute_batch_rx_timeout_ms(total_len, chunk_count),
            BATCH_MAX_COMPRESSED as u16,
            buffer,
        );
        match status {
            ReassemblyStatus::InProgress => Ok(None),
            ReassemblyStatus::ErrorReset => Ok(None),
            ReassemblyStatus::Complete { complete_len } => {
                let decoded = decode_batch_v3(&buffer[..complete_len as usize]).map_err(|_| ())?;
                Ok(Some((batch_id, decoded)))
            }
        }
    }

    /// Publishes the full Home Assistant discovery set for one device:
    /// meter/link sensors read from the state topic, fault sensors from the
    /// faults topic, each with a JSON value template.
    fn publish_discovery_bundle(&self, publisher: &mut impl Publisher, device_id: &str) {
        let state_topic = format!("smartmeter/{device_id}/state");
        let faults_topic = format!("smartmeter/{device_id}/faults");
        // (key, display name, unit, HA device class, source topic, value template)
        let sensors = [
            ("energy", "Energy", Some("kWh"), Some("energy"), state_topic.as_str(), "{{ value_json.e_kwh }}"),
            ("power", "Power", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p_w }}"),
            ("p1", "Power L1", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p1_w }}"),
            ("p2", "Power L2", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p2_w }}"),
            ("p3", "Power L3", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p3_w }}"),
            ("bat_v", "Battery Voltage", Some("V"), Some("voltage"), state_topic.as_str(), "{{ value_json.bat_v }}"),
            ("bat_pct", "Battery", Some("%"), Some("battery"), state_topic.as_str(), "{{ value_json.bat_pct }}"),
            ("rssi", "LoRa RSSI", Some("dBm"), Some("signal_strength"), state_topic.as_str(), "{{ value_json.rssi }}"),
            ("snr", "LoRa SNR", Some("dB"), None, state_topic.as_str(), "{{ value_json.snr }}"),
            ("err_m", "Meter Read Errors", Some("count"), None, faults_topic.as_str(), "{{ value_json.err_m }}"),
            ("err_d", "Decode Errors", Some("count"), None, faults_topic.as_str(), "{{ value_json.err_d }}"),
            ("err_tx", "LoRa TX Errors", Some("count"), None, faults_topic.as_str(), "{{ value_json.err_tx }}"),
            ("err_last", "Last Error Code", None, None, faults_topic.as_str(), "{{ value_json.err_last }}"),
            ("err_last_text", "Last Error", None, None, faults_topic.as_str(), "{{ value_json.err_last_text }}"),
            ("err_last_age", "Last Error Age", Some("s"), None, faults_topic.as_str(), "{{ value_json.err_last_age }}"),
        ];
        for (key, name, unit, device_class, topic, template) in sensors {
            let spec = HaDiscoverySpec {
                device_id,
                key,
                name,
                unit,
                device_class,
                state_topic: topic,
                value_template: template,
            };
            let discovery_topic = build_ha_discovery_topic(device_id, key);
            let discovery_payload = build_ha_discovery_payload(&spec);
            publisher.publish_discovery(&discovery_topic, &discovery_payload);
        }
    }

    /// One receiver iteration: drain all pending radio frames, reassemble and
    /// validate batches, ACK each one, de-duplicate, fan out samples to CSV
    /// storage and MQTT state, publish fault JSON and (once) discovery
    /// bundles, then report status.
    ///
    /// Reject codes used here: 3 = frame decode, 4 = sender_id mismatch,
    /// 6 = unknown short id.
    pub fn tick(
        &mut self,
        clock: &impl Clock,
        radio: &mut impl Radio,
        publisher: &mut impl Publisher,
        storage: &mut impl Storage,
        status_sink: &mut impl StatusSink,
    ) {
        let now_ms = clock.now_ms();
        // Non-blocking drain (window 0) of everything currently queued.
        while let Some(raw) = radio.recv_frame(0, now_ms) {
            // NOTE(review): the second argument is MsgKind::AckDown even though
            // this loop expects BatchUp frames — confirm its meaning in
            // dd3_protocol::decode_frame (the sender passes the same value).
            let frame = match decode_frame(&raw, MsgKind::AckDown as u8) {
                Ok(frame) => frame,
                Err(_) => {
                    self.last_rx_reject = 3;
                    continue;
                }
            };
            if frame.msg_kind != MsgKind::BatchUp {
                continue;
            }
            let packet = match self.process_batch_packet(frame.short_id, &frame.payload, now_ms) {
                Ok(Some(pkt)) => pkt,
                Ok(None) => continue,
                Err(_) => {
                    self.note_fault_decode();
                    continue;
                }
            };
            let (batch_id, batch) = packet;
            let Some(sender_idx) = self.sender_idx_from_short_id(frame.short_id) else {
                self.last_rx_reject = 6;
                self.note_fault_decode();
                continue;
            };
            // The in-batch sender_id must match the 1-based index implied by
            // the frame's short id.
            let expected_sender_id = (sender_idx as u16) + 1;
            if batch.sender_id != expected_sender_id {
                self.last_rx_reject = 4;
                self.note_fault_decode();
                continue;
            }
            let duplicate = self.last_batch_id_rx[sender_idx] == batch_id;
            {
                let status = &mut self.sender_statuses[sender_idx];
                status.rx_batches_total = status.rx_batches_total.saturating_add(1);
                if duplicate {
                    status.rx_batches_duplicate = status.rx_batches_duplicate.saturating_add(1);
                    // Fall back to the batch's own timestamp if our clock is unset.
                    let dup_ts = if clock.now_utc() == 0 { batch.t_last } else { clock.now_utc() };
                    status.rx_last_duplicate_ts_utc = dup_ts;
                }
            }
            // Always ACK — even duplicates — so the sender stops retrying.
            self.send_batch_ack(batch_id, clock, radio);
            if duplicate {
                continue;
            }
            self.last_batch_id_rx[sender_idx] = batch_id;
            // n == 0 is a valid sync-request batch: ACKed above, no samples.
            if batch.n == 0 {
                continue;
            }
            if batch.n as usize > METER_BATCH_MAX_SAMPLES {
                self.note_fault_decode();
                continue;
            }
            // The presence bitmask must carry exactly n set bits.
            if batch.present_mask.count_ones() as u8 != batch.n {
                self.note_fault_decode();
                continue;
            }
            // t_last must leave room for the full sample window and be a
            // plausible synced timestamp.
            if batch.t_last < (METER_BATCH_MAX_SAMPLES as u32 - 1) || batch.t_last < MIN_ACCEPTED_EPOCH_UTC {
                self.note_fault_decode();
                continue;
            }
            let mut samples: Vec<MeterSample> = Vec::with_capacity(batch.n as usize);
            let count = batch.n as usize;
            // Slot s in the mask corresponds to timestamp window_start + s.
            let window_start = batch.t_last - (METER_BATCH_MAX_SAMPLES as u32 - 1);
            let mut s = 0usize;
            let short_id = if frame.short_id != 0 {
                frame.short_id
            } else {
                self.short_id_from_sender_id(batch.sender_id).unwrap_or(0)
            };
            let device_id = format!("dd3-{short_id:04X}");
            // battery_mv == 0 means "unknown"; surface as NaN downstream.
            let bat_v = if batch.battery_mv > 0 {
                batch.battery_mv as f32 / 1000.0
            } else {
                f32::NAN
            };
            for slot in 0..METER_BATCH_MAX_SAMPLES {
                if (batch.present_mask & (1u32 << slot)) == 0 {
                    continue;
                }
                // More set bits than declared samples: corrupt batch, abort.
                if s >= count {
                    self.note_fault_decode();
                    samples.clear();
                    break;
                }
                let ts_utc = window_start + slot as u32;
                if ts_utc < MIN_ACCEPTED_EPOCH_UTC {
                    self.note_fault_decode();
                    samples.clear();
                    break;
                }
                let p1 = batch.p1_w[s] as f32;
                let p2 = batch.p2_w[s] as f32;
                let p3 = batch.p3_w[s] as f32;
                // NOTE(review): rssi/snr/battery_percent below are placeholder
                // constants, not measured link values — presumably filled in by
                // a later radio integration; confirm before relying on them.
                let sample = MeterSample {
                    ts_utc,
                    energy_total_kwh: batch.energy_wh[s] as f32 / 1000.0,
                    phase_power_w: [p1, p2, p3],
                    total_power_w: p1 + p2 + p3,
                    battery_voltage_v: bat_v,
                    battery_percent: if bat_v.is_nan() { 0 } else { 75 },
                    link_rssi_dbm: -70,
                    link_snr_db: 7.0,
                    err_meter_read: batch.err_m as u32,
                    err_decode: batch.err_d as u32,
                    err_lora_tx: batch.err_tx as u32,
                    err_last: batch.err_last,
                    rx_reject_reason: batch.err_rx_reject,
                };
                samples.push(sample);
                let csv_line = format_csv_line(&CsvLineInput {
                    ts_utc: sample.ts_utc,
                    ts_hms_local: "00:00:00",
                    p_w: sample.total_power_w,
                    p1_w: sample.phase_power_w[0],
                    p2_w: sample.phase_power_w[1],
                    p3_w: sample.phase_power_w[2],
                    e_kwh: sample.energy_total_kwh,
                    bat_v: sample.battery_voltage_v,
                    bat_pct: sample.battery_percent,
                    rssi: sample.link_rssi_dbm,
                    snr: sample.link_snr_db,
                    err_m: sample.err_meter_read,
                    err_d: sample.err_decode,
                    err_tx: sample.err_lora_tx,
                    err_last_text: Some(match sample.err_last {
                        1 => "meter",
                        2 => "decode",
                        3 => "loratx",
                        _ => "",
                    }),
                    // Only the last sample of the batch carries the error text.
                    include_error_text: s + 1 == count,
                });
                storage.append_csv(&device_id, &csv_line);
                let state_payload = mqtt_state_json(&MqttState {
                    device_id: &device_id,
                    ts_utc: sample.ts_utc,
                    energy_total_kwh: sample.energy_total_kwh,
                    total_power_w: sample.total_power_w,
                    phase_power_w: sample.phase_power_w,
                    battery_voltage_v: sample.battery_voltage_v,
                    battery_percent: sample.battery_percent,
                    link_valid: true,
                    link_rssi_dbm: sample.link_rssi_dbm,
                    link_snr_db: sample.link_snr_db,
                    err_meter_read: sample.err_meter_read,
                    err_decode: sample.err_decode,
                    err_lora_tx: sample.err_lora_tx,
                    err_last: sample.err_last,
                    rx_reject_reason: sample.rx_reject_reason,
                });
                publisher.publish_state(&format!("smartmeter/{device_id}/state"), &state_payload);
                s += 1;
            }
            // A short extraction (break above, or fewer set bits consumed than
            // declared) invalidates the whole batch.
            if samples.len() != count {
                self.note_fault_decode();
                continue;
            }
            // First accepted data from this sender: publish its discovery set.
            if !self.sender_discovery_sent[sender_idx] {
                self.publish_discovery_bundle(publisher, &device_id);
                self.sender_discovery_sent[sender_idx] = true;
            }
            let faults_payload = format!(
                "{{\"err_m\":{},\"err_d\":{},\"err_tx\":{},\"err_last\":{},\"err_last_text\":\"{}\",\"err_last_age\":0}}",
                batch.err_m,
                batch.err_d,
                batch.err_tx,
                batch.err_last,
                match batch.err_last {
                    1 => "meter",
                    2 => "decode",
                    3 => "loratx",
                    _ => "none",
                }
            );
            publisher.publish_faults(&format!("smartmeter/{device_id}/faults"), &faults_payload);
            {
                let status = &mut self.sender_statuses[sender_idx];
                status.last_data = samples.last().copied();
                status.last_update_ts_utc = samples.last().map(|s| s.ts_utc).unwrap_or(0);
                status.has_data = true;
            }
        }
        // The receiver's own discovery bundle goes out once, on the first tick.
        if !self.receiver_discovery_sent {
            self.publish_discovery_bundle(publisher, &self.config.device_id);
            self.receiver_discovery_sent = true;
        }
        status_sink.receiver_status("RX_ACTIVE");
    }
}

View File

@@ -0,0 +1,708 @@
use std::collections::VecDeque;
use dd3_protocol::{
decode_ack_down_payload, decode_frame, encode_frame, encode_batch_v3, AckDownPayload, BatchInputV3,
MsgKind,
};
use crate::constants::*;
use crate::traits::{Clock, Radio, StatusSink};
use crate::types::{FaultCounters, MeterSample};
#[derive(Debug, Clone, PartialEq, Eq)]
/// Static identity configuration for one sender state machine.
pub struct SenderConfig {
    /// LoRa short id used as the source address on uplink frames.
    pub short_id: u16,
    /// 1-based logical sender id embedded inside each batch.
    pub sender_id: u16,
    /// Human-readable device id (not used on the wire here).
    pub device_id: String,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Coarse phase reported by the sender after each tick.
pub enum SenderPhase {
    /// Time not acquired yet; periodically sending sync-request batches.
    Syncing,
    /// Time acquired, at most one queued batch; steady-state operation.
    Normal,
    /// More than one batch queued; draining the backlog.
    Catchup,
    /// A batch is in flight and its ACK is still outstanding.
    WaitAck,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
/// Snapshot of sender-side counters and ACK bookkeeping for inspection/tests.
pub struct SenderStats {
    /// Batches currently queued for sending.
    pub queue_depth: u8,
    /// Samples accumulated in the not-yet-queued build buffer.
    pub build_count: u8,
    /// Batch id of the in-flight batch (0 if none).
    pub inflight_batch_id: u16,
    pub last_sent_batch_id: u16,
    pub last_acked_batch_id: u16,
    /// Retries performed for the current in-flight batch.
    pub retry_count: u8,
    /// True while an ACK is outstanding.
    pub ack_pending: bool,
    /// Total ACK timeouts observed.
    pub ack_timeout_total: u32,
    /// Total re-sends triggered by ACK timeouts.
    pub ack_retry_total: u32,
    /// Consecutive ACK misses (drives adaptive RX window widening).
    pub ack_miss_streak: u32,
    /// Accumulated time spent listening for ACKs.
    pub rx_window_ms: u32,
}
#[derive(Debug, Clone)]
/// A queued, not-yet-sent batch of samples. The batch id is assigned lazily
/// when the batch first becomes the in-flight candidate.
struct BatchBuffer {
    batch_id: u16,
    /// False until a batch id has been assigned from the sender's counter.
    batch_id_valid: bool,
    samples: Vec<MeterSample>,
}
#[derive(Debug, Clone)]
/// The batch currently being transmitted / awaiting ACK.
struct Inflight {
    batch_id: u16,
    /// True for an empty time-sync request (no samples, n == 0 on the wire).
    sync_request: bool,
    samples: Vec<MeterSample>,
    /// Cached encoded payload; empty until first encode, reused on retries.
    encoded_payload: Vec<u8>,
}
/// Sender-side state machine: samples the meter, batches samples, transmits
/// chunked batches over the radio and tracks ACK/retry/time-sync state.
pub struct SenderStateMachine {
    config: SenderConfig,
    phase: SenderPhase,
    /// Set once a valid epoch has been received in an ACK.
    time_acquired: bool,
    /// Fault counters are cleared once after the first successful sync…
    sender_faults_reset_after_first_sync: bool,
    /// …and then on every UTC hour boundary (hour index remembered here).
    sender_faults_reset_hour_utc: Option<u32>,
    /// FIFO of batches waiting to be sent (bounded by BATCH_QUEUE_DEPTH).
    queue: VecDeque<BatchBuffer>,
    /// Samples accumulating toward the next batch.
    build_samples: Vec<MeterSample>,
    inflight: Option<Inflight>,
    /// Millisecond timeline anchors for sampling / sending / sync requests.
    last_sample_ms: u64,
    last_send_ms: u64,
    last_sync_request_ms: u64,
    last_batch_send_ms: u64,
    /// Next batch id to assign; incremented after each completed batch.
    batch_id: u16,
    last_sent_batch_id: u16,
    last_acked_batch_id: u16,
    ack_pending: bool,
    retry_count: u8,
    /// Timeout for the current in-flight batch, derived from payload size.
    ack_timeout_ms: u32,
    ack_timeout_total: u32,
    ack_retry_total: u32,
    /// EWMA of observed ACK round-trip listening time (0 = no sample yet).
    ack_rtt_ewma_ms: u32,
    ack_miss_streak: u32,
    rx_window_ms: u32,
    /// Monotonic counter driving the synthetic meter samples.
    sample_tick: u32,
    sender_faults: FaultCounters,
    /// Last error code (2 = encode/decode, 3 = LoRa TX).
    sender_last_error: u8,
    sender_rx_reject_reason: u8,
    /// Set when the most recent send failed at the encode step (vs. radio TX).
    last_tx_build_encode_error: bool,
}
impl SenderStateMachine {
    /// Creates a sender in the Syncing phase. The sample and sync-request
    /// anchors are back-dated by one interval so both fire on the first tick.
    pub fn new(config: SenderConfig, now_ms: u64) -> Self {
        Self {
            config,
            phase: SenderPhase::Syncing,
            time_acquired: false,
            sender_faults_reset_after_first_sync: false,
            sender_faults_reset_hour_utc: None,
            queue: VecDeque::with_capacity(BATCH_QUEUE_DEPTH),
            build_samples: Vec::with_capacity(METER_BATCH_MAX_SAMPLES),
            inflight: None,
            last_sample_ms: now_ms.saturating_sub(METER_SAMPLE_INTERVAL_MS as u64),
            last_send_ms: now_ms,
            last_sync_request_ms: now_ms.saturating_sub(SYNC_REQUEST_INTERVAL_MS as u64),
            last_batch_send_ms: 0,
            batch_id: 1,
            last_sent_batch_id: 0,
            last_acked_batch_id: 0,
            ack_pending: false,
            retry_count: 0,
            ack_timeout_ms: 10_000,
            ack_timeout_total: 0,
            ack_retry_total: 0,
            ack_rtt_ewma_ms: 0,
            ack_miss_streak: 0,
            rx_window_ms: 0,
            sample_tick: 0,
            sender_faults: FaultCounters::default(),
            sender_last_error: 0,
            sender_rx_reject_reason: 0,
            last_tx_build_encode_error: false,
        }
    }

    /// Snapshot of queue/ACK counters for inspection and tests.
    pub fn stats(&self) -> SenderStats {
        SenderStats {
            queue_depth: self.queue.len() as u8,
            build_count: self.build_samples.len() as u8,
            inflight_batch_id: self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0),
            last_sent_batch_id: self.last_sent_batch_id,
            last_acked_batch_id: self.last_acked_batch_id,
            retry_count: self.retry_count,
            ack_pending: self.ack_pending,
            ack_timeout_total: self.ack_timeout_total,
            ack_retry_total: self.ack_retry_total,
            ack_miss_streak: self.ack_miss_streak,
            rx_window_ms: self.rx_window_ms,
        }
    }

    /// True once a valid epoch has been received from the receiver.
    pub fn is_time_acquired(&self) -> bool {
        self.time_acquired
    }

    /// Clears all fault counters and the last-error code.
    fn reset_sender_fault_stats(&mut self) {
        self.sender_faults = FaultCounters::default();
        self.sender_last_error = 0;
    }

    /// One-shot fault reset when the first plausible sync time arrives, so
    /// pre-sync noise doesn't pollute the reported counters.
    fn reset_faults_on_first_sync(&mut self, synced_utc: u32) {
        if self.sender_faults_reset_after_first_sync || synced_utc < MIN_ACCEPTED_EPOCH_UTC {
            return;
        }
        self.reset_sender_fault_stats();
        self.sender_faults_reset_after_first_sync = true;
        self.sender_faults_reset_hour_utc = Some(synced_utc / 3600);
    }

    /// After the first-sync reset, clear the fault counters again whenever
    /// the UTC hour index advances (at most once per hour).
    fn reset_faults_on_hour_boundary(&mut self, now_utc: u32) {
        if !self.time_acquired || !self.sender_faults_reset_after_first_sync {
            return;
        }
        if now_utc < MIN_ACCEPTED_EPOCH_UTC {
            return;
        }
        let now_hour = now_utc / 3600;
        match self.sender_faults_reset_hour_utc {
            None => self.sender_faults_reset_hour_utc = Some(now_hour),
            Some(prev) if now_hour > prev => {
                self.reset_sender_fault_stats();
                self.sender_faults_reset_hour_utc = Some(now_hour);
            }
            _ => {}
        }
    }

    /// Produces one synthetic meter sample driven by `sample_tick`.
    /// If the clock is not plausibly synced the timestamp is synthesized
    /// above the epoch floor so downstream validation still passes.
    /// (Power/energy values are the tick itself — host-test stand-in data.)
    fn generate_sample(&mut self, now_utc: u32) -> MeterSample {
        self.sample_tick = self.sample_tick.saturating_add(1);
        let ts_utc = if now_utc >= MIN_ACCEPTED_EPOCH_UTC {
            now_utc
        } else {
            MIN_ACCEPTED_EPOCH_UTC.saturating_add(self.sample_tick)
        };
        let wh = self.sample_tick as f32;
        let p1 = self.sample_tick as f32;
        let p2 = self.sample_tick as f32;
        let p3 = self.sample_tick as f32;
        MeterSample {
            ts_utc,
            energy_total_kwh: wh / 1000.0,
            phase_power_w: [p1, p2, p3],
            total_power_w: p1 + p2 + p3,
            battery_voltage_v: 3.8,
            battery_percent: 75,
            link_rssi_dbm: -70,
            link_snr_db: 7.0,
            err_meter_read: self.sender_faults.meter_read_fail,
            err_decode: self.sender_faults.decode_fail,
            err_lora_tx: self.sender_faults.lora_tx_fail,
            err_last: self.sender_last_error,
            rx_reject_reason: self.sender_rx_reject_reason,
        }
    }

    /// Adds a sample to the build buffer; when the buffer reaches a full
    /// batch worth of samples it is moved into the send queue.
    fn append_meter_sample(&mut self, sample: MeterSample) {
        self.build_samples.push(sample);
        if self.build_samples.len() >= METER_BATCH_MAX_SAMPLES {
            let samples = std::mem::take(&mut self.build_samples);
            self.batch_queue_enqueue(samples);
        }
    }

    /// Drops the oldest queued batch. Returns true if that batch was also
    /// the in-flight one (in which case the in-flight/ACK state is cleared,
    /// and the caller must advance the batch id).
    fn batch_queue_drop_oldest(&mut self) -> bool {
        if self.queue.is_empty() {
            return false;
        }
        let dropped = self.queue.pop_front();
        if let Some(dropped_batch) = dropped {
            let dropped_inflight = self
                .inflight
                .as_ref()
                .map(|inflight| dropped_batch.batch_id_valid && inflight.batch_id == dropped_batch.batch_id)
                .unwrap_or(false);
            if dropped_inflight {
                self.ack_pending = false;
                self.retry_count = 0;
                self.inflight = None;
            }
            return dropped_inflight;
        }
        false
    }

    /// Enqueues a batch of samples. When the queue is full the oldest batch
    /// is dropped first; batch ids are assigned lazily on first send.
    fn batch_queue_enqueue(&mut self, samples: Vec<MeterSample>) {
        if samples.is_empty() {
            return;
        }
        if self.queue.len() >= BATCH_QUEUE_DEPTH {
            if self.batch_queue_drop_oldest() {
                // The dropped batch owned the current id; move past it so the
                // receiver won't treat the next batch as a duplicate.
                self.batch_id = self.batch_id.wrapping_add(1);
            }
        }
        self.queue.push_back(BatchBuffer {
            batch_id: 0,
            batch_id_valid: false,
            samples,
        });
    }

    /// kWh (f32) -> whole Wh, clamped to [0, u32::MAX]; NaN maps to 0.
    /// Computed in f64 to avoid f32 rounding at large energy totals.
    fn kwh_to_wh_from_float(v: f32) -> u32 {
        if v.is_nan() {
            return 0;
        }
        let mut wh = (v as f64) * 1000.0;
        if wh < 0.0 {
            wh = 0.0;
        }
        if wh > u32::MAX as f64 {
            u32::MAX
        } else {
            wh.round() as u32
        }
    }

    /// Watts (f32) -> i16, rounded and clamped to the i16 range; NaN maps to 0.
    fn float_to_i16_w_clamped(v: f32) -> i16 {
        if v.is_nan() {
            return 0;
        }
        let rounded = v.round();
        if rounded < i16::MIN as f32 {
            i16::MIN
        } else if rounded > i16::MAX as f32 {
            i16::MAX
        } else {
            rounded as i16
        }
    }

    /// Linear airtime model: 120 ms base + 6 ms per byte. Matches the
    /// receiver's RX-timeout estimate so both ends agree on timing.
    fn compute_airtime_ms(packet_len: usize) -> u32 {
        let base = 120u32;
        let per_byte = 6u32;
        base.saturating_add((packet_len as u32).saturating_mul(per_byte))
    }

    /// ACK timeout for a batch payload: chunk count × per-chunk airtime plus
    /// margin, floored at 10 s (also used for the empty-payload degenerate case).
    fn compute_batch_ack_timeout_ms(payload_len: usize) -> u32 {
        if payload_len == 0 {
            return 10_000;
        }
        let chunk_count = ((payload_len + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD) as u32;
        let packet_len = 3 + BATCH_HEADER_SIZE + payload_len.min(BATCH_CHUNK_PAYLOAD) + 2;
        let per_chunk_toa = Self::compute_airtime_ms(packet_len);
        let timeout = chunk_count.saturating_mul(per_chunk_toa).saturating_add(BATCH_RX_MARGIN_MS);
        timeout.max(10_000)
    }

    /// Promotes the queue front to in-flight (assigning its batch id if
    /// needed). Returns false only when there is nothing to send.
    fn prepare_inflight_from_queue(&mut self) -> bool {
        if self.inflight.is_some() {
            return true;
        }
        let Some(front) = self.queue.front_mut() else {
            return false;
        };
        if !front.batch_id_valid {
            front.batch_id = self.batch_id;
            front.batch_id_valid = true;
        }
        self.inflight = Some(Inflight {
            batch_id: front.batch_id,
            sync_request: false,
            samples: front.samples.clone(),
            encoded_payload: Vec::new(),
        });
        true
    }

    /// Encodes the in-flight batch into the wire format. Sync requests carry
    /// n == 0 with only time/battery/fault metadata; data batches map samples
    /// into a 30-second slot window ending at t_last and build the presence
    /// mask. Energy is forced monotonically non-decreasing across the batch.
    fn build_payload(&mut self, clock: &impl Clock) -> Result<Vec<u8>, ()> {
        let inflight = self.inflight.as_ref().ok_or(())?;
        let mut input = BatchInputV3::default();
        input.sender_id = self.config.sender_id;
        input.batch_id = inflight.batch_id;
        input.t_last = if inflight.sync_request {
            clock.now_utc()
        } else {
            inflight.samples.last().map(|s| s.ts_utc).unwrap_or(0)
        };
        input.present_mask = 0;
        input.n = 0;
        input.battery_mv = if inflight.sync_request {
            3800
        } else {
            inflight
                .samples
                .last()
                .map(|s| (s.battery_voltage_v * 1000.0).round() as u16)
                .unwrap_or(0)
        };
        // Fault counters are saturated into the u8 wire fields.
        input.err_m = self.sender_faults.meter_read_fail.min(255) as u8;
        input.err_d = self.sender_faults.decode_fail.min(255) as u8;
        input.err_tx = self.sender_faults.lora_tx_fail.min(255) as u8;
        input.err_last = self.sender_last_error;
        input.err_rx_reject = self.sender_rx_reject_reason;
        if !inflight.sync_request {
            if input.t_last < (METER_BATCH_MAX_SAMPLES as u32 - 1) {
                return Err(());
            }
            let window_start = input.t_last - (METER_BATCH_MAX_SAMPLES as u32 - 1);
            // Bucket samples by second; a later sample for the same second wins.
            let mut slot_samples: [Option<MeterSample>; METER_BATCH_MAX_SAMPLES] = [None; METER_BATCH_MAX_SAMPLES];
            for sample in &inflight.samples {
                if sample.ts_utc < window_start || sample.ts_utc > input.t_last {
                    continue;
                }
                let slot = (sample.ts_utc - window_start) as usize;
                slot_samples[slot] = Some(*sample);
            }
            for (slot, sample_opt) in slot_samples.iter().enumerate() {
                let Some(sample) = sample_opt else {
                    continue;
                };
                let out_idx = input.n as usize;
                if out_idx >= METER_BATCH_MAX_SAMPLES {
                    return Err(());
                }
                input.present_mask |= 1u32 << slot;
                input.n = input.n.saturating_add(1);
                input.energy_wh[out_idx] = Self::kwh_to_wh_from_float(sample.energy_total_kwh);
                input.p1_w[out_idx] = Self::float_to_i16_w_clamped(sample.phase_power_w[0]);
                input.p2_w[out_idx] = Self::float_to_i16_w_clamped(sample.phase_power_w[1]);
                input.p3_w[out_idx] = Self::float_to_i16_w_clamped(sample.phase_power_w[2]);
            }
            // Energy is a lifetime counter: repair any dips introduced by
            // float rounding so the series never decreases.
            for i in 1..(input.n as usize) {
                if input.energy_wh[i] < input.energy_wh[i - 1] {
                    input.energy_wh[i] = input.energy_wh[i - 1];
                }
            }
        }
        encode_batch_v3(&input).map_err(|_| ())
    }

    /// Splits `data` into chunks (6-byte header + payload each) and transmits
    /// them. Failed chunk sends bump the TX fault counter but the remaining
    /// chunks are still attempted; returns true only if all chunks went out.
    fn send_batch_payload(&mut self, radio: &mut impl Radio, data: &[u8], batch_id: u16) -> bool {
        if data.is_empty() || data.len() > BATCH_MAX_COMPRESSED {
            return false;
        }
        let chunk_count = ((data.len() + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD) as u8;
        if chunk_count == 0 {
            return false;
        }
        let mut all_ok = true;
        let mut offset = 0usize;
        for chunk_index in 0..chunk_count {
            let mut chunk_len = data.len() - offset;
            if chunk_len > BATCH_CHUNK_PAYLOAD {
                chunk_len = BATCH_CHUNK_PAYLOAD;
            }
            // Header layout must match the receiver's process_batch_packet:
            // batch_id(LE u16), chunk_index, chunk_count, total_len(LE u16).
            let mut payload = Vec::with_capacity(BATCH_HEADER_SIZE + chunk_len);
            payload.extend_from_slice(&batch_id.to_le_bytes());
            payload.push(chunk_index);
            payload.push(chunk_count);
            payload.extend_from_slice(&(data.len() as u16).to_le_bytes());
            payload.extend_from_slice(&data[offset..offset + chunk_len]);
            let frame = encode_frame(MsgKind::BatchUp, self.config.short_id, &payload);
            let ok = radio.send_frame(&frame);
            all_ok &= ok;
            if !ok {
                self.sender_faults.lora_tx_fail = self.sender_faults.lora_tx_fail.saturating_add(1);
                self.sender_last_error = 3;
            }
            offset += chunk_len;
        }
        all_ok
    }

    /// Encodes (once, cached for retries) and transmits the in-flight batch,
    /// updating the ACK timeout from the payload size. On success records the
    /// send time; encode failures set `last_tx_build_encode_error`.
    fn send_inflight_batch(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool {
        self.last_tx_build_encode_error = false;
        let payload = {
            let needs_encode = self
                .inflight
                .as_ref()
                .map(|i| i.encoded_payload.is_empty())
                .unwrap_or(true);
            if needs_encode {
                let encoded = match self.build_payload(clock) {
                    Ok(v) => v,
                    Err(_) => {
                        self.last_tx_build_encode_error = true;
                        return false;
                    }
                };
                if let Some(inflight) = self.inflight.as_mut() {
                    inflight.encoded_payload = encoded;
                }
            }
            self.inflight
                .as_ref()
                .map(|i| i.encoded_payload.clone())
                .unwrap_or_default()
        };
        let batch_id = self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0);
        self.ack_timeout_ms = Self::compute_batch_ack_timeout_ms(payload.len());
        if self.send_batch_payload(radio, &payload, batch_id) {
            self.last_batch_send_ms = clock.now_ms();
            true
        } else {
            false
        }
    }

    /// Sends the next data batch from the queue. An unencodable batch is
    /// discarded (counted as a decode fault, error code 2) rather than
    /// blocking the queue forever; a radio failure just clears the in-flight
    /// state so the batch is retried from the queue later.
    fn send_meter_batch(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool {
        if !self.prepare_inflight_from_queue() {
            return false;
        }
        if let Some(inflight) = self.inflight.as_mut() {
            inflight.sync_request = false;
        }
        let ok = self.send_inflight_batch(clock, radio);
        if ok {
            self.last_sent_batch_id = self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0);
            self.ack_pending = true;
        } else {
            if self.last_tx_build_encode_error {
                self.finish_inflight_batch(true);
                self.sender_faults.decode_fail = self.sender_faults.decode_fail.saturating_add(1);
                self.sender_last_error = 2;
            } else {
                self.inflight = None;
            }
        }
        ok
    }

    /// Sends an empty sync-request batch (used to obtain time before the
    /// first data batch). Refused while another ACK is outstanding.
    fn send_sync_request(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool {
        if self.ack_pending {
            return false;
        }
        self.inflight = Some(Inflight {
            batch_id: self.batch_id,
            sync_request: true,
            samples: Vec::new(),
            encoded_payload: Vec::new(),
        });
        let ok = self.send_inflight_batch(clock, radio);
        if ok {
            self.last_sent_batch_id = self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0);
            self.ack_pending = true;
        } else {
            self.inflight = None;
        }
        ok
    }

    /// Re-transmits the in-flight batch (cached payload) after an ACK timeout.
    fn resend_inflight_batch(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool {
        if !self.ack_pending || self.inflight.is_none() {
            return false;
        }
        self.send_inflight_batch(clock, radio)
    }

    /// Clears in-flight/ACK state. With `pop_queue` the front batch is also
    /// removed and the batch id counter advanced (successful delivery or a
    /// deliberately discarded unencodable batch).
    fn finish_inflight_batch(&mut self, pop_queue: bool) {
        if pop_queue && !self.queue.is_empty() {
            let _ = self.queue.pop_front();
        }
        self.ack_pending = false;
        self.retry_count = 0;
        self.inflight = None;
        if pop_queue {
            self.batch_id = self.batch_id.wrapping_add(1);
        }
    }

    /// Listens for the downlink ACK in two adaptive windows: a first window
    /// sized from ACK airtime, the RTT EWMA and the recent miss streak
    /// (clamped to 600–2500 ms), then a second, wider fallback window. A
    /// matching ACK updates the RTT EWMA, may set the synced time, and
    /// completes the batch; any mismatch counts as a miss with a reject code
    /// (2 = wrong kind, 3 = decode, 5 = wrong batch id).
    fn handle_ack_window(&mut self, clock: &impl Clock, radio: &mut impl Radio) {
        if !self.ack_pending {
            return;
        }
        // 3-byte frame header + ACK payload + 2-byte CRC.
        let ack_len = 3 + dd3_protocol::ACK_DOWN_PAYLOAD_LEN + 2;
        let ack_air_ms = Self::compute_airtime_ms(ack_len);
        let mut ack_window_first_ms = ack_air_ms.saturating_add(200);
        if self.ack_rtt_ewma_ms > 0 {
            ack_window_first_ms = ack_window_first_ms.max(self.ack_rtt_ewma_ms.saturating_add(150));
        }
        ack_window_first_ms = ack_window_first_ms
            .saturating_add((self.ack_miss_streak.saturating_mul(150)).min(1200))
            .clamp(600, 2500);
        let mut ack_window_second_ms = ack_window_first_ms + (ack_window_first_ms / 2);
        ack_window_second_ms = ack_window_second_ms
            .max(ack_air_ms.saturating_add(400))
            .min(5000);
        let now_ms = clock.now_ms();
        let mut got_first = false;
        let mut frame = radio.recv_frame(ack_window_first_ms, now_ms);
        if frame.is_some() {
            got_first = true;
        } else {
            frame = radio.recv_frame(ack_window_second_ms, now_ms);
        }
        // Accounting approximates elapsed time by full window lengths rather
        // than the actual arrival time within the window.
        let rx_elapsed = if got_first {
            ack_window_first_ms
        } else {
            ack_window_first_ms.saturating_add(ack_window_second_ms)
        };
        self.rx_window_ms = self.rx_window_ms.saturating_add(rx_elapsed);
        let Some(raw) = frame else {
            self.ack_miss_streak = self.ack_miss_streak.saturating_add(1);
            return;
        };
        let parsed = match decode_frame(&raw, MsgKind::AckDown as u8) {
            Ok(frame) => frame,
            Err(_) => {
                self.sender_rx_reject_reason = 3;
                self.ack_miss_streak = self.ack_miss_streak.saturating_add(1);
                return;
            }
        };
        if parsed.msg_kind != MsgKind::AckDown {
            self.sender_rx_reject_reason = 2;
            self.ack_miss_streak = self.ack_miss_streak.saturating_add(1);
            return;
        }
        let ack = match decode_ack_down_payload(&parsed.payload) {
            Ok(a) => a,
            Err(_) => {
                self.sender_rx_reject_reason = 3;
                self.ack_miss_streak = self.ack_miss_streak.saturating_add(1);
                return;
            }
        };
        if ack.batch_id != self.last_sent_batch_id {
            self.sender_rx_reject_reason = 5;
            self.ack_miss_streak = self.ack_miss_streak.saturating_add(1);
            return;
        }
        self.last_acked_batch_id = ack.batch_id;
        // RTT EWMA: 3/4 old + 1/4 new (rounded), seeded on first observation.
        if self.ack_rtt_ewma_ms == 0 {
            self.ack_rtt_ewma_ms = rx_elapsed;
        } else {
            self.ack_rtt_ewma_ms = (self.ack_rtt_ewma_ms.saturating_mul(3) + rx_elapsed + 1) / 4;
        }
        if ack.time_valid && ack.epoch_utc >= MIN_ACCEPTED_EPOCH_UTC {
            self.time_acquired = true;
            self.reset_faults_on_first_sync(ack.epoch_utc);
        }
        self.finish_inflight_batch(true);
        self.ack_miss_streak = 0;
    }

    /// Builds the ACK payload the receiver would send for `batch_id` given
    /// this clock — used for host-side compatibility checks, never transmitted.
    fn send_batch_ack_preview(clock: &impl Clock, batch_id: u16) -> AckDownPayload {
        let epoch = clock.now_utc();
        let time_valid = clock.is_time_synced() && epoch >= MIN_ACCEPTED_EPOCH_UTC;
        AckDownPayload {
            time_valid,
            batch_id,
            epoch_utc: if time_valid { epoch } else { 0 },
        }
    }

    /// One sender iteration: catch up on due samples, flush/send batches (or
    /// sync requests while unsynced), run the ACK window, apply the
    /// timeout/retry policy, then derive and report the phase.
    pub fn tick(&mut self, clock: &impl Clock, radio: &mut impl Radio, status: &mut impl StatusSink) {
        let now_ms = clock.now_ms();
        if self.time_acquired {
            self.reset_faults_on_hour_boundary(clock.now_utc());
            // Advance the sample anchor in fixed steps so missed ticks are
            // back-filled rather than skipped.
            while now_ms.saturating_sub(self.last_sample_ms) >= METER_SAMPLE_INTERVAL_MS as u64 {
                self.last_sample_ms = self.last_sample_ms.saturating_add(METER_SAMPLE_INTERVAL_MS as u64);
                let sample = self.generate_sample(clock.now_utc());
                self.append_meter_sample(sample);
            }
            if !self.ack_pending && now_ms.saturating_sub(self.last_send_ms) >= METER_SEND_INTERVAL_MS as u64 {
                self.last_send_ms = now_ms;
                if !self.build_samples.is_empty() {
                    let samples = std::mem::take(&mut self.build_samples);
                    self.batch_queue_enqueue(samples);
                }
                if !self.queue.is_empty() {
                    let _ = self.send_meter_batch(clock, radio);
                }
            }
            // Backlog drain: with more than one batch queued, send without
            // waiting for the next send interval.
            if !self.ack_pending && self.queue.len() > 1 {
                let _ = self.send_meter_batch(clock, radio);
            }
        } else if !self.ack_pending
            && now_ms.saturating_sub(self.last_sync_request_ms) >= SYNC_REQUEST_INTERVAL_MS as u64
        {
            self.last_sync_request_ms = now_ms;
            let _ = self.send_sync_request(clock, radio);
        }
        self.handle_ack_window(clock, radio);
        if self.ack_pending && now_ms.saturating_sub(self.last_batch_send_ms) >= self.ack_timeout_ms as u64 {
            self.ack_timeout_total = self.ack_timeout_total.saturating_add(1);
            if self.retry_count < BATCH_MAX_RETRIES {
                self.retry_count = self.retry_count.saturating_add(1);
                self.ack_retry_total = self.ack_retry_total.saturating_add(1);
                let _ = self.resend_inflight_batch(clock, radio);
            } else {
                // Retries exhausted: abandon the in-flight attempt (batch stays
                // queued) and record it as a TX failure (error code 3).
                self.ack_pending = false;
                self.retry_count = 0;
                self.inflight = None;
                self.sender_faults.lora_tx_fail = self.sender_faults.lora_tx_fail.saturating_add(1);
                self.sender_last_error = 3;
            }
        }
        self.phase = if self.ack_pending {
            SenderPhase::WaitAck
        } else if !self.time_acquired {
            SenderPhase::Syncing
        } else if self.queue.len() > 1 {
            SenderPhase::Catchup
        } else {
            SenderPhase::Normal
        };
        let phase_text = match self.phase {
            SenderPhase::Syncing => "SYNCING",
            SenderPhase::Normal => "NORMAL",
            SenderPhase::Catchup => "CATCHUP",
            SenderPhase::WaitAck => "WAIT_ACK",
        };
        status.sender_phase(phase_text);
        // Kept for host-side compatibility tests; the result is discarded.
        let _ = Self::send_batch_ack_preview(clock, self.last_sent_batch_id);
    }
}

View File

@@ -0,0 +1,27 @@
use std::vec::Vec;
/// Time source abstraction: monotonic milliseconds plus (possibly unsynced)
/// UTC seconds and a sync flag.
pub trait Clock {
    /// Monotonic milliseconds since an arbitrary start.
    fn now_ms(&self) -> u64;
    /// Current UTC epoch seconds; meaningless until `is_time_synced`.
    fn now_utc(&self) -> u32;
    /// True once the UTC time has been set from a trusted source.
    fn is_time_synced(&self) -> bool;
}
/// Half-duplex packet radio abstraction (e.g. a LoRa transceiver or a test double).
pub trait Radio {
    /// Transmits one raw frame; returns false on TX failure.
    fn send_frame(&mut self, frame: &[u8]) -> bool;
    /// Waits up to `window_ms` (0 = poll) for a frame; `now_ms` is the
    /// caller's current monotonic time. Returns the raw frame bytes, if any.
    fn recv_frame(&mut self, window_ms: u32, now_ms: u64) -> Option<Vec<u8>>;
}
/// Sink for MQTT-style publications produced by the receiver pipeline.
///
/// Every method receives the fully-formed topic string (e.g.
/// `smartmeter/<device_id>/state`) and the serialized payload; implementations
/// decide the transport, QoS and retain behavior. (Parameters were previously
/// misleadingly named `device_id` although every call site passes a topic.)
pub trait Publisher {
    /// Publish a periodic state JSON document to `topic`.
    fn publish_state(&mut self, topic: &str, payload: &str);
    /// Publish a fault-counter JSON document to `topic`.
    fn publish_faults(&mut self, topic: &str, payload: &str);
    /// Publish a Home Assistant discovery document to `topic`.
    fn publish_discovery(&mut self, topic: &str, payload: &str);
}
/// Append-only per-device CSV log sink.
pub trait Storage {
    /// Appends one already-formatted CSV line to the log of `device_id`.
    fn append_csv(&mut self, device_id: &str, line: &str);
}
/// Optional observer for human-readable state-machine status strings.
/// Both hooks default to no-ops so tests can implement only what they need.
pub trait StatusSink {
    /// Called each sender tick with the current phase name (e.g. "NORMAL").
    fn sender_phase(&mut self, _phase: &str) {}
    /// Called each receiver tick with the current status (e.g. "RX_ACTIVE").
    fn receiver_status(&mut self, _status: &str) {}
}

View File

@@ -0,0 +1,43 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
/// Saturating fault counters shared by sender and receiver bookkeeping.
pub struct FaultCounters {
    /// Failed meter read attempts.
    pub meter_read_fail: u32,
    /// Frame/batch decode or validation failures.
    pub decode_fail: u32,
    /// Failed LoRa transmissions.
    pub lora_tx_fail: u32,
}
#[derive(Debug, Clone, Copy, PartialEq)]
/// One per-second meter reading together with link and fault metadata.
/// NOTE: derives `PartialEq` over f32 fields, so a sample containing NaN
/// (e.g. an unset battery voltage or the default SNR) never equals itself.
pub struct MeterSample {
    /// UTC epoch second of the reading.
    pub ts_utc: u32,
    /// Lifetime energy counter in kWh.
    pub energy_total_kwh: f32,
    /// Per-phase power [L1, L2, L3] in watts.
    pub phase_power_w: [f32; 3],
    /// Sum of the three phase powers, in watts.
    pub total_power_w: f32,
    /// Battery voltage in volts; NaN when unknown.
    pub battery_voltage_v: f32,
    pub battery_percent: u8,
    pub link_rssi_dbm: i16,
    pub link_snr_db: f32,
    /// Fault counters and codes carried alongside the reading.
    pub err_meter_read: u32,
    pub err_decode: u32,
    pub err_lora_tx: u32,
    /// Last error code (1 = meter, 2 = decode, 3 = loratx, 0 = none).
    pub err_last: u8,
    pub rx_reject_reason: u8,
}
impl Default for MeterSample {
fn default() -> Self {
Self {
ts_utc: 0,
energy_total_kwh: 0.0,
phase_power_w: [0.0, 0.0, 0.0],
total_power_w: 0.0,
battery_voltage_v: 0.0,
battery_percent: 0,
link_rssi_dbm: 0,
link_snr_db: f32::NAN,
err_meter_read: 0,
err_decode: 0,
err_lora_tx: 0,
err_last: 0,
rx_reject_reason: 0,
}
}
}