diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2ed7808 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/target +**/target +Cargo.lock +.fuzz/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..1f395cd --- /dev/null +++ b/.gitmodules @@ -0,0 +1,4 @@ +[submodule "dd3-cpp"] + path = vendor/dd3-cpp + url = https://git.mannheim.ccc.de/C3MA/DD3-LoRa-Bridge-MultiSender.git + branch = lora-refactor diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..62e7f9c --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[workspace] +resolver = "2" +members = [ + "crates/dd3_protocol", + "crates/dd3_contracts", + "crates/dd3_core", + "crates/dd3_sim", + "crates/dd3_firmware", + "crates/xtask", +] + +[workspace.package] +edition = "2021" +license = "MIT" +version = "0.1.0" +authors = ["AcidBurns"] + +[workspace.lints.rust] +unsafe_code = "forbid" \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..fbc64e9 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +.PHONY: test lint fuzz-smoke + +test: + cargo test --workspace + +lint: + cargo fmt --all -- --check + cargo clippy --workspace --all-targets -- -D warnings + +fuzz-smoke: + @where cargo-fuzz >NUL 2>&1 && cargo fuzz run frame_decode -- -max_total_time=5 || echo cargo-fuzz not installed; skipping \ No newline at end of file diff --git a/crates/dd3_contracts/Cargo.toml b/crates/dd3_contracts/Cargo.toml new file mode 100644 index 0000000..b392dd7 --- /dev/null +++ b/crates/dd3_contracts/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "dd3_contracts" +version = "0.1.0" +edition = "2021" + +[dependencies] \ No newline at end of file diff --git a/crates/dd3_contracts/src/csv.rs b/crates/dd3_contracts/src/csv.rs new file mode 100644 index 0000000..307bcf4 --- /dev/null +++ b/crates/dd3_contracts/src/csv.rs @@ -0,0 +1,69 @@ +use std::string::String; + +pub const CSV_HEADER: &str = + "ts_utc,ts_hms_local,p_w,p1_w,p2_w,p3_w,e_kwh,bat_v,bat_pct,rssi,snr,err_m,err_d,err_tx,err_last"; + +fn round_to(value: f32, decimals: u32) -> f32 { + let scale = 10f32.powi(decimals as i32); + (value * scale).round() / scale +} + +#[derive(Debug, Clone, PartialEq)] +pub struct CsvLineInput<'a> { + pub ts_utc: u32, + pub ts_hms_local: &'a str, + pub p_w: f32, + pub p1_w: f32, + pub p2_w: f32, + pub p3_w: f32, + pub e_kwh: f32, + pub bat_v: f32, + pub bat_pct: u8, + pub rssi: i16, + pub snr: f32, + pub err_m: u32, + pub err_d: u32, + pub err_tx: u32, + pub err_last_text: Option<&'a str>, + pub include_error_text: bool, +} + +pub fn format_csv_line(input: &CsvLineInput<'_>) -> String { + let mut line = String::new(); + line.push_str(&input.ts_utc.to_string()); + line.push(','); + line.push_str(input.ts_hms_local); + line.push(','); + line.push_str(&format!("{:.1}", round_to(input.p_w, 1))); + line.push(','); + line.push_str(&format!("{:.1}", round_to(input.p1_w, 1))); + line.push(','); + line.push_str(&format!("{:.1}", round_to(input.p2_w, 1))); + line.push(','); + line.push_str(&format!("{:.1}", round_to(input.p3_w, 1))); + line.push(','); + line.push_str(&format!("{:.3}", round_to(input.e_kwh, 3))); + line.push(','); + line.push_str(&format!("{:.2}", round_to(input.bat_v, 2))); + line.push(','); + line.push_str(&input.bat_pct.to_string()); + line.push(','); + line.push_str(&input.rssi.to_string()); + line.push(','); + if !input.snr.is_nan() { + line.push_str(&format!("{:.1}", round_to(input.snr, 1))); + } + line.push(','); + line.push_str(&input.err_m.to_string()); + line.push(','); + line.push_str(&input.err_d.to_string()); + line.push(','); + line.push_str(&input.err_tx.to_string()); + line.push(','); + if input.include_error_text { + if let Some(err_text) = input.err_last_text { + line.push_str(err_text); + } + } + line +} diff --git a/crates/dd3_contracts/src/escape.rs b/crates/dd3_contracts/src/escape.rs new file mode 100644 index 0000000..f40847a --- /dev/null +++ b/crates/dd3_contracts/src/escape.rs @@ -0,0 +1,34 @@ +use std::string::String; + +pub fn html_escape(input: &str) -> String { + let mut out = String::with_capacity(input.len() + 8); + for ch in input.chars() { + match ch { + '&' => out.push_str("&"), + '<' => out.push_str("<"), + '>' => out.push_str(">"), + '"' => out.push_str("""), + '\'' => out.push_str("'"), + _ => out.push(ch), + } + } + out +} + +pub fn url_encode_component(input: &str) -> String { + let mut out = String::with_capacity(input.len() * 3); + const HEX: &[u8; 16] = b"0123456789ABCDEF"; + + for b in input.as_bytes() { + let safe = b.is_ascii_alphanumeric() || *b == b'-' || *b == b'_' || *b == b'.' || *b == b'~'; + if safe { + out.push(*b as char); + } else { + out.push('%'); + out.push(HEX[(b >> 4) as usize] as char); + out.push(HEX[(b & 0x0F) as usize] as char); + } + } + + out +} \ No newline at end of file diff --git a/crates/dd3_contracts/src/ha.rs b/crates/dd3_contracts/src/ha.rs new file mode 100644 index 0000000..eb2323d --- /dev/null +++ b/crates/dd3_contracts/src/ha.rs @@ -0,0 +1,63 @@ +use std::string::String; + +use crate::HA_MANUFACTURER; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct HaDiscoverySpec<'a> { + pub device_id: &'a str, + pub key: &'a str, + pub name: &'a str, + pub unit: Option<&'a str>, + pub device_class: Option<&'a str>, + pub state_topic: &'a str, + pub value_template: &'a str, +} + +fn json_escape(input: &str) -> String { + let mut out = String::with_capacity(input.len() + 8); + for c in input.chars() { + match c { + '"' => out.push_str("\\\""), + '\\' => out.push_str("\\\\"), + '\n' => out.push_str("\\n"), + '\r' => out.push_str("\\r"), + '\t' => out.push_str("\\t"), + ch if (ch as u32) < 0x20 => out.push_str(&format!("\\u{:04X}", ch as u32)), + _ => out.push(c), + } + } + out +} + +pub fn build_ha_discovery_topic(device_id: &str, key: &str) -> String { + format!("homeassistant/sensor/{}/{}/config", device_id, key) +} + +pub fn build_ha_discovery_payload(spec: &HaDiscoverySpec<'_>) -> String { + let sensor_name = format!("{} {}", spec.device_id, spec.name); + let unique_id = format!("{}_{}", spec.device_id, spec.key); + + let mut json = String::new(); + json.push('{'); + json.push_str(&format!("\"name\":\"{}\"", json_escape(&sensor_name))); + json.push_str(&format!(",\"state_topic\":\"{}\"", json_escape(spec.state_topic))); + json.push_str(&format!(",\"unique_id\":\"{}\"", json_escape(&unique_id))); + if let Some(unit) = spec.unit { + if !unit.is_empty() { + json.push_str(&format!(",\"unit_of_measurement\":\"{}\"", json_escape(unit))); + } + } + if let Some(dc) = spec.device_class { + if !dc.is_empty() { + json.push_str(&format!(",\"device_class\":\"{}\"", json_escape(dc))); + } + } + json.push_str(&format!(",\"value_template\":\"{}\"", json_escape(spec.value_template))); + json.push_str(",\"device\":{"); + json.push_str(&format!("\"identifiers\":[\"{}\"]", json_escape(spec.device_id))); + json.push_str(&format!(",\"name\":\"{}\"", json_escape(spec.device_id))); + json.push_str(",\"model\":\"DD3-LoRa-Bridge\""); + json.push_str(&format!(",\"manufacturer\":\"{}\"", json_escape(HA_MANUFACTURER))); + json.push_str("}}"); + json +} diff --git a/crates/dd3_contracts/src/lib.rs b/crates/dd3_contracts/src/lib.rs new file mode 100644 index 0000000..d46b89d --- /dev/null +++ b/crates/dd3_contracts/src/lib.rs @@ -0,0 +1,13 @@ +pub mod csv; +pub mod escape; +pub mod ha; +pub mod mqtt; +pub mod sanitize; + +pub use csv::{format_csv_line, CsvLineInput, CSV_HEADER}; +pub use escape::{html_escape, url_encode_component}; +pub use ha::{build_ha_discovery_payload, build_ha_discovery_topic, HaDiscoverySpec}; +pub use mqtt::{mqtt_state_json, rx_reject_reason_text, MqttState}; +pub use sanitize::{sanitize_device_id, SanitizeError}; + +pub const HA_MANUFACTURER: &str = "AcidBurns"; \ No newline at end of file diff --git a/crates/dd3_contracts/src/mqtt.rs b/crates/dd3_contracts/src/mqtt.rs new file mode 100644 index 0000000..0855885 --- /dev/null +++ b/crates/dd3_contracts/src/mqtt.rs @@ -0,0 +1,135 @@ +use std::string::String; + +#[derive(Debug, Clone, PartialEq)] +pub struct MqttState<'a> { + pub device_id: &'a str, + pub ts_utc: u32, + pub energy_total_kwh: f32, + pub total_power_w: f32, + pub phase_power_w: [f32; 3], + pub battery_voltage_v: f32, + pub battery_percent: u8, + pub link_valid: bool, + pub link_rssi_dbm: i16, + pub link_snr_db: f32, + pub err_meter_read: u32, + pub err_decode: u32, + pub err_lora_tx: u32, + pub err_last: u8, + pub rx_reject_reason: u8, +} + +fn json_escape(input: &str) -> String { + let mut out = String::with_capacity(input.len() + 8); + for c in input.chars() { + match c { + '"' => out.push_str("\\\""), + '\\' => out.push_str("\\\\"), + '\n' => out.push_str("\\n"), + '\r' => out.push_str("\\r"), + '\t' => out.push_str("\\t"), + ch if (ch as u32) < 0x20 => out.push_str(&format!("\\u{:04X}", ch as u32)), + _ => out.push(c), + } + } + out +} + +fn round2(v: f32) -> f32 { + if v.is_nan() { + return v; + } + (v * 100.0).round() / 100.0 +} + +fn format_float_2_json(v: f32) -> String { + if v.is_nan() { + return "null".to_string(); + } + format!("{:.2}", round2(v)) +} + +fn round_to_i32(value: f32) -> i32 { + if value.is_nan() { + return 0; + } + let rounded = value.round(); + if rounded > i32::MAX as f32 { + i32::MAX + } else if rounded < i32::MIN as f32 { + i32::MIN + } else { + rounded as i32 + } +} + +fn short_id_from_device_id(device_id: &str) -> &str { + if device_id.len() >= 4 { + &device_id[device_id.len() - 4..] + } else { + device_id + } +} + +pub fn rx_reject_reason_text(reason: u8) -> &'static str { + match reason { + 1 => "crc_fail", + 2 => "invalid_msg_kind", + 3 => "length_mismatch", + 4 => "device_id_mismatch", + 5 => "batch_id_mismatch", + 6 => "unknown_sender", + _ => "none", + } +} + +pub fn mqtt_state_json(state: &MqttState<'_>) -> String { + let mut parts: Vec = Vec::new(); + + parts.push(format!("\"id\":\"{}\"", json_escape(short_id_from_device_id(state.device_id)))); + parts.push(format!("\"ts\":{}", state.ts_utc)); + parts.push(format!("\"e_kwh\":{}", format_float_2_json(state.energy_total_kwh))); + + if state.total_power_w.is_nan() { + parts.push("\"p_w\":null".to_string()); + } else { + parts.push(format!("\"p_w\":{}", round_to_i32(state.total_power_w))); + } + for (key, value) in [ + ("p1_w", state.phase_power_w[0]), + ("p2_w", state.phase_power_w[1]), + ("p3_w", state.phase_power_w[2]), + ] { + if value.is_nan() { + parts.push(format!("\"{}\":null", key)); + } else { + parts.push(format!("\"{}\":{}", key, round_to_i32(value))); + } + } + + parts.push(format!("\"bat_v\":{}", format_float_2_json(state.battery_voltage_v))); + parts.push(format!("\"bat_pct\":{}", state.battery_percent)); + + if state.link_valid { + parts.push(format!("\"rssi\":{}", state.link_rssi_dbm)); + parts.push(format!("\"snr\":{}", state.link_snr_db)); + } + if state.err_meter_read > 0 { + parts.push(format!("\"err_m\":{}", state.err_meter_read)); + } + if state.err_decode > 0 { + parts.push(format!("\"err_d\":{}", state.err_decode)); + } + if state.err_lora_tx > 0 { + parts.push(format!("\"err_tx\":{}", state.err_lora_tx)); + } + + parts.push(format!("\"err_last\":{}", state.err_last)); + parts.push(format!("\"rx_reject\":{}", state.rx_reject_reason)); + parts.push(format!( + "\"rx_reject_text\":\"{}\"", + rx_reject_reason_text(state.rx_reject_reason) + )); + + format!("{{{}}}", parts.join(",")) +} diff --git a/crates/dd3_contracts/src/sanitize.rs b/crates/dd3_contracts/src/sanitize.rs new file mode 100644 index 0000000..3ca8d4f --- /dev/null +++ b/crates/dd3_contracts/src/sanitize.rs @@ -0,0 +1,48 @@ +use std::string::String; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SanitizeError { + Empty, + PathTraversal, + PercentNotAllowed, + InvalidFormat, + InvalidHex, +} + +fn is_hex_char(c: char) -> bool { + c.is_ascii_hexdigit() +} + +fn to_upper_hex4(input: &str) -> String { + input.to_ascii_uppercase() +} + +pub fn sanitize_device_id(input: &str) -> Result { + let trimmed = input.trim(); + if trimmed.is_empty() { + return Err(SanitizeError::Empty); + } + if trimmed.contains('/') || trimmed.contains('\\') || trimmed.contains("..") { + return Err(SanitizeError::PathTraversal); + } + if trimmed.contains('%') { + return Err(SanitizeError::PercentNotAllowed); + } + + if trimmed.len() == 4 { + if !trimmed.chars().all(is_hex_char) { + return Err(SanitizeError::InvalidHex); + } + return Ok(format!("dd3-{}", to_upper_hex4(trimmed))); + } + + if trimmed.len() == 8 && trimmed.starts_with("dd3-") { + let hex = &trimmed[4..]; + if !hex.chars().all(is_hex_char) { + return Err(SanitizeError::InvalidHex); + } + return Ok(format!("dd3-{}", to_upper_hex4(hex))); + } + + Err(SanitizeError::InvalidFormat) +} diff --git a/crates/dd3_contracts/tests/contracts_tests.rs b/crates/dd3_contracts/tests/contracts_tests.rs new file mode 100644 index 0000000..915f56e --- /dev/null +++ b/crates/dd3_contracts/tests/contracts_tests.rs @@ -0,0 +1,184 @@ +use std::fs; +use std::path::Path; + +use dd3_contracts::{ + build_ha_discovery_payload, format_csv_line, html_escape, mqtt_state_json, sanitize_device_id, + url_encode_component, CsvLineInput, HaDiscoverySpec, MqttState, CSV_HEADER, HA_MANUFACTURER, +}; + +fn fixture(path: &str) -> String { + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + let full = root.join("../../").join(path); + fs::read_to_string(full).unwrap() +} + +#[test] +fn ha_discovery_snapshot_and_manufacturer_lock() { + let spec = HaDiscoverySpec { + device_id: "dd3-F19C", + key: "energy", + name: "Energy", + unit: Some("kWh"), + device_class: Some("energy"), + state_topic: "smartmeter/dd3-F19C/state", + value_template: "{{ value_json.e_kwh }}", + }; + + let actual = build_ha_discovery_payload(&spec); + let expected = fixture("fixtures/contracts/ha_discovery/energy.json"); + assert_eq!(expected, actual); + assert!(actual.contains("\"manufacturer\":\"AcidBurns\"")); +} + +#[test] +fn mqtt_state_snapshot_required_keys_and_no_legacy_keys() { + let state = MqttState { + device_id: "dd3-F19C", + ts_utc: 1_769_905_000, + energy_total_kwh: 1234.5678, + total_power_w: 321.6, + phase_power_w: [100.4, 110.4, 110.8], + battery_voltage_v: 3.876, + battery_percent: 77, + link_valid: true, + link_rssi_dbm: -71, + link_snr_db: 7.25, + err_meter_read: 1, + err_decode: 2, + err_lora_tx: 3, + err_last: 2, + rx_reject_reason: 1, + }; + + let actual = mqtt_state_json(&state); + let expected = fixture("fixtures/contracts/mqtt_state/sample.json"); + assert_eq!(expected, actual); + + for key in [ + "\"id\"", "\"ts\"", "\"e_kwh\"", "\"p_w\"", "\"p1_w\"", "\"p2_w\"", "\"p3_w\"", + "\"bat_v\"", "\"bat_pct\"", "\"rssi\"", "\"snr\"", "\"err_m\"", "\"err_d\"", + "\"err_tx\"", "\"err_last\"", "\"rx_reject\"", "\"rx_reject_text\"", + ] { + assert!(actual.contains(key), "missing key {key}"); + } + + assert!(!actual.contains("energy_total_kwh")); + assert!(!actual.contains("power_w")); + assert!(!actual.contains("battery_voltage")); +} + +#[test] +fn mqtt_state_optional_fields_omitted_when_unavailable() { + let state = MqttState { + device_id: "dd3-F19C", + ts_utc: 1_769_905_000, + energy_total_kwh: 10.0, + total_power_w: 100.0, + phase_power_w: [30.0, 30.0, 40.0], + battery_voltage_v: 3.9, + battery_percent: 88, + link_valid: false, + link_rssi_dbm: -80, + link_snr_db: 3.2, + err_meter_read: 0, + err_decode: 0, + err_lora_tx: 0, + err_last: 0, + rx_reject_reason: 0, + }; + + let json = mqtt_state_json(&state); + assert!(!json.contains("\"rssi\"")); + assert!(!json.contains("\"snr\"")); + assert!(!json.contains("\"err_m\"")); + assert!(!json.contains("\"err_d\"")); + assert!(!json.contains("\"err_tx\"")); + assert!(json.contains("\"rx_reject_text\":\"none\"")); +} + +#[test] +fn csv_header_and_line_snapshot() { + assert_eq!( + "ts_utc,ts_hms_local,p_w,p1_w,p2_w,p3_w,e_kwh,bat_v,bat_pct,rssi,snr,err_m,err_d,err_tx,err_last", + CSV_HEADER + ); + + let line = format_csv_line(&CsvLineInput { + ts_utc: 1_769_905_000, + ts_hms_local: "12:34:56", + p_w: 321.6, + p1_w: 100.4, + p2_w: 110.4, + p3_w: 110.8, + e_kwh: 1234.5678, + bat_v: 3.876, + bat_pct: 77, + rssi: -71, + snr: 7.25, + err_m: 1, + err_d: 2, + err_tx: 3, + err_last_text: Some("decode"), + include_error_text: true, + }); + + let snapshot = fixture("fixtures/contracts/sd_csv/sample.csv"); + let expected_line = snapshot.lines().nth(1).unwrap(); + assert_eq!(expected_line, line); +} + +#[test] +fn html_url_and_sanitize_table_cases() { + assert_eq!("", html_escape("")); + assert_eq!("plain", html_escape("plain")); + assert_eq!("a&b", html_escape("a&b")); + assert_eq!("<tag>", html_escape("")); + assert_eq!(""hi"", html_escape("\"hi\"")); + assert_eq!("it's", html_escape("it's")); + assert_eq!("&<>"'", html_escape("&<>\"'")); + assert_eq!("&amp;", html_escape("&")); + + assert_eq!("", url_encode_component("")); + assert_eq!("abcABC012-_.~", url_encode_component("abcABC012-_.~")); + assert_eq!("a%20b", url_encode_component("a b")); + assert_eq!("%2F%5C%3F%26%23%25%22%27", url_encode_component("/\\?&#%\"'")); + assert_eq!("line%0Abreak", url_encode_component("line\nbreak")); + assert_eq!("%01%1F%7F", url_encode_component(&String::from_utf8(vec![1, 31, 127]).unwrap())); + + for accept in ["F19C", "f19c", " f19c ", "dd3-f19c", "dd3-F19C", "dd3-a0b1"] { + let out = sanitize_device_id(accept).unwrap(); + if accept.contains("a0b1") { + assert_eq!("dd3-A0B1", out); + } else { + assert_eq!("dd3-F19C", out); + } + } + + for reject in [ + "", "F", "FFF", "FFFFF", "dd3-12", "dd3-12345", "F1 9C", "dd3-F1\t9C", + "dd3-F19C%00", "%F19C", "../F19C", "dd3-..1A", "dd3-12/3", "dd3-12\\3", "F19G", "dd3-zzzz", + ] { + assert!(sanitize_device_id(reject).is_err(), "unexpected accept: {reject}"); + } +} + +#[test] +fn manufacturer_drift_guard() { + assert_eq!("AcidBurns", HA_MANUFACTURER); + + let roots = [Path::new(env!("CARGO_MANIFEST_DIR")).join("src")]; + for root in roots { + let entries = fs::read_dir(&root).unwrap(); + for entry in entries { + let path = entry.unwrap().path(); + if path.extension().and_then(|x| x.to_str()) != Some("rs") { + continue; + } + let txt = fs::read_to_string(&path).unwrap(); + if path.ends_with(Path::new("lib.rs")) { + continue; + } + assert!(!txt.contains("\"AcidBurns\""), "hardcoded manufacturer in {}", path.display()); + } + } +} diff --git a/crates/dd3_core/Cargo.toml b/crates/dd3_core/Cargo.toml new file mode 100644 index 0000000..b50c666 --- /dev/null +++ b/crates/dd3_core/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "dd3_core" +version = "0.1.0" +edition = "2021" + +[dependencies] +dd3_protocol = { path = "../dd3_protocol" } +dd3_contracts = { path = "../dd3_contracts" } \ No newline at end of file diff --git a/crates/dd3_core/src/constants.rs b/crates/dd3_core/src/constants.rs new file mode 100644 index 0000000..ec54f14 --- /dev/null +++ b/crates/dd3_core/src/constants.rs @@ -0,0 +1,15 @@ +pub const MIN_ACCEPTED_EPOCH_UTC: u32 = 1_769_904_000; + +pub const SYNC_REQUEST_INTERVAL_MS: u32 = 15_000; +pub const METER_SAMPLE_INTERVAL_MS: u32 = 1_000; +pub const METER_SEND_INTERVAL_MS: u32 = 30_000; +pub const BATCH_MAX_RETRIES: u8 = 2; +pub const BATCH_QUEUE_DEPTH: usize = 10; +pub const ACK_REPEAT_COUNT: u8 = 3; +pub const ACK_REPEAT_DELAY_MS: u32 = 200; + +pub const METER_BATCH_MAX_SAMPLES: usize = 30; +pub const BATCH_HEADER_SIZE: usize = 6; +pub const BATCH_CHUNK_PAYLOAD: usize = dd3_protocol::LORA_MAX_PAYLOAD - BATCH_HEADER_SIZE; +pub const BATCH_MAX_COMPRESSED: usize = 4096; +pub const BATCH_RX_MARGIN_MS: u32 = 800; \ No newline at end of file diff --git a/crates/dd3_core/src/lib.rs b/crates/dd3_core/src/lib.rs new file mode 100644 index 0000000..803bbad --- /dev/null +++ b/crates/dd3_core/src/lib.rs @@ -0,0 +1,11 @@ +pub mod constants; +pub mod receiver; +pub mod sender; +pub mod traits; +pub mod types; + +pub use constants::*; +pub use receiver::{ReceiverConfig, ReceiverPipeline, ReceiverStats, SenderStatus}; +pub use sender::{SenderConfig, SenderPhase, SenderStateMachine, SenderStats}; +pub use traits::{Clock, Publisher, Radio, StatusSink, Storage}; +pub use types::{FaultCounters, MeterSample}; \ No newline at end of file diff --git a/crates/dd3_core/src/receiver.rs b/crates/dd3_core/src/receiver.rs new file mode 100644 index 0000000..22dec0e --- /dev/null +++ b/crates/dd3_core/src/receiver.rs @@ -0,0 +1,470 @@ +use std::collections::BTreeMap; + +use dd3_contracts::{ + build_ha_discovery_payload, build_ha_discovery_topic, format_csv_line, mqtt_state_json, + CsvLineInput, HaDiscoverySpec, MqttState, +}; +use dd3_protocol::{ + decode_batch_v3, decode_frame, encode_ack_down_payload, encode_frame, push_chunk, MsgKind, + ReassemblyState, ReassemblyStatus, +}; + +use crate::constants::*; +use crate::traits::{Clock, Publisher, Radio, StatusSink, Storage}; +use crate::types::{FaultCounters, MeterSample}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ReceiverConfig { + pub short_id: u16, + pub device_id: String, + pub expected_sender_ids: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct ReceiverStats { + pub receiver_decode_fail: u32, + pub receiver_lora_tx_fail: u32, + pub last_rx_reject: u8, + pub receiver_discovery_sent: bool, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct SenderStatus { + pub short_id: u16, + pub device_id: String, + pub last_data: Option, + pub last_update_ts_utc: u32, + pub rx_batches_total: u32, + pub rx_batches_duplicate: u32, + pub rx_last_duplicate_ts_utc: u32, + pub has_data: bool, +} + +impl SenderStatus { + fn new(short_id: u16) -> Self { + Self { + short_id, + device_id: format!("dd3-{short_id:04X}"), + last_data: None, + last_update_ts_utc: 0, + rx_batches_total: 0, + rx_batches_duplicate: 0, + rx_last_duplicate_ts_utc: 0, + has_data: false, + } + } +} + +pub struct ReceiverPipeline { + config: ReceiverConfig, + sender_statuses: Vec, + last_batch_id_rx: Vec, + + receiver_faults: FaultCounters, + receiver_last_error: u8, + last_rx_reject: u8, + + reassembly: BTreeMap, + reassembly_buffer: BTreeMap>, + + sender_discovery_sent: Vec, + receiver_discovery_sent: bool, +} + +impl ReceiverPipeline { + pub fn new(config: ReceiverConfig) -> Self { + let sender_statuses = config + .expected_sender_ids + .iter() + .copied() + .map(SenderStatus::new) + .collect::>(); + + let sender_count = sender_statuses.len(); + + Self { + config, + sender_statuses, + last_batch_id_rx: vec![0; sender_count], + receiver_faults: FaultCounters::default(), + receiver_last_error: 0, + last_rx_reject: 0, + reassembly: BTreeMap::new(), + reassembly_buffer: BTreeMap::new(), + sender_discovery_sent: vec![false; sender_count], + receiver_discovery_sent: false, + } + } + + pub fn stats(&self) -> ReceiverStats { + ReceiverStats { + receiver_decode_fail: self.receiver_faults.decode_fail, + receiver_lora_tx_fail: self.receiver_faults.lora_tx_fail, + last_rx_reject: self.last_rx_reject, + receiver_discovery_sent: self.receiver_discovery_sent, + } + } + + pub fn sender_statuses(&self) -> &[SenderStatus] { + &self.sender_statuses + } + + fn note_fault_decode(&mut self) { + self.receiver_faults.decode_fail = self.receiver_faults.decode_fail.saturating_add(1); + self.receiver_last_error = 2; + } + + fn sender_idx_from_short_id(&self, short_id: u16) -> Option { + self.config + .expected_sender_ids + .iter() + .position(|expected| *expected == short_id) + } + + fn short_id_from_sender_id(&self, sender_id: u16) -> Option { + if sender_id == 0 { + return None; + } + self.config + .expected_sender_ids + .get(sender_id as usize - 1) + .copied() + } + + fn compute_batch_rx_timeout_ms(total_len: u16, chunk_count: u8) -> u32 { + if total_len == 0 || chunk_count == 0 { + return 10_000; + } + let max_chunk_payload = (total_len as usize).min(BATCH_CHUNK_PAYLOAD); + let payload_len = BATCH_HEADER_SIZE + max_chunk_payload; + let packet_len = 3 + payload_len + 2; + let per_chunk_toa_ms = 120 + (packet_len as u32) * 6; + (chunk_count as u32) + .saturating_mul(per_chunk_toa_ms) + .saturating_add(BATCH_RX_MARGIN_MS) + .max(10_000) + } + + fn send_batch_ack(&mut self, batch_id: u16, clock: &impl Clock, radio: &mut impl Radio) { + let epoch = clock.now_utc(); + let time_valid = clock.is_time_synced() && epoch >= MIN_ACCEPTED_EPOCH_UTC; + let ack_payload = encode_ack_down_payload(dd3_protocol::AckDownPayload { + time_valid, + batch_id, + epoch_utc: if time_valid { epoch } else { 0 }, + }); + let frame = encode_frame(MsgKind::AckDown, self.config.short_id, &ack_payload); + + let repeats = if ACK_REPEAT_COUNT == 0 { 1 } else { ACK_REPEAT_COUNT }; + for _ in 0..repeats { + if !radio.send_frame(&frame) { + self.receiver_faults.lora_tx_fail = self.receiver_faults.lora_tx_fail.saturating_add(1); + self.receiver_last_error = 3; + } + } + } + + fn process_batch_packet( + &mut self, + pkt_short_id: u16, + payload: &[u8], + now_ms: u64, + ) -> Result, ()> { + if payload.len() < BATCH_HEADER_SIZE { + return Ok(None); + } + + let batch_id = u16::from_le_bytes([payload[0], payload[1]]); + let chunk_index = payload[2]; + let chunk_count = payload[3]; + let total_len = u16::from_le_bytes([payload[4], payload[5]]); + let chunk_data = &payload[BATCH_HEADER_SIZE..]; + + let state = self.reassembly.entry(pkt_short_id).or_default(); + let buffer = self + .reassembly_buffer + .entry(pkt_short_id) + .or_insert_with(|| vec![0u8; BATCH_MAX_COMPRESSED]); + + let status = push_chunk( + state, + batch_id, + chunk_index, + chunk_count, + total_len, + chunk_data, + now_ms as u32, + Self::compute_batch_rx_timeout_ms(total_len, chunk_count), + BATCH_MAX_COMPRESSED as u16, + buffer, + ); + + match status { + ReassemblyStatus::InProgress => Ok(None), + ReassemblyStatus::ErrorReset => Ok(None), + ReassemblyStatus::Complete { complete_len } => { + let decoded = decode_batch_v3(&buffer[..complete_len as usize]).map_err(|_| ())?; + Ok(Some((batch_id, decoded))) + } + } + } + + fn publish_discovery_bundle(&self, publisher: &mut impl Publisher, device_id: &str) { + let state_topic = format!("smartmeter/{device_id}/state"); + let faults_topic = format!("smartmeter/{device_id}/faults"); + + let sensors = [ + ("energy", "Energy", Some("kWh"), Some("energy"), state_topic.as_str(), "{{ value_json.e_kwh }}"), + ("power", "Power", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p_w }}"), + ("p1", "Power L1", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p1_w }}"), + ("p2", "Power L2", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p2_w }}"), + ("p3", "Power L3", Some("W"), Some("power"), state_topic.as_str(), "{{ value_json.p3_w }}"), + ("bat_v", "Battery Voltage", Some("V"), Some("voltage"), state_topic.as_str(), "{{ value_json.bat_v }}"), + ("bat_pct", "Battery", Some("%"), Some("battery"), state_topic.as_str(), "{{ value_json.bat_pct }}"), + ("rssi", "LoRa RSSI", Some("dBm"), Some("signal_strength"), state_topic.as_str(), "{{ value_json.rssi }}"), + ("snr", "LoRa SNR", Some("dB"), None, state_topic.as_str(), "{{ value_json.snr }}"), + ("err_m", "Meter Read Errors", Some("count"), None, faults_topic.as_str(), "{{ value_json.err_m }}"), + ("err_d", "Decode Errors", Some("count"), None, faults_topic.as_str(), "{{ value_json.err_d }}"), + ("err_tx", "LoRa TX Errors", Some("count"), None, faults_topic.as_str(), "{{ value_json.err_tx }}"), + ("err_last", "Last Error Code", None, None, faults_topic.as_str(), "{{ value_json.err_last }}"), + ("err_last_text", "Last Error", None, None, faults_topic.as_str(), "{{ value_json.err_last_text }}"), + ("err_last_age", "Last Error Age", Some("s"), None, faults_topic.as_str(), "{{ value_json.err_last_age }}"), + ]; + + for (key, name, unit, device_class, topic, template) in sensors { + let spec = HaDiscoverySpec { + device_id, + key, + name, + unit, + device_class, + state_topic: topic, + value_template: template, + }; + let discovery_topic = build_ha_discovery_topic(device_id, key); + let discovery_payload = build_ha_discovery_payload(&spec); + publisher.publish_discovery(&discovery_topic, &discovery_payload); + } + } + + pub fn tick( + &mut self, + clock: &impl Clock, + radio: &mut impl Radio, + publisher: &mut impl Publisher, + storage: &mut impl Storage, + status_sink: &mut impl StatusSink, + ) { + let now_ms = clock.now_ms(); + + while let Some(raw) = radio.recv_frame(0, now_ms) { + let frame = match decode_frame(&raw, MsgKind::AckDown as u8) { + Ok(frame) => frame, + Err(_) => { + self.last_rx_reject = 3; + continue; + } + }; + + if frame.msg_kind != MsgKind::BatchUp { + continue; + } + + let packet = match self.process_batch_packet(frame.short_id, &frame.payload, now_ms) { + Ok(Some(pkt)) => pkt, + Ok(None) => continue, + Err(_) => { + self.note_fault_decode(); + continue; + } + }; + + let (batch_id, batch) = packet; + + let Some(sender_idx) = self.sender_idx_from_short_id(frame.short_id) else { + self.last_rx_reject = 6; + self.note_fault_decode(); + continue; + }; + + let expected_sender_id = (sender_idx as u16) + 1; + if batch.sender_id != expected_sender_id { + self.last_rx_reject = 4; + self.note_fault_decode(); + continue; + } + + let duplicate = self.last_batch_id_rx[sender_idx] == batch_id; + { + let status = &mut self.sender_statuses[sender_idx]; + status.rx_batches_total = status.rx_batches_total.saturating_add(1); + if duplicate { + status.rx_batches_duplicate = status.rx_batches_duplicate.saturating_add(1); + let dup_ts = if clock.now_utc() == 0 { batch.t_last } else { clock.now_utc() }; + status.rx_last_duplicate_ts_utc = dup_ts; + } + } + + self.send_batch_ack(batch_id, clock, radio); + if duplicate { + continue; + } + + self.last_batch_id_rx[sender_idx] = batch_id; + + if batch.n == 0 { + continue; + } + if batch.n as usize > METER_BATCH_MAX_SAMPLES { + self.note_fault_decode(); + continue; + } + if batch.present_mask.count_ones() as u8 != batch.n { + self.note_fault_decode(); + continue; + } + if batch.t_last < (METER_BATCH_MAX_SAMPLES as u32 - 1) || batch.t_last < MIN_ACCEPTED_EPOCH_UTC { + self.note_fault_decode(); + continue; + } + + let mut samples: Vec = Vec::with_capacity(batch.n as usize); + let count = batch.n as usize; + let window_start = batch.t_last - (METER_BATCH_MAX_SAMPLES as u32 - 1); + let mut s = 0usize; + + let short_id = if frame.short_id != 0 { + frame.short_id + } else { + self.short_id_from_sender_id(batch.sender_id).unwrap_or(0) + }; + let device_id = format!("dd3-{short_id:04X}"); + let bat_v = if batch.battery_mv > 0 { + batch.battery_mv as f32 / 1000.0 + } else { + f32::NAN + }; + + for slot in 0..METER_BATCH_MAX_SAMPLES { + if (batch.present_mask & (1u32 << slot)) == 0 { + continue; + } + if s >= count { + self.note_fault_decode(); + samples.clear(); + break; + } + let ts_utc = window_start + slot as u32; + if ts_utc < MIN_ACCEPTED_EPOCH_UTC { + self.note_fault_decode(); + samples.clear(); + break; + } + let p1 = batch.p1_w[s] as f32; + let p2 = batch.p2_w[s] as f32; + let p3 = batch.p3_w[s] as f32; + + let sample = MeterSample { + ts_utc, + energy_total_kwh: batch.energy_wh[s] as f32 / 1000.0, + phase_power_w: [p1, p2, p3], + total_power_w: p1 + p2 + p3, + battery_voltage_v: bat_v, + battery_percent: if bat_v.is_nan() { 0 } else { 75 }, + link_rssi_dbm: -70, + link_snr_db: 7.0, + err_meter_read: batch.err_m as u32, + err_decode: batch.err_d as u32, + err_lora_tx: batch.err_tx as u32, + err_last: batch.err_last, + rx_reject_reason: batch.err_rx_reject, + }; + samples.push(sample); + + let csv_line = format_csv_line(&CsvLineInput { + ts_utc: sample.ts_utc, + ts_hms_local: "00:00:00", + p_w: sample.total_power_w, + p1_w: sample.phase_power_w[0], + p2_w: sample.phase_power_w[1], + p3_w: sample.phase_power_w[2], + e_kwh: sample.energy_total_kwh, + bat_v: sample.battery_voltage_v, + bat_pct: sample.battery_percent, + rssi: sample.link_rssi_dbm, + snr: sample.link_snr_db, + err_m: sample.err_meter_read, + err_d: sample.err_decode, + err_tx: sample.err_lora_tx, + err_last_text: Some(match sample.err_last { + 1 => "meter", + 2 => "decode", + 3 => "loratx", + _ => "", + }), + include_error_text: s + 1 == count, + }); + storage.append_csv(&device_id, &csv_line); + + let state_payload = mqtt_state_json(&MqttState { + device_id: &device_id, + ts_utc: sample.ts_utc, + energy_total_kwh: sample.energy_total_kwh, + total_power_w: sample.total_power_w, + phase_power_w: sample.phase_power_w, + battery_voltage_v: sample.battery_voltage_v, + battery_percent: sample.battery_percent, + link_valid: true, + link_rssi_dbm: sample.link_rssi_dbm, + link_snr_db: sample.link_snr_db, + err_meter_read: sample.err_meter_read, + err_decode: sample.err_decode, + err_lora_tx: sample.err_lora_tx, + err_last: sample.err_last, + rx_reject_reason: sample.rx_reject_reason, + }); + publisher.publish_state(&format!("smartmeter/{device_id}/state"), &state_payload); + s += 1; + } + + if samples.len() != count { + self.note_fault_decode(); + continue; + } + + if !self.sender_discovery_sent[sender_idx] { + self.publish_discovery_bundle(publisher, &device_id); + self.sender_discovery_sent[sender_idx] = true; + } + + let faults_payload = format!( + "{{\"err_m\":{},\"err_d\":{},\"err_tx\":{},\"err_last\":{},\"err_last_text\":\"{}\",\"err_last_age\":0}}", + batch.err_m, + batch.err_d, + batch.err_tx, + batch.err_last, + match batch.err_last { + 1 => "meter", + 2 => "decode", + 3 => "loratx", + _ => "none", + } + ); + publisher.publish_faults(&format!("smartmeter/{device_id}/faults"), &faults_payload); + + { + let status = &mut self.sender_statuses[sender_idx]; + status.last_data = samples.last().copied(); + status.last_update_ts_utc = samples.last().map(|s| s.ts_utc).unwrap_or(0); + status.has_data = true; + } + } + + if !self.receiver_discovery_sent { + self.publish_discovery_bundle(publisher, &self.config.device_id); + self.receiver_discovery_sent = true; + } + + status_sink.receiver_status("RX_ACTIVE"); + } +} diff --git a/crates/dd3_core/src/sender.rs b/crates/dd3_core/src/sender.rs new file mode 100644 index 0000000..2cc5b4d --- /dev/null +++ b/crates/dd3_core/src/sender.rs @@ -0,0 +1,708 @@ +use std::collections::VecDeque; + +use dd3_protocol::{ + decode_ack_down_payload, decode_frame, encode_frame, encode_batch_v3, AckDownPayload, BatchInputV3, + MsgKind, +}; + +use crate::constants::*; +use crate::traits::{Clock, Radio, StatusSink}; +use crate::types::{FaultCounters, MeterSample}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SenderConfig { + pub short_id: u16, + pub sender_id: u16, + pub device_id: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SenderPhase { + Syncing, + Normal, + Catchup, + WaitAck, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct SenderStats { + pub queue_depth: u8, + pub build_count: u8, + pub inflight_batch_id: u16, + pub last_sent_batch_id: u16, + pub last_acked_batch_id: u16, + pub retry_count: u8, + pub ack_pending: bool, + pub ack_timeout_total: u32, + pub ack_retry_total: u32, + pub ack_miss_streak: u32, + pub rx_window_ms: u32, +} + +#[derive(Debug, Clone)] +struct BatchBuffer { + batch_id: u16, + batch_id_valid: bool, + samples: Vec, +} + +#[derive(Debug, Clone)] +struct Inflight { + batch_id: u16, + sync_request: bool, + samples: Vec, + encoded_payload: Vec, +} + +pub struct SenderStateMachine { + config: SenderConfig, + phase: SenderPhase, + + time_acquired: bool, + sender_faults_reset_after_first_sync: bool, + sender_faults_reset_hour_utc: Option, + + queue: VecDeque, + build_samples: Vec, + + inflight: Option, + + last_sample_ms: u64, + last_send_ms: u64, + last_sync_request_ms: u64, + last_batch_send_ms: u64, + + batch_id: u16, + last_sent_batch_id: u16, + last_acked_batch_id: u16, + + ack_pending: bool, + retry_count: u8, + ack_timeout_ms: u32, + ack_timeout_total: u32, + ack_retry_total: u32, + ack_rtt_ewma_ms: u32, + ack_miss_streak: u32, + rx_window_ms: u32, + + sample_tick: u32, + sender_faults: FaultCounters, + sender_last_error: u8, + sender_rx_reject_reason: u8, + last_tx_build_encode_error: bool, +} + +impl SenderStateMachine { + pub fn new(config: SenderConfig, now_ms: u64) -> Self { + Self { + config, + phase: SenderPhase::Syncing, + time_acquired: false, + sender_faults_reset_after_first_sync: false, + sender_faults_reset_hour_utc: None, + queue: VecDeque::with_capacity(BATCH_QUEUE_DEPTH), + build_samples: Vec::with_capacity(METER_BATCH_MAX_SAMPLES), + inflight: None, + last_sample_ms: now_ms.saturating_sub(METER_SAMPLE_INTERVAL_MS as u64), + last_send_ms: now_ms, + last_sync_request_ms: now_ms.saturating_sub(SYNC_REQUEST_INTERVAL_MS as u64), + last_batch_send_ms: 0, + batch_id: 1, + last_sent_batch_id: 0, + last_acked_batch_id: 0, + ack_pending: false, + retry_count: 0, + ack_timeout_ms: 10_000, + ack_timeout_total: 0, + ack_retry_total: 0, + ack_rtt_ewma_ms: 0, + ack_miss_streak: 0, + rx_window_ms: 0, + sample_tick: 0, + sender_faults: FaultCounters::default(), + sender_last_error: 0, + sender_rx_reject_reason: 0, + last_tx_build_encode_error: false, + } + } + + pub fn stats(&self) -> SenderStats { + SenderStats { + queue_depth: self.queue.len() as u8, + build_count: self.build_samples.len() as u8, + inflight_batch_id: self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0), + last_sent_batch_id: self.last_sent_batch_id, + last_acked_batch_id: self.last_acked_batch_id, + retry_count: self.retry_count, + ack_pending: self.ack_pending, + ack_timeout_total: self.ack_timeout_total, + ack_retry_total: self.ack_retry_total, + ack_miss_streak: self.ack_miss_streak, + rx_window_ms: self.rx_window_ms, + } + } + + pub fn is_time_acquired(&self) -> bool { + self.time_acquired + } + + fn reset_sender_fault_stats(&mut self) { + self.sender_faults = FaultCounters::default(); + self.sender_last_error = 0; + } + + fn reset_faults_on_first_sync(&mut self, synced_utc: u32) { + if self.sender_faults_reset_after_first_sync || synced_utc < MIN_ACCEPTED_EPOCH_UTC { + return; + } + self.reset_sender_fault_stats(); + self.sender_faults_reset_after_first_sync = true; + self.sender_faults_reset_hour_utc = Some(synced_utc / 3600); + } + + fn reset_faults_on_hour_boundary(&mut self, now_utc: u32) { + if !self.time_acquired || !self.sender_faults_reset_after_first_sync { + return; + } + if now_utc < MIN_ACCEPTED_EPOCH_UTC { + return; + } + let now_hour = now_utc / 3600; + match self.sender_faults_reset_hour_utc { + None => self.sender_faults_reset_hour_utc = Some(now_hour), + Some(prev) if now_hour > prev => { + self.reset_sender_fault_stats(); + self.sender_faults_reset_hour_utc = Some(now_hour); + } + _ => {} + } + } + + fn generate_sample(&mut self, now_utc: u32) -> MeterSample { + self.sample_tick = self.sample_tick.saturating_add(1); + let ts_utc = if now_utc >= MIN_ACCEPTED_EPOCH_UTC { + now_utc + } else { + MIN_ACCEPTED_EPOCH_UTC.saturating_add(self.sample_tick) + }; + + let wh = self.sample_tick as f32; + let p1 = self.sample_tick as f32; + let p2 = self.sample_tick as f32; + let p3 = self.sample_tick as f32; + + MeterSample { + ts_utc, + energy_total_kwh: wh / 1000.0, + phase_power_w: [p1, p2, p3], + total_power_w: p1 + p2 + p3, + battery_voltage_v: 3.8, + battery_percent: 75, + link_rssi_dbm: -70, + link_snr_db: 7.0, + err_meter_read: self.sender_faults.meter_read_fail, + err_decode: self.sender_faults.decode_fail, + err_lora_tx: self.sender_faults.lora_tx_fail, + err_last: self.sender_last_error, + rx_reject_reason: self.sender_rx_reject_reason, + } + } + + fn append_meter_sample(&mut self, sample: MeterSample) { + self.build_samples.push(sample); + if self.build_samples.len() >= METER_BATCH_MAX_SAMPLES { + let samples = std::mem::take(&mut self.build_samples); + self.batch_queue_enqueue(samples); + } + } + + fn batch_queue_drop_oldest(&mut self) -> bool { + if self.queue.is_empty() { + return false; + } + + let dropped = self.queue.pop_front(); + if let Some(dropped_batch) = dropped { + let dropped_inflight = self + .inflight + .as_ref() + .map(|inflight| dropped_batch.batch_id_valid && inflight.batch_id == dropped_batch.batch_id) + .unwrap_or(false); + + if dropped_inflight { + self.ack_pending = false; + self.retry_count = 0; + self.inflight = None; + } + return dropped_inflight; + } + false + } + + fn batch_queue_enqueue(&mut self, samples: Vec) { + if samples.is_empty() { + return; + } + + if self.queue.len() >= BATCH_QUEUE_DEPTH { + if self.batch_queue_drop_oldest() { + self.batch_id = self.batch_id.wrapping_add(1); + } + } + + self.queue.push_back(BatchBuffer { + batch_id: 0, + batch_id_valid: false, + samples, + }); + } + + fn kwh_to_wh_from_float(v: f32) -> u32 { + if v.is_nan() { + return 0; + } + let mut wh = (v as f64) * 1000.0; + if wh < 0.0 { + wh = 0.0; + } + if wh > u32::MAX as f64 { + u32::MAX + } else { + wh.round() as u32 + } + } + + fn float_to_i16_w_clamped(v: f32) -> i16 { + if v.is_nan() { + return 0; + } + let rounded = v.round(); + if rounded < i16::MIN as f32 { + i16::MIN + } else if rounded > i16::MAX as f32 { + i16::MAX + } else { + rounded as i16 + } + } + + fn compute_airtime_ms(packet_len: usize) -> u32 { + let base = 120u32; + let per_byte = 6u32; + base.saturating_add((packet_len as u32).saturating_mul(per_byte)) + } + + fn compute_batch_ack_timeout_ms(payload_len: usize) -> u32 { + if payload_len == 0 { + return 10_000; + } + let chunk_count = ((payload_len + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD) as u32; + let packet_len = 3 + BATCH_HEADER_SIZE + payload_len.min(BATCH_CHUNK_PAYLOAD) + 2; + let per_chunk_toa = Self::compute_airtime_ms(packet_len); + let timeout = chunk_count.saturating_mul(per_chunk_toa).saturating_add(BATCH_RX_MARGIN_MS); + timeout.max(10_000) + } + + fn prepare_inflight_from_queue(&mut self) -> bool { + if self.inflight.is_some() { + return true; + } + + let Some(front) = self.queue.front_mut() else { + return false; + }; + + if !front.batch_id_valid { + front.batch_id = self.batch_id; + front.batch_id_valid = true; + } + + self.inflight = Some(Inflight { + batch_id: front.batch_id, + sync_request: false, + samples: front.samples.clone(), + encoded_payload: Vec::new(), + }); + + true + } + + fn build_payload(&mut self, clock: &impl Clock) -> Result, ()> { + let inflight = self.inflight.as_ref().ok_or(())?; + + let mut input = BatchInputV3::default(); + input.sender_id = self.config.sender_id; + input.batch_id = inflight.batch_id; + input.t_last = if inflight.sync_request { + clock.now_utc() + } else { + inflight.samples.last().map(|s| s.ts_utc).unwrap_or(0) + }; + input.present_mask = 0; + input.n = 0; + input.battery_mv = if inflight.sync_request { + 3800 + } else { + inflight + .samples + .last() + .map(|s| (s.battery_voltage_v * 1000.0).round() as u16) + .unwrap_or(0) + }; + input.err_m = self.sender_faults.meter_read_fail.min(255) as u8; + input.err_d = self.sender_faults.decode_fail.min(255) as u8; + input.err_tx = self.sender_faults.lora_tx_fail.min(255) as u8; + input.err_last = self.sender_last_error; + input.err_rx_reject = self.sender_rx_reject_reason; + + if !inflight.sync_request { + if input.t_last < (METER_BATCH_MAX_SAMPLES as u32 - 1) { + return Err(()); + } + let window_start = input.t_last - (METER_BATCH_MAX_SAMPLES as u32 - 1); + let mut slot_samples: [Option; METER_BATCH_MAX_SAMPLES] = [None; METER_BATCH_MAX_SAMPLES]; + + for sample in &inflight.samples { + if sample.ts_utc < window_start || sample.ts_utc > input.t_last { + continue; + } + let slot = (sample.ts_utc - window_start) as usize; + slot_samples[slot] = Some(*sample); + } + + for (slot, sample_opt) in slot_samples.iter().enumerate() { + let Some(sample) = sample_opt else { + continue; + }; + let out_idx = input.n as usize; + if out_idx >= METER_BATCH_MAX_SAMPLES { + return Err(()); + } + input.present_mask |= 1u32 << slot; + input.n = input.n.saturating_add(1); + input.energy_wh[out_idx] = Self::kwh_to_wh_from_float(sample.energy_total_kwh); + input.p1_w[out_idx] = Self::float_to_i16_w_clamped(sample.phase_power_w[0]); + input.p2_w[out_idx] = Self::float_to_i16_w_clamped(sample.phase_power_w[1]); + input.p3_w[out_idx] = Self::float_to_i16_w_clamped(sample.phase_power_w[2]); + } + + for i in 1..(input.n as usize) { + if input.energy_wh[i] < input.energy_wh[i - 1] { + input.energy_wh[i] = input.energy_wh[i - 1]; + } + } + } + + encode_batch_v3(&input).map_err(|_| ()) + } + + fn send_batch_payload(&mut self, radio: &mut impl Radio, data: &[u8], batch_id: u16) -> bool { + if data.is_empty() || data.len() > BATCH_MAX_COMPRESSED { + return false; + } + + let chunk_count = ((data.len() + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD) as u8; + if chunk_count == 0 { + return false; + } + + let mut all_ok = true; + let mut offset = 0usize; + + for chunk_index in 0..chunk_count { + let mut chunk_len = data.len() - offset; + if chunk_len > BATCH_CHUNK_PAYLOAD { + chunk_len = BATCH_CHUNK_PAYLOAD; + } + + let mut payload = Vec::with_capacity(BATCH_HEADER_SIZE + chunk_len); + payload.extend_from_slice(&batch_id.to_le_bytes()); + payload.push(chunk_index); + payload.push(chunk_count); + payload.extend_from_slice(&(data.len() as u16).to_le_bytes()); + payload.extend_from_slice(&data[offset..offset + chunk_len]); + + let frame = encode_frame(MsgKind::BatchUp, self.config.short_id, &payload); + let ok = radio.send_frame(&frame); + all_ok &= ok; + if !ok { + self.sender_faults.lora_tx_fail = self.sender_faults.lora_tx_fail.saturating_add(1); + self.sender_last_error = 3; + } + + offset += chunk_len; + } + + all_ok + } + + fn send_inflight_batch(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool { + self.last_tx_build_encode_error = false; + let payload = { + let needs_encode = self + .inflight + .as_ref() + .map(|i| i.encoded_payload.is_empty()) + .unwrap_or(true); + if needs_encode { + let encoded = match self.build_payload(clock) { + Ok(v) => v, + Err(_) => { + self.last_tx_build_encode_error = true; + return false; + } + }; + if let Some(inflight) = self.inflight.as_mut() { + inflight.encoded_payload = encoded; + } + } + self.inflight + .as_ref() + .map(|i| i.encoded_payload.clone()) + .unwrap_or_default() + }; + + let batch_id = self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0); + self.ack_timeout_ms = Self::compute_batch_ack_timeout_ms(payload.len()); + + if self.send_batch_payload(radio, &payload, batch_id) { + self.last_batch_send_ms = clock.now_ms(); + true + } else { + false + } + } + + fn send_meter_batch(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool { + if !self.prepare_inflight_from_queue() { + return false; + } + if let Some(inflight) = self.inflight.as_mut() { + inflight.sync_request = false; + } + + let ok = self.send_inflight_batch(clock, radio); + if ok { + self.last_sent_batch_id = self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0); + self.ack_pending = true; + } else { + if self.last_tx_build_encode_error { + self.finish_inflight_batch(true); + self.sender_faults.decode_fail = self.sender_faults.decode_fail.saturating_add(1); + self.sender_last_error = 2; + } else { + self.inflight = None; + } + } + ok + } + + fn send_sync_request(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool { + if self.ack_pending { + return false; + } + + self.inflight = Some(Inflight { + batch_id: self.batch_id, + sync_request: true, + samples: Vec::new(), + encoded_payload: Vec::new(), + }); + + let ok = self.send_inflight_batch(clock, radio); + if ok { + self.last_sent_batch_id = self.inflight.as_ref().map(|i| i.batch_id).unwrap_or(0); + self.ack_pending = true; + } else { + self.inflight = None; + } + ok + } + + fn resend_inflight_batch(&mut self, clock: &impl Clock, radio: &mut impl Radio) -> bool { + if !self.ack_pending || self.inflight.is_none() { + return false; + } + self.send_inflight_batch(clock, radio) + } + + fn finish_inflight_batch(&mut self, pop_queue: bool) { + if pop_queue && !self.queue.is_empty() { + let _ = self.queue.pop_front(); + } + self.ack_pending = false; + self.retry_count = 0; + self.inflight = None; + if pop_queue { + self.batch_id = self.batch_id.wrapping_add(1); + } + } + + fn handle_ack_window(&mut self, clock: &impl Clock, radio: &mut impl Radio) { + if !self.ack_pending { + return; + } + + let ack_len = 3 + dd3_protocol::ACK_DOWN_PAYLOAD_LEN + 2; + let ack_air_ms = Self::compute_airtime_ms(ack_len); + + let mut ack_window_first_ms = ack_air_ms.saturating_add(200); + if self.ack_rtt_ewma_ms > 0 { + ack_window_first_ms = ack_window_first_ms.max(self.ack_rtt_ewma_ms.saturating_add(150)); + } + ack_window_first_ms = ack_window_first_ms + .saturating_add((self.ack_miss_streak.saturating_mul(150)).min(1200)) + .clamp(600, 2500); + + let mut ack_window_second_ms = ack_window_first_ms + (ack_window_first_ms / 2); + ack_window_second_ms = ack_window_second_ms + .max(ack_air_ms.saturating_add(400)) + .min(5000); + + let now_ms = clock.now_ms(); + let mut got_first = false; + let mut frame = radio.recv_frame(ack_window_first_ms, now_ms); + if frame.is_some() { + got_first = true; + } else { + frame = radio.recv_frame(ack_window_second_ms, now_ms); + } + + let rx_elapsed = if got_first { + ack_window_first_ms + } else { + ack_window_first_ms.saturating_add(ack_window_second_ms) + }; + self.rx_window_ms = self.rx_window_ms.saturating_add(rx_elapsed); + + let Some(raw) = frame else { + self.ack_miss_streak = self.ack_miss_streak.saturating_add(1); + return; + }; + + let parsed = match decode_frame(&raw, MsgKind::AckDown as u8) { + Ok(frame) => frame, + Err(_) => { + self.sender_rx_reject_reason = 3; + self.ack_miss_streak = self.ack_miss_streak.saturating_add(1); + return; + } + }; + + if parsed.msg_kind != MsgKind::AckDown { + self.sender_rx_reject_reason = 2; + self.ack_miss_streak = self.ack_miss_streak.saturating_add(1); + return; + } + + let ack = match decode_ack_down_payload(&parsed.payload) { + Ok(a) => a, + Err(_) => { + self.sender_rx_reject_reason = 3; + self.ack_miss_streak = self.ack_miss_streak.saturating_add(1); + return; + } + }; + + if ack.batch_id != self.last_sent_batch_id { + self.sender_rx_reject_reason = 5; + self.ack_miss_streak = self.ack_miss_streak.saturating_add(1); + return; + } + + self.last_acked_batch_id = ack.batch_id; + if self.ack_rtt_ewma_ms == 0 { + self.ack_rtt_ewma_ms = rx_elapsed; + } else { + self.ack_rtt_ewma_ms = (self.ack_rtt_ewma_ms.saturating_mul(3) + rx_elapsed + 1) / 4; + } + + if ack.time_valid && ack.epoch_utc >= MIN_ACCEPTED_EPOCH_UTC { + self.time_acquired = true; + self.reset_faults_on_first_sync(ack.epoch_utc); + } + + self.finish_inflight_batch(true); + self.ack_miss_streak = 0; + } + + fn send_batch_ack_preview(clock: &impl Clock, batch_id: u16) -> AckDownPayload { + let epoch = clock.now_utc(); + let time_valid = clock.is_time_synced() && epoch >= MIN_ACCEPTED_EPOCH_UTC; + AckDownPayload { + time_valid, + batch_id, + epoch_utc: if time_valid { epoch } else { 0 }, + } + } + + pub fn tick(&mut self, clock: &impl Clock, radio: &mut impl Radio, status: &mut impl StatusSink) { + let now_ms = clock.now_ms(); + + if self.time_acquired { + self.reset_faults_on_hour_boundary(clock.now_utc()); + while now_ms.saturating_sub(self.last_sample_ms) >= METER_SAMPLE_INTERVAL_MS as u64 { + self.last_sample_ms = self.last_sample_ms.saturating_add(METER_SAMPLE_INTERVAL_MS as u64); + let sample = self.generate_sample(clock.now_utc()); + self.append_meter_sample(sample); + } + + if !self.ack_pending && now_ms.saturating_sub(self.last_send_ms) >= METER_SEND_INTERVAL_MS as u64 { + self.last_send_ms = now_ms; + if !self.build_samples.is_empty() { + let samples = std::mem::take(&mut self.build_samples); + self.batch_queue_enqueue(samples); + } + if !self.queue.is_empty() { + let _ = self.send_meter_batch(clock, radio); + } + } + + if !self.ack_pending && self.queue.len() > 1 { + let _ = self.send_meter_batch(clock, radio); + } + } else if !self.ack_pending + && now_ms.saturating_sub(self.last_sync_request_ms) >= SYNC_REQUEST_INTERVAL_MS as u64 + { + self.last_sync_request_ms = now_ms; + let _ = self.send_sync_request(clock, radio); + } + + self.handle_ack_window(clock, radio); + + if self.ack_pending && now_ms.saturating_sub(self.last_batch_send_ms) >= self.ack_timeout_ms as u64 { + self.ack_timeout_total = self.ack_timeout_total.saturating_add(1); + if self.retry_count < BATCH_MAX_RETRIES { + self.retry_count = self.retry_count.saturating_add(1); + self.ack_retry_total = self.ack_retry_total.saturating_add(1); + let _ = self.resend_inflight_batch(clock, radio); + } else { + self.ack_pending = false; + self.retry_count = 0; + self.inflight = None; + self.sender_faults.lora_tx_fail = self.sender_faults.lora_tx_fail.saturating_add(1); + self.sender_last_error = 3; + } + } + + self.phase = if self.ack_pending { + SenderPhase::WaitAck + } else if !self.time_acquired { + SenderPhase::Syncing + } else if self.queue.len() > 1 { + SenderPhase::Catchup + } else { + SenderPhase::Normal + }; + + let phase_text = match self.phase { + SenderPhase::Syncing => "SYNCING", + SenderPhase::Normal => "NORMAL", + SenderPhase::Catchup => "CATCHUP", + SenderPhase::WaitAck => "WAIT_ACK", + }; + status.sender_phase(phase_text); + + let _ = Self::send_batch_ack_preview(clock, self.last_sent_batch_id); + } +} diff --git a/crates/dd3_core/src/traits.rs b/crates/dd3_core/src/traits.rs new file mode 100644 index 0000000..d7266e6 --- /dev/null +++ b/crates/dd3_core/src/traits.rs @@ -0,0 +1,27 @@ +use std::vec::Vec; + +pub trait Clock { + fn now_ms(&self) -> u64; + fn now_utc(&self) -> u32; + fn is_time_synced(&self) -> bool; +} + +pub trait Radio { + fn send_frame(&mut self, frame: &[u8]) -> bool; + fn recv_frame(&mut self, window_ms: u32, now_ms: u64) -> Option>; +} + +pub trait Publisher { + fn publish_state(&mut self, device_id: &str, payload: &str); + fn publish_faults(&mut self, device_id: &str, payload: &str); + fn publish_discovery(&mut self, device_id: &str, payload: &str); +} + +pub trait Storage { + fn append_csv(&mut self, device_id: &str, line: &str); +} + +pub trait StatusSink { + fn sender_phase(&mut self, _phase: &str) {} + fn receiver_status(&mut self, _status: &str) {} +} \ No newline at end of file diff --git a/crates/dd3_core/src/types.rs b/crates/dd3_core/src/types.rs new file mode 100644 index 0000000..45483f3 --- /dev/null +++ b/crates/dd3_core/src/types.rs @@ -0,0 +1,43 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct FaultCounters { + pub meter_read_fail: u32, + pub decode_fail: u32, + pub lora_tx_fail: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct MeterSample { + pub ts_utc: u32, + pub energy_total_kwh: f32, + pub phase_power_w: [f32; 3], + pub total_power_w: f32, + pub battery_voltage_v: f32, + pub battery_percent: u8, + pub link_rssi_dbm: i16, + pub link_snr_db: f32, + pub err_meter_read: u32, + pub err_decode: u32, + pub err_lora_tx: u32, + pub err_last: u8, + pub rx_reject_reason: u8, +} + +impl Default for MeterSample { + fn default() -> Self { + Self { + ts_utc: 0, + energy_total_kwh: 0.0, + phase_power_w: [0.0, 0.0, 0.0], + total_power_w: 0.0, + battery_voltage_v: 0.0, + battery_percent: 0, + link_rssi_dbm: 0, + link_snr_db: f32::NAN, + err_meter_read: 0, + err_decode: 0, + err_lora_tx: 0, + err_last: 0, + rx_reject_reason: 0, + } + } +} \ No newline at end of file diff --git a/crates/dd3_firmware/Cargo.toml b/crates/dd3_firmware/Cargo.toml new file mode 100644 index 0000000..5c1d60d --- /dev/null +++ b/crates/dd3_firmware/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "dd3_firmware" +version = "0.1.0" +edition = "2021" + +[features] +default = [] +esp-idf = [] +esp-hal = [] + +[dependencies] +dd3_core = { path = "../dd3_core" } \ No newline at end of file diff --git a/crates/dd3_firmware/src/main.rs b/crates/dd3_firmware/src/main.rs new file mode 100644 index 0000000..c1304eb --- /dev/null +++ b/crates/dd3_firmware/src/main.rs @@ -0,0 +1,29 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum DeviceRole { + Sender, + Receiver, +} + +#[cfg(any(feature = "esp-idf", feature = "esp-hal"))] +fn detect_role_from_gpio14() -> DeviceRole { + // Placeholder: wire real GPIO14 read in hardware integration phase. + DeviceRole::Receiver +} + +#[cfg(not(any(feature = "esp-idf", feature = "esp-hal")))] +fn detect_role_from_gpio14() -> DeviceRole { + // Host placeholder so crate compiles in non-embedded environments. + DeviceRole::Receiver +} + +fn main() { + let role = detect_role_from_gpio14(); + match role { + DeviceRole::Sender => { + println!("dd3_firmware placeholder: sender mode"); + } + DeviceRole::Receiver => { + println!("dd3_firmware placeholder: receiver mode"); + } + } +} \ No newline at end of file diff --git a/crates/dd3_protocol/Cargo.toml b/crates/dd3_protocol/Cargo.toml new file mode 100644 index 0000000..cb70a3d --- /dev/null +++ b/crates/dd3_protocol/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "dd3_protocol" +version = "0.1.0" +edition = "2021" + +[features] +default = ["std"] +std = [] + +[dependencies] \ No newline at end of file diff --git a/crates/dd3_protocol/src/ack.rs b/crates/dd3_protocol/src/ack.rs new file mode 100644 index 0000000..d38daa4 --- /dev/null +++ b/crates/dd3_protocol/src/ack.rs @@ -0,0 +1,32 @@ +use crate::ACK_DOWN_PAYLOAD_LEN; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AckDownPayload { + pub time_valid: bool, + pub batch_id: u16, + pub epoch_utc: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AckDecodeError { + LengthMismatch, +} + +pub fn encode_ack_down_payload(payload: AckDownPayload) -> [u8; ACK_DOWN_PAYLOAD_LEN] { + let mut out = [0u8; ACK_DOWN_PAYLOAD_LEN]; + out[0] = if payload.time_valid { 1 } else { 0 }; + out[1..3].copy_from_slice(&payload.batch_id.to_be_bytes()); + out[3..7].copy_from_slice(&payload.epoch_utc.to_be_bytes()); + out +} + +pub fn decode_ack_down_payload(bytes: &[u8]) -> Result { + if bytes.len() != ACK_DOWN_PAYLOAD_LEN { + return Err(AckDecodeError::LengthMismatch); + } + Ok(AckDownPayload { + time_valid: (bytes[0] & 0x01) != 0, + batch_id: u16::from_be_bytes([bytes[1], bytes[2]]), + epoch_utc: u32::from_be_bytes([bytes[3], bytes[4], bytes[5], bytes[6]]), + }) +} \ No newline at end of file diff --git a/crates/dd3_protocol/src/crc.rs b/crates/dd3_protocol/src/crc.rs new file mode 100644 index 0000000..76e7e0e --- /dev/null +++ b/crates/dd3_protocol/src/crc.rs @@ -0,0 +1,16 @@ +use core::num::Wrapping; + +pub fn crc16_ccitt(data: &[u8]) -> u16 { + let mut crc = Wrapping(0xFFFFu16); + for byte in data { + crc ^= Wrapping((*byte as u16) << 8); + for _ in 0..8 { + if (crc.0 & 0x8000) != 0 { + crc = Wrapping((crc.0 << 1) ^ 0x1021); + } else { + crc = Wrapping(crc.0 << 1); + } + } + } + crc.0 +} \ No newline at end of file diff --git a/crates/dd3_protocol/src/frame.rs b/crates/dd3_protocol/src/frame.rs new file mode 100644 index 0000000..53b2bae --- /dev/null +++ b/crates/dd3_protocol/src/frame.rs @@ -0,0 +1,76 @@ +use alloc::vec::Vec; + +use crate::crc16_ccitt; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum MsgKind { + BatchUp = 0, + AckDown = 1, +} + +impl MsgKind { + pub fn from_u8(value: u8) -> Option { + match value { + 0 => Some(Self::BatchUp), + 1 => Some(Self::AckDown), + _ => None, + } + } + + pub fn as_u8(self) -> u8 { + self as u8 + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Frame { + pub msg_kind: MsgKind, + pub short_id: u16, + pub payload: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FrameDecodeError { + LengthMismatch, + CrcFail, + InvalidMsgKind, +} + +pub fn encode_frame(msg_kind: MsgKind, short_id: u16, payload: &[u8]) -> Vec { + let mut out = Vec::with_capacity(payload.len() + 5); + out.push(msg_kind.as_u8()); + out.extend_from_slice(&short_id.to_be_bytes()); + out.extend_from_slice(payload); + let crc = crc16_ccitt(&out); + out.extend_from_slice(&crc.to_be_bytes()); + out +} + +pub fn decode_frame(frame: &[u8], max_msg_kind: u8) -> Result { + if frame.len() < 5 { + return Err(FrameDecodeError::LengthMismatch); + } + + let payload_len = frame.len() - 5; + let crc_calc = crc16_ccitt(&frame[..frame.len() - 2]); + let crc_rx = u16::from_be_bytes([frame[frame.len() - 2], frame[frame.len() - 1]]); + if crc_calc != crc_rx { + return Err(FrameDecodeError::CrcFail); + } + + let raw_kind = frame[0]; + if raw_kind > max_msg_kind { + return Err(FrameDecodeError::InvalidMsgKind); + } + let msg_kind = MsgKind::from_u8(raw_kind).ok_or(FrameDecodeError::InvalidMsgKind)?; + + let short_id = u16::from_be_bytes([frame[1], frame[2]]); + let payload = frame[3..3 + payload_len].to_vec(); + + Ok(Frame { + msg_kind, + short_id, + payload, + }) +} \ No newline at end of file diff --git a/crates/dd3_protocol/src/lib.rs b/crates/dd3_protocol/src/lib.rs new file mode 100644 index 0000000..885885d --- /dev/null +++ b/crates/dd3_protocol/src/lib.rs @@ -0,0 +1,29 @@ +#![cfg_attr(not(feature = "std"), no_std)] + +extern crate alloc; + +pub mod ack; +pub mod crc; +pub mod frame; +pub mod payload_v3; +pub mod reassembly; + +pub use ack::{decode_ack_down_payload, encode_ack_down_payload, AckDecodeError, AckDownPayload}; +pub use crc::crc16_ccitt; +pub use frame::{decode_frame, encode_frame, Frame, FrameDecodeError, MsgKind}; +pub use payload_v3::{ + decode_batch_v3, encode_batch_v3, BatchInputV3, PayloadDecodeError, PayloadEncodeError, +}; +pub use reassembly::{push_chunk, reset_reassembly, ReassemblyState, ReassemblyStatus}; + +pub const LORA_MAX_PAYLOAD: usize = 230; +pub const ACK_DOWN_PAYLOAD_LEN: usize = 7; + +pub const MIN_ACCEPTED_EPOCH_UTC: u32 = 1_769_904_000; +pub const SYNC_REQUEST_INTERVAL_MS: u32 = 15_000; +pub const METER_SAMPLE_INTERVAL_MS: u32 = 1_000; +pub const METER_SEND_INTERVAL_MS: u32 = 30_000; +pub const BATCH_MAX_RETRIES: u8 = 2; +pub const BATCH_QUEUE_DEPTH: usize = 10; +pub const ACK_REPEAT_COUNT: u8 = 3; +pub const ACK_REPEAT_DELAY_MS: u32 = 200; \ No newline at end of file diff --git a/crates/dd3_protocol/src/payload_v3.rs b/crates/dd3_protocol/src/payload_v3.rs new file mode 100644 index 0000000..9e44ee5 --- /dev/null +++ b/crates/dd3_protocol/src/payload_v3.rs @@ -0,0 +1,316 @@ +use alloc::vec::Vec; + +const MAGIC: u16 = 0xDDB3; +const SCHEMA: u8 = 3; +const FLAGS: u8 = 0x01; +const MAX_SAMPLES: usize = 30; +const PRESENT_MASK_VALID_BITS: u32 = 0x3FFF_FFFF; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BatchInputV3 { + pub sender_id: u16, + pub batch_id: u16, + pub t_last: u32, + pub present_mask: u32, + pub n: u8, + pub battery_mv: u16, + pub err_m: u8, + pub err_d: u8, + pub err_tx: u8, + pub err_last: u8, + pub err_rx_reject: u8, + pub energy_wh: [u32; MAX_SAMPLES], + pub p1_w: [i16; MAX_SAMPLES], + pub p2_w: [i16; MAX_SAMPLES], + pub p3_w: [i16; MAX_SAMPLES], +} + +impl Default for BatchInputV3 { + fn default() -> Self { + Self { + sender_id: 0, + batch_id: 0, + t_last: 0, + present_mask: 0, + n: 0, + battery_mv: 0, + err_m: 0, + err_d: 0, + err_tx: 0, + err_last: 0, + err_rx_reject: 0, + energy_wh: [0; MAX_SAMPLES], + p1_w: [0; MAX_SAMPLES], + p2_w: [0; MAX_SAMPLES], + p3_w: [0; MAX_SAMPLES], + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PayloadEncodeError { + InvalidN, + InvalidPresentMask, + BitCountMismatch, + InvalidSyncMask, + EnergyRegression, + VarintOverflow, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PayloadDecodeError { + LengthMismatch, + InvalidMagic, + InvalidSchema, + InvalidFlags, + InvalidN, + InvalidPresentMask, + BitCountMismatch, + InvalidSyncMask, + Truncated, + Overflow, + TrailingBytes, +} + +fn bit_count32(mut value: u32) -> u8 { + let mut count = 0u8; + while value != 0 { + value &= value - 1; + count = count.saturating_add(1); + } + count +} + +fn uleb128_encode(mut v: u32, out: &mut Vec) -> Result<(), PayloadEncodeError> { + for _ in 0..5 { + let mut byte = (v & 0x7F) as u8; + v >>= 7; + if v != 0 { + byte |= 0x80; + } + out.push(byte); + if v == 0 { + return Ok(()); + } + } + Err(PayloadEncodeError::VarintOverflow) +} + +fn uleb128_decode(buf: &[u8], pos: &mut usize) -> Result { + let mut result = 0u32; + let mut shift = 0u8; + for i in 0..5 { + if *pos >= buf.len() { + return Err(PayloadDecodeError::Truncated); + } + let byte = buf[*pos]; + *pos += 1; + + if i == 4 && (byte & 0xF0) != 0 { + return Err(PayloadDecodeError::Overflow); + } + + result |= ((byte & 0x7F) as u32) << shift; + if (byte & 0x80) == 0 { + return Ok(result); + } + shift = shift.saturating_add(7); + } + + Err(PayloadDecodeError::Overflow) +} + +fn zigzag32(x: i32) -> u32 { + ((x as u32) << 1) ^ ((x >> 31) as u32) +} + +fn unzigzag32(u: u32) -> i32 { + ((u >> 1) as i32) ^ -((u & 1) as i32) +} + +fn svarint_encode(x: i32, out: &mut Vec) -> Result<(), PayloadEncodeError> { + uleb128_encode(zigzag32(x), out) +} + +fn svarint_decode(buf: &[u8], pos: &mut usize) -> Result { + let u = uleb128_decode(buf, pos)?; + Ok(unzigzag32(u)) +} + +pub fn encode_batch_v3(input: &BatchInputV3) -> Result, PayloadEncodeError> { + if input.n as usize > MAX_SAMPLES { + return Err(PayloadEncodeError::InvalidN); + } + if (input.present_mask & !PRESENT_MASK_VALID_BITS) != 0 { + return Err(PayloadEncodeError::InvalidPresentMask); + } + if bit_count32(input.present_mask) != input.n { + return Err(PayloadEncodeError::BitCountMismatch); + } + if input.n == 0 && input.present_mask != 0 { + return Err(PayloadEncodeError::InvalidSyncMask); + } + + let mut out = Vec::with_capacity(128); + out.extend_from_slice(&MAGIC.to_le_bytes()); + out.push(SCHEMA); + out.push(FLAGS); + out.extend_from_slice(&input.sender_id.to_le_bytes()); + out.extend_from_slice(&input.batch_id.to_le_bytes()); + out.extend_from_slice(&input.t_last.to_le_bytes()); + out.extend_from_slice(&input.present_mask.to_le_bytes()); + out.push(input.n); + out.extend_from_slice(&input.battery_mv.to_le_bytes()); + out.push(input.err_m); + out.push(input.err_d); + out.push(input.err_tx); + out.push(input.err_last); + out.push(input.err_rx_reject); + + if input.n == 0 { + return Ok(out); + } + + let n = input.n as usize; + + out.extend_from_slice(&input.energy_wh[0].to_le_bytes()); + for i in 1..n { + if input.energy_wh[i] < input.energy_wh[i - 1] { + return Err(PayloadEncodeError::EnergyRegression); + } + let delta = input.energy_wh[i] - input.energy_wh[i - 1]; + uleb128_encode(delta, &mut out)?; + } + + let encode_phase = |phase: &[i16; MAX_SAMPLES], out: &mut Vec| -> Result<(), PayloadEncodeError> { + out.extend_from_slice(&phase[0].to_le_bytes()); + for i in 1..n { + let delta = phase[i] as i32 - phase[i - 1] as i32; + svarint_encode(delta, out)?; + } + Ok(()) + }; + + encode_phase(&input.p1_w, &mut out)?; + encode_phase(&input.p2_w, &mut out)?; + encode_phase(&input.p3_w, &mut out)?; + + Ok(out) +} + +pub fn decode_batch_v3(buf: &[u8]) -> Result { + if buf.len() < 24 { + return Err(PayloadDecodeError::LengthMismatch); + } + + let mut pos = 0usize; + + let magic = u16::from_le_bytes([buf[pos], buf[pos + 1]]); + pos += 2; + if magic != MAGIC { + return Err(PayloadDecodeError::InvalidMagic); + } + + let schema = buf[pos]; + pos += 1; + if schema != SCHEMA { + return Err(PayloadDecodeError::InvalidSchema); + } + + let flags = buf[pos]; + pos += 1; + if (flags & 0x01) == 0 { + return Err(PayloadDecodeError::InvalidFlags); + } + + let mut out = BatchInputV3::default(); + out.sender_id = u16::from_le_bytes([buf[pos], buf[pos + 1]]); + pos += 2; + out.batch_id = u16::from_le_bytes([buf[pos], buf[pos + 1]]); + pos += 2; + out.t_last = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]); + pos += 4; + out.present_mask = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]); + pos += 4; + out.n = buf[pos]; + pos += 1; + out.battery_mv = u16::from_le_bytes([buf[pos], buf[pos + 1]]); + pos += 2; + out.err_m = buf[pos]; + pos += 1; + out.err_d = buf[pos]; + pos += 1; + out.err_tx = buf[pos]; + pos += 1; + out.err_last = buf[pos]; + pos += 1; + out.err_rx_reject = buf[pos]; + pos += 1; + + if out.n as usize > MAX_SAMPLES { + return Err(PayloadDecodeError::InvalidN); + } + if (out.present_mask & !PRESENT_MASK_VALID_BITS) != 0 { + return Err(PayloadDecodeError::InvalidPresentMask); + } + if bit_count32(out.present_mask) != out.n { + return Err(PayloadDecodeError::BitCountMismatch); + } + if out.n == 0 && out.present_mask != 0 { + return Err(PayloadDecodeError::InvalidSyncMask); + } + + if out.n == 0 { + return if pos == buf.len() { + Ok(out) + } else { + Err(PayloadDecodeError::TrailingBytes) + }; + } + + let n = out.n as usize; + + if pos + 4 > buf.len() { + return Err(PayloadDecodeError::Truncated); + } + out.energy_wh[0] = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]); + pos += 4; + + for i in 1..n { + let delta = uleb128_decode(buf, &mut pos)?; + let sum = out.energy_wh[i - 1] as u64 + delta as u64; + if sum > u32::MAX as u64 { + return Err(PayloadDecodeError::Overflow); + } + out.energy_wh[i] = sum as u32; + } + + let decode_phase = |dst: &mut [i16; MAX_SAMPLES], pos: &mut usize| -> Result<(), PayloadDecodeError> { + if *pos + 2 > buf.len() { + return Err(PayloadDecodeError::Truncated); + } + dst[0] = i16::from_le_bytes([buf[*pos], buf[*pos + 1]]); + *pos += 2; + let mut prev = dst[0] as i32; + for i in 1..n { + let delta = svarint_decode(buf, pos)?; + let value = prev + delta; + if value < i16::MIN as i32 || value > i16::MAX as i32 { + return Err(PayloadDecodeError::Overflow); + } + dst[i] = value as i16; + prev = value; + } + Ok(()) + }; + + decode_phase(&mut out.p1_w, &mut pos)?; + decode_phase(&mut out.p2_w, &mut pos)?; + decode_phase(&mut out.p3_w, &mut pos)?; + + if pos != buf.len() { + return Err(PayloadDecodeError::TrailingBytes); + } + + Ok(out) +} \ No newline at end of file diff --git a/crates/dd3_protocol/src/reassembly.rs b/crates/dd3_protocol/src/reassembly.rs new file mode 100644 index 0000000..8b260f7 --- /dev/null +++ b/crates/dd3_protocol/src/reassembly.rs @@ -0,0 +1,109 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ReassemblyState { + pub active: bool, + pub batch_id: u16, + pub next_index: u8, + pub expected_chunks: u8, + pub total_len: u16, + pub received_len: u16, + pub last_rx_ms: u32, + pub timeout_ms: u32, +} + +impl Default for ReassemblyState { + fn default() -> Self { + Self { + active: false, + batch_id: 0, + next_index: 0, + expected_chunks: 0, + total_len: 0, + received_len: 0, + last_rx_ms: 0, + timeout_ms: 0, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ReassemblyStatus { + InProgress, + Complete { complete_len: u16 }, + ErrorReset, +} + +pub fn reset_reassembly(state: &mut ReassemblyState) { + *state = ReassemblyState::default(); +} + +#[allow(clippy::too_many_arguments)] +pub fn push_chunk( + state: &mut ReassemblyState, + batch_id: u16, + chunk_index: u8, + chunk_count: u8, + total_len: u16, + chunk_data: &[u8], + now_ms: u32, + timeout_ms_for_new_batch: u32, + max_total_len: u16, + buffer: &mut [u8], +) -> ReassemblyStatus { + if chunk_data.len() > 0 && total_len == 0 { + reset_reassembly(state); + return ReassemblyStatus::ErrorReset; + } + + let expired = state.timeout_ms > 0 + && now_ms.wrapping_sub(state.last_rx_ms) > state.timeout_ms; + + if !state.active || batch_id != state.batch_id || expired { + if chunk_index != 0 { + reset_reassembly(state); + return ReassemblyStatus::ErrorReset; + } + if total_len == 0 || total_len > max_total_len || chunk_count == 0 { + reset_reassembly(state); + return ReassemblyStatus::ErrorReset; + } + + state.active = true; + state.batch_id = batch_id; + state.expected_chunks = chunk_count; + state.total_len = total_len; + state.received_len = 0; + state.next_index = 0; + state.last_rx_ms = now_ms; + state.timeout_ms = timeout_ms_for_new_batch; + } + + if !state.active || chunk_index != state.next_index || chunk_count != state.expected_chunks { + reset_reassembly(state); + return ReassemblyStatus::ErrorReset; + } + + let next_received = state.received_len as usize + chunk_data.len(); + if next_received > state.total_len as usize + || next_received > max_total_len as usize + || next_received > buffer.len() + { + reset_reassembly(state); + return ReassemblyStatus::ErrorReset; + } + + let start = state.received_len as usize; + let end = start + chunk_data.len(); + buffer[start..end].copy_from_slice(chunk_data); + + state.received_len = next_received as u16; + state.next_index = state.next_index.wrapping_add(1); + state.last_rx_ms = now_ms; + + if state.next_index == state.expected_chunks && state.received_len == state.total_len { + let complete_len = state.received_len; + reset_reassembly(state); + return ReassemblyStatus::Complete { complete_len }; + } + + ReassemblyStatus::InProgress +} \ No newline at end of file diff --git a/crates/dd3_protocol/tests/protocol_tests.rs b/crates/dd3_protocol/tests/protocol_tests.rs new file mode 100644 index 0000000..c9d8db6 --- /dev/null +++ b/crates/dd3_protocol/tests/protocol_tests.rs @@ -0,0 +1,347 @@ +use dd3_protocol::{ + crc16_ccitt, decode_ack_down_payload, decode_batch_v3, decode_frame, encode_ack_down_payload, + encode_batch_v3, encode_frame, push_chunk, AckDownPayload, BatchInputV3, FrameDecodeError, MsgKind, + ReassemblyState, ReassemblyStatus, +}; + +const FIX_FRAME_OK: &[u8] = include_bytes!("../../../fixtures/protocol/frames/batchup_f19c_payload_0102a5.bin"); +const FIX_FRAME_BAD_CRC: &[u8] = + include_bytes!("../../../fixtures/protocol/frames/batchup_f19c_payload_0102a5_bad_crc.bin"); + +const FIX_CHUNK_OK: &[u8] = include_bytes!("../../../fixtures/protocol/chunks/in_order_ok.bin"); +const FIX_CHUNK_MISSING: &[u8] = include_bytes!("../../../fixtures/protocol/chunks/missing_chunk.bin"); +const FIX_CHUNK_OUT_OF_ORDER: &[u8] = + include_bytes!("../../../fixtures/protocol/chunks/out_of_order_start.bin"); +const FIX_CHUNK_WRONG_LEN: &[u8] = + include_bytes!("../../../fixtures/protocol/chunks/wrong_total_len.bin"); + +const FIX_SYNC_EMPTY: &[u8] = include_bytes!("../../../fixtures/protocol/payload_v3/sync_empty.bin"); +const FIX_SPARSE_5: &[u8] = include_bytes!("../../../fixtures/protocol/payload_v3/sparse_5.bin"); +const FIX_FULL_30: &[u8] = include_bytes!("../../../fixtures/protocol/payload_v3/full_30.bin"); + +fn parse_chunk_records(bytes: &[u8]) -> Vec<(u16, u8, u8, u16, Vec)> { + let mut out = Vec::new(); + let mut pos = 0usize; + while pos < bytes.len() { + let batch_id = u16::from_le_bytes([bytes[pos], bytes[pos + 1]]); + pos += 2; + let idx = bytes[pos]; + pos += 1; + let count = bytes[pos]; + pos += 1; + let total_len = u16::from_le_bytes([bytes[pos], bytes[pos + 1]]); + pos += 2; + let chunk_len = bytes[pos] as usize; + pos += 1; + let chunk = bytes[pos..pos + chunk_len].to_vec(); + pos += chunk_len; + out.push((batch_id, idx, count, total_len, chunk)); + } + out +} + +fn fill_sparse_batch() -> BatchInputV3 { + let mut input = BatchInputV3::default(); + input.sender_id = 1; + input.batch_id = 42; + input.t_last = 1_700_000_000; + input.present_mask = (1u32 << 0) | (1u32 << 2) | (1u32 << 3) | (1u32 << 10) | (1u32 << 29); + input.n = 5; + input.battery_mv = 3750; + input.err_m = 2; + input.err_d = 1; + input.err_tx = 3; + input.err_last = 2; + input.err_rx_reject = 1; + + input.energy_wh[0] = 100_000; + input.energy_wh[1] = 100_001; + input.energy_wh[2] = 100_050; + input.energy_wh[3] = 100_050; + input.energy_wh[4] = 100_200; + + input.p1_w[0] = -120; + input.p1_w[1] = -90; + input.p1_w[2] = 1910; + input.p1_w[3] = -90; + input.p1_w[4] = 500; + + input.p2_w[0] = 50; + input.p2_w[1] = -1950; + input.p2_w[2] = 60; + input.p2_w[3] = 2060; + input.p2_w[4] = -10; + + input.p3_w[0] = 0; + input.p3_w[1] = 10; + input.p3_w[2] = -1990; + input.p3_w[3] = 10; + input.p3_w[4] = 20; + input +} + +fn fill_full_batch() -> BatchInputV3 { + let mut input = BatchInputV3::default(); + input.sender_id = 1; + input.batch_id = 0xBEEF; + input.t_last = 1_769_904_999; + input.present_mask = 0x3FFF_FFFF; + input.n = 30; + input.battery_mv = 4095; + input.err_m = 10; + input.err_d = 20; + input.err_tx = 30; + input.err_last = 3; + input.err_rx_reject = 6; + + for i in 0..30usize { + input.energy_wh[i] = 500_000 + (i as u32 * i as u32 * 3); + input.p1_w[i] = -1000 + (i as i16 * 25); + input.p2_w[i] = 500 - (i as i16 * 30); + input.p3_w[i] = if i % 2 == 0 { + 100 + (i as i16 * 5) + } else { + -100 + (i as i16 * 5) + }; + } + + input +} + +#[test] +fn crc16_known_vectors() { + assert_eq!(0x29B1, crc16_ccitt(b"123456789")); + assert_eq!(0x1C0F, crc16_ccitt(&[0x00, 0x01, 0x02, 0x03, 0x04])); +} + +#[test] +fn frame_encode_decode_and_crc_reject() { + let encoded = encode_frame(MsgKind::BatchUp, 0xF19C, &[0x01, 0x02, 0xA5]); + assert_eq!(FIX_FRAME_OK, encoded.as_slice()); + + let frame = decode_frame(FIX_FRAME_OK, MsgKind::AckDown as u8).unwrap(); + assert_eq!(MsgKind::BatchUp, frame.msg_kind); + assert_eq!(0xF19C, frame.short_id); + assert_eq!(&[0x01, 0x02, 0xA5], frame.payload.as_slice()); + + let err = decode_frame(FIX_FRAME_BAD_CRC, MsgKind::AckDown as u8).unwrap_err(); + assert_eq!(FrameDecodeError::CrcFail, err); +} + +#[test] +fn frame_rejects_invalid_msg_kind_and_short_length() { + let mut frame = encode_frame(MsgKind::BatchUp, 0xF19C, &[0x42]); + frame[0] = 2; + let crc = crc16_ccitt(&frame[..frame.len() - 2]); + let n = frame.len(); + frame[n - 2] = (crc >> 8) as u8; + frame[n - 1] = (crc & 0xFF) as u8; + + let bad_kind = decode_frame(&frame, MsgKind::AckDown as u8).unwrap_err(); + assert_eq!(FrameDecodeError::InvalidMsgKind, bad_kind); + + let short_len = decode_frame(&frame[..4], MsgKind::AckDown as u8).unwrap_err(); + assert_eq!(FrameDecodeError::LengthMismatch, short_len); +} + +#[test] +fn ack_payload_fixed_7byte_contract() { + let payload = AckDownPayload { + time_valid: true, + batch_id: 0x1234, + epoch_utc: 1_769_904_000, + }; + let encoded = encode_ack_down_payload(payload); + assert_eq!(7, encoded.len()); + assert_eq!(1, encoded[0]); + assert_eq!([0x12, 0x34], [encoded[1], encoded[2]]); + + let decoded = decode_ack_down_payload(&encoded).unwrap(); + assert_eq!(payload, decoded); + + assert!(decode_ack_down_payload(&encoded[..6]).is_err()); +} + +#[test] +fn chunk_reassembly_in_order_success() { + let records = parse_chunk_records(FIX_CHUNK_OK); + let mut state = ReassemblyState::default(); + let mut buffer = [0u8; 32]; + + let mut status = ReassemblyStatus::InProgress; + for (i, rec) in records.iter().enumerate() { + status = push_chunk( + &mut state, + rec.0, + rec.1, + rec.2, + rec.3, + &rec.4, + 1000 + (i as u32 * 100), + 5000, + 32, + &mut buffer, + ); + } + + assert_eq!(ReassemblyStatus::Complete { complete_len: 7 }, status); + assert_eq!(&[1, 2, 3, 4, 5, 6, 7], &buffer[..7]); + assert!(!state.active); +} + +#[test] +fn chunk_reassembly_missing_or_out_of_order_fails_deterministically() { + let records = parse_chunk_records(FIX_CHUNK_MISSING); + let mut state = ReassemblyState::default(); + let mut buffer = [0u8; 32]; + + let s0 = push_chunk( + &mut state, + records[0].0, + records[0].1, + records[0].2, + records[0].3, + &records[0].4, + 1000, + 5000, + 32, + &mut buffer, + ); + assert_eq!(ReassemblyStatus::InProgress, s0); + + let s1 = push_chunk( + &mut state, + records[1].0, + records[1].1, + records[1].2, + records[1].3, + &records[1].4, + 1100, + 5000, + 32, + &mut buffer, + ); + assert_eq!(ReassemblyStatus::ErrorReset, s1); + assert!(!state.active); + + let out_records = parse_chunk_records(FIX_CHUNK_OUT_OF_ORDER); + let s2 = push_chunk( + &mut state, + out_records[0].0, + out_records[0].1, + out_records[0].2, + out_records[0].3, + &out_records[0].4, + 1200, + 5000, + 32, + &mut buffer, + ); + assert_eq!(ReassemblyStatus::ErrorReset, s2); +} + +#[test] +fn chunk_reassembly_wrong_total_length_fails() { + let records = parse_chunk_records(FIX_CHUNK_WRONG_LEN); + let mut state = ReassemblyState::default(); + let mut buffer = [0u8; 8]; + + let s0 = push_chunk( + &mut state, + records[0].0, + records[0].1, + records[0].2, + records[0].3, + &records[0].4, + 1000, + 5000, + 8, + &mut buffer, + ); + assert_eq!(ReassemblyStatus::InProgress, s0); + + let s1 = push_chunk( + &mut state, + records[1].0, + records[1].1, + records[1].2, + records[1].3, + &records[1].4, + 1100, + 5000, + 8, + &mut buffer, + ); + assert_eq!(ReassemblyStatus::ErrorReset, s1); + assert!(!state.active); +} + +#[test] +fn payload_golden_vectors_roundtrip() { + let decoded_sync = decode_batch_v3(FIX_SYNC_EMPTY).unwrap(); + assert_eq!(1, decoded_sync.sender_id); + assert_eq!(0x1234, decoded_sync.batch_id); + assert_eq!(1_769_904_100, decoded_sync.t_last); + assert_eq!(0, decoded_sync.present_mask); + assert_eq!(0, decoded_sync.n); + assert_eq!(3750, decoded_sync.battery_mv); + + let decoded_sparse = decode_batch_v3(FIX_SPARSE_5).unwrap(); + assert_eq!(fill_sparse_batch(), decoded_sparse); + + let decoded_full = decode_batch_v3(FIX_FULL_30).unwrap(); + assert_eq!(fill_full_batch(), decoded_full); + + assert_eq!(FIX_SYNC_EMPTY, encode_batch_v3(&decoded_sync).unwrap().as_slice()); + assert_eq!(FIX_SPARSE_5, encode_batch_v3(&decoded_sparse).unwrap().as_slice()); + assert_eq!(FIX_FULL_30, encode_batch_v3(&decoded_full).unwrap().as_slice()); +} + +#[test] +fn payload_decode_rejects_bad_magic_schema_flags() { + let mut bad_magic = FIX_SPARSE_5.to_vec(); + bad_magic[0] = 0x00; + assert!(decode_batch_v3(&bad_magic).is_err()); + + let mut bad_schema = FIX_SPARSE_5.to_vec(); + bad_schema[2] = 0x02; + assert!(decode_batch_v3(&bad_schema).is_err()); + + let mut bad_flags = FIX_SPARSE_5.to_vec(); + bad_flags[3] = 0x00; + assert!(decode_batch_v3(&bad_flags).is_err()); +} + +#[test] +fn payload_decode_rejects_truncated_and_trailing() { + assert!(decode_batch_v3(&FIX_SPARSE_5[..FIX_SPARSE_5.len() - 1]).is_err()); + assert!(decode_batch_v3(&FIX_SPARSE_5[..12]).is_err()); + + let mut with_tail = FIX_SPARSE_5.to_vec(); + with_tail.push(0xAA); + assert!(decode_batch_v3(&with_tail).is_err()); +} + +#[test] +fn payload_encode_decode_reject_invalid_mask_and_n() { + let mut input = fill_sparse_batch(); + input.present_mask = 0x4000_0000; + assert!(encode_batch_v3(&input).is_err()); + + let mut input2 = fill_sparse_batch(); + input2.n = 31; + assert!(encode_batch_v3(&input2).is_err()); + + let mut input3 = fill_sparse_batch(); + input3.n = 0; + input3.present_mask = 1; + assert!(encode_batch_v3(&input3).is_err()); + + let mut invalid_bits = FIX_SPARSE_5.to_vec(); + invalid_bits[15] |= 0x40; + assert!(decode_batch_v3(&invalid_bits).is_err()); + + let mut bitcount_mismatch = FIX_SPARSE_5.to_vec(); + bitcount_mismatch[16] = 0x01; + assert!(decode_batch_v3(&bitcount_mismatch).is_err()); +} diff --git a/crates/dd3_sim/Cargo.toml b/crates/dd3_sim/Cargo.toml new file mode 100644 index 0000000..5702a45 --- /dev/null +++ b/crates/dd3_sim/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "dd3_sim" +version = "0.1.0" +edition = "2021" + +[dependencies] +dd3_protocol = { path = "../dd3_protocol" } +dd3_core = { path = "../dd3_core" } +dd3_contracts = { path = "../dd3_contracts" } \ No newline at end of file diff --git a/crates/dd3_sim/src/fake_clock.rs b/crates/dd3_sim/src/fake_clock.rs new file mode 100644 index 0000000..2ef9305 --- /dev/null +++ b/crates/dd3_sim/src/fake_clock.rs @@ -0,0 +1,47 @@ +use dd3_core::Clock; + +#[derive(Debug, Clone)] +pub struct FakeClock { + now_ms: u64, + now_utc: u32, + synced: bool, +} + +impl FakeClock { + pub fn new(now_ms: u64, now_utc: u32, synced: bool) -> Self { + Self { + now_ms, + now_utc, + synced, + } + } + + pub fn advance_ms(&mut self, delta_ms: u64) { + let old_sec = self.now_ms / 1000; + self.now_ms = self.now_ms.saturating_add(delta_ms); + let new_sec = self.now_ms / 1000; + if self.synced && new_sec > old_sec { + let delta_sec = (new_sec - old_sec).min(u32::MAX as u64) as u32; + self.now_utc = self.now_utc.saturating_add(delta_sec); + } + } + + pub fn set_time(&mut self, now_utc: u32, synced: bool) { + self.now_utc = now_utc; + self.synced = synced; + } +} + +impl Clock for FakeClock { + fn now_ms(&self) -> u64 { + self.now_ms + } + + fn now_utc(&self) -> u32 { + self.now_utc + } + + fn is_time_synced(&self) -> bool { + self.synced + } +} \ No newline at end of file diff --git a/crates/dd3_sim/src/fake_radio.rs b/crates/dd3_sim/src/fake_radio.rs new file mode 100644 index 0000000..7196692 --- /dev/null +++ b/crates/dd3_sim/src/fake_radio.rs @@ -0,0 +1,161 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use dd3_core::Radio; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Endpoint { + Sender, + Receiver, +} + +#[derive(Debug, Clone, Copy)] +pub struct FakeRadioConfig { + pub loss_pct: u8, + pub duplicate_pct: u8, + pub max_delay_ms: u32, + pub seed: u64, +} + +impl Default for FakeRadioConfig { + fn default() -> Self { + Self { + loss_pct: 0, + duplicate_pct: 0, + max_delay_ms: 0, + seed: 0xC0FFEE, + } + } +} + +#[derive(Debug, Clone)] +struct AirPacket { + to: Endpoint, + deliver_at_ms: u64, + payload: Vec, +} + +#[derive(Debug)] +pub struct FakeRadioBus { + cfg: FakeRadioConfig, + rng_state: u64, + packets: Vec, +} + +impl FakeRadioBus { + pub fn new(cfg: FakeRadioConfig) -> Self { + Self { + cfg, + rng_state: cfg.seed, + packets: Vec::new(), + } + } + + fn next_rand(&mut self) -> u32 { + self.rng_state = self + .rng_state + .wrapping_mul(6364136223846793005) + .wrapping_add(1); + (self.rng_state >> 32) as u32 + } + + fn chance(&mut self, pct: u8) -> bool { + if pct == 0 { + return false; + } + (self.next_rand() % 100) < pct as u32 + } + + fn push_from(&mut self, from: Endpoint, now_ms: u64, payload: &[u8]) { + if self.chance(self.cfg.loss_pct) { + return; + } + + let to = match from { + Endpoint::Sender => Endpoint::Receiver, + Endpoint::Receiver => Endpoint::Sender, + }; + + let delay = if self.cfg.max_delay_ms == 0 { + 0 + } else { + self.next_rand() % (self.cfg.max_delay_ms + 1) + } as u64; + + self.packets.push(AirPacket { + to, + deliver_at_ms: now_ms.saturating_add(delay), + payload: payload.to_vec(), + }); + + if self.chance(self.cfg.duplicate_pct) { + let extra_delay = if self.cfg.max_delay_ms == 0 { + 1 + } else { + (self.next_rand() % (self.cfg.max_delay_ms + 1) + 1) as u64 + }; + self.packets.push(AirPacket { + to, + deliver_at_ms: now_ms.saturating_add(delay).saturating_add(extra_delay), + payload: payload.to_vec(), + }); + } + } + + fn pop_for(&mut self, to: Endpoint, now_ms: u64, window_ms: u32) -> Option> { + let deadline = now_ms.saturating_add(window_ms as u64); + + let mut pick: Option = None; + let mut best_time = u64::MAX; + + for (idx, pkt) in self.packets.iter().enumerate() { + if pkt.to != to { + continue; + } + if pkt.deliver_at_ms > deadline { + continue; + } + if pkt.deliver_at_ms < best_time { + best_time = pkt.deliver_at_ms; + pick = Some(idx); + } + } + + pick.map(|idx| self.packets.swap_remove(idx).payload) + } +} + +#[derive(Debug, Clone)] +pub struct RadioEndpoint { + side: Endpoint, + bus: Rc>, + now_ms: u64, +} + +impl RadioEndpoint { + pub fn new(side: Endpoint, bus: Rc>) -> Self { + Self { + side, + bus, + now_ms: 0, + } + } +} + +impl Radio for RadioEndpoint { + fn send_frame(&mut self, frame: &[u8]) -> bool { + self.bus + .borrow_mut() + .push_from(self.side, self.now_ms, frame); + true + } + + fn recv_frame(&mut self, window_ms: u32, now_ms: u64) -> Option> { + self.now_ms = now_ms; + self.bus.borrow_mut().pop_for(self.side, now_ms, window_ms) + } +} + +pub fn shared_bus(cfg: FakeRadioConfig) -> Rc> { + Rc::new(RefCell::new(FakeRadioBus::new(cfg))) +} diff --git a/crates/dd3_sim/src/lib.rs b/crates/dd3_sim/src/lib.rs new file mode 100644 index 0000000..c8365e4 --- /dev/null +++ b/crates/dd3_sim/src/lib.rs @@ -0,0 +1,7 @@ +pub mod fake_clock; +pub mod fake_radio; +pub mod scenario; + +pub use fake_clock::FakeClock; +pub use fake_radio::{Endpoint, FakeRadioBus, FakeRadioConfig, RadioEndpoint}; +pub use scenario::{MockPublisher, MockStatusSink, MockStorage, ScenarioRunner}; \ No newline at end of file diff --git a/crates/dd3_sim/src/scenario.rs b/crates/dd3_sim/src/scenario.rs new file mode 100644 index 0000000..c3f9407 --- /dev/null +++ b/crates/dd3_sim/src/scenario.rs @@ -0,0 +1,121 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use dd3_core::{ + Publisher, ReceiverConfig, ReceiverPipeline, SenderConfig, SenderStateMachine, StatusSink, Storage, +}; + +use crate::fake_clock::FakeClock; +use crate::fake_radio::{shared_bus, Endpoint, FakeRadioConfig, RadioEndpoint}; + +#[derive(Debug, Default)] +pub struct MockPublisher { + pub state_messages: Vec<(String, String)>, + pub fault_messages: Vec<(String, String)>, + pub discovery_messages: Vec<(String, String)>, +} + +impl Publisher for MockPublisher { + fn publish_state(&mut self, device_id: &str, payload: &str) { + self.state_messages + .push((device_id.to_string(), payload.to_string())); + } + + fn publish_faults(&mut self, device_id: &str, payload: &str) { + self.fault_messages + .push((device_id.to_string(), payload.to_string())); + } + + fn publish_discovery(&mut self, device_id: &str, payload: &str) { + self.discovery_messages + .push((device_id.to_string(), payload.to_string())); + } +} + +#[derive(Debug, Default)] +pub struct MockStorage { + pub csv_lines: Vec<(String, String)>, +} + +impl Storage for MockStorage { + fn append_csv(&mut self, device_id: &str, line: &str) { + self.csv_lines + .push((device_id.to_string(), line.to_string())); + } +} + +#[derive(Debug, Default)] +pub struct MockStatusSink { + pub last_sender_phase: String, + pub last_receiver_status: String, +} + +impl StatusSink for MockStatusSink { + fn sender_phase(&mut self, phase: &str) { + self.last_sender_phase = phase.to_string(); + } + + fn receiver_status(&mut self, status: &str) { + self.last_receiver_status = status.to_string(); + } +} + +pub struct ScenarioRunner { + pub clock: FakeClock, + pub sender: SenderStateMachine, + pub receiver: ReceiverPipeline, + pub sender_radio: RadioEndpoint, + pub receiver_radio: RadioEndpoint, + pub publisher: MockPublisher, + pub storage: MockStorage, + pub status: MockStatusSink, + _bus: Rc>, +} + +impl ScenarioRunner { + pub fn new(radio_cfg: FakeRadioConfig) -> Self { + let bus = shared_bus(radio_cfg); + let sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); + let receiver_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone()); + + Self { + clock: FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true), + sender: SenderStateMachine::new( + SenderConfig { + short_id: 0xF19C, + sender_id: 1, + device_id: "dd3-F19C".to_string(), + }, + 0, + ), + receiver: ReceiverPipeline::new(ReceiverConfig { + short_id: 0xBEEF, + device_id: "dd3-BEEF".to_string(), + expected_sender_ids: vec![0xF19C], + }), + sender_radio, + receiver_radio, + publisher: MockPublisher::default(), + storage: MockStorage::default(), + status: MockStatusSink::default(), + _bus: bus, + } + } + + pub fn tick(&mut self, ms: u64) { + for _ in 0..ms { + self.clock.advance_ms(1); + self.sender + .tick(&self.clock, &mut self.sender_radio, &mut self.status); + self.receiver.tick( + &self.clock, + &mut self.receiver_radio, + &mut self.publisher, + &mut self.storage, + &mut self.status, + ); + self.sender + .tick(&self.clock, &mut self.sender_radio, &mut self.status); + } + } +} \ No newline at end of file diff --git a/crates/dd3_sim/tests/state_machine_tests.rs b/crates/dd3_sim/tests/state_machine_tests.rs new file mode 100644 index 0000000..fa5d5bd --- /dev/null +++ b/crates/dd3_sim/tests/state_machine_tests.rs @@ -0,0 +1,214 @@ +use std::collections::HashSet; + +use dd3_core::{Clock, Radio, ReceiverConfig, ReceiverPipeline, SenderConfig, SenderStateMachine, StatusSink}; +use dd3_protocol::{encode_frame, MsgKind}; +use dd3_sim::{ + Endpoint, FakeClock, FakeRadioConfig, MockPublisher, MockStatusSink, MockStorage, RadioEndpoint, + ScenarioRunner, +}; + +#[derive(Default)] +struct NoopStatus; +impl StatusSink for NoopStatus {} + +fn make_sync_ack(batch_id: u16, epoch: u32) -> Vec { + let payload = dd3_protocol::encode_ack_down_payload(dd3_protocol::AckDownPayload { + time_valid: true, + batch_id, + epoch_utc: epoch, + }); + encode_frame(MsgKind::AckDown, 0xBEEF, &payload) +} + +#[test] +fn no_receiver_present_stays_unsynced_and_sync_only() { + let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); + let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); + let mut sender = SenderStateMachine::new( + SenderConfig { + short_id: 0xF19C, + sender_id: 1, + device_id: "dd3-F19C".to_string(), + }, + 0, + ); + let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true); + let mut status = NoopStatus::default(); + + for _ in 0..90_000u64 { + clock.advance_ms(1); + sender.tick(&clock, &mut sender_radio, &mut status); + } + + let stats = sender.stats(); + assert!(!sender.is_time_acquired()); + assert_eq!(0, stats.build_count); + assert_eq!(0, stats.queue_depth); +} + +#[test] +fn receiver_bootstrap_unlocks_sampling_and_batches() { + let mut runner = ScenarioRunner::new(FakeRadioConfig::default()); + + // Initial unsynced period should result in sync request + ACK bootstrap. + runner.tick(20_000); + assert!(runner.sender.is_time_acquired()); + + // After unlock, sender should sample and send normal batches. + runner.tick(35_000); + assert!(!runner.publisher.state_messages.is_empty()); + assert!(!runner.storage.csv_lines.is_empty()); +} + +#[test] +fn packet_loss_eventually_progresses_without_duplicate_commit() { + let mut runner = ScenarioRunner::new(FakeRadioConfig { + loss_pct: 20, + duplicate_pct: 10, + max_delay_ms: 80, + seed: 0xBADC0DE, + }); + + runner.tick(180_000); + + let sender_stats = runner.sender.stats(); + assert!(sender_stats.last_acked_batch_id > 0); + assert!(!runner.publisher.state_messages.is_empty()); + + let mut uniq = HashSet::new(); + for (topic, payload) in &runner.publisher.state_messages { + let key = format!("{topic}|{payload}"); + assert!(uniq.insert(key), "duplicate committed state payload detected"); + } +} + +#[test] +fn ack_mismatch_is_ignored_and_inflight_unchanged() { + let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); + let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); + let mut inject_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone()); + + let mut sender = SenderStateMachine::new( + SenderConfig { + short_id: 0xF19C, + sender_id: 1, + device_id: "dd3-F19C".to_string(), + }, + 0, + ); + let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true); + let mut status = MockStatusSink::default(); + + // Trigger sync-request send so we have inflight batch_id=1 and ack pending. + clock.advance_ms(15_000); + sender.tick(&clock, &mut sender_radio, &mut status); + assert!(sender.stats().ack_pending); + + let bad_ack = make_sync_ack(999, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 5); + assert!(inject_radio.send_frame(&bad_ack)); + + sender.tick(&clock, &mut sender_radio, &mut status); + let stats = sender.stats(); + assert!(stats.ack_pending, "wrong ack must not clear inflight"); + assert_eq!(0, stats.last_acked_batch_id); +} + +#[test] +fn backpressure_queue_depth_stays_bounded() { + let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); + let mut sender_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); + let mut inject_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone()); + + let mut sender = SenderStateMachine::new( + SenderConfig { + short_id: 0xF19C, + sender_id: 1, + device_id: "dd3-F19C".to_string(), + }, + 0, + ); + let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC, true); + let mut status = MockStatusSink::default(); + + // Bootstrap one valid ACK so sender enters normal mode. + clock.advance_ms(15_000); + sender.tick(&clock, &mut sender_radio, &mut status); + let ack = make_sync_ack(1, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 1); + assert!(inject_radio.send_frame(&ack)); + sender.tick(&clock, &mut sender_radio, &mut status); + assert!(sender.is_time_acquired()); + + for _ in 0..600_000u64 { + clock.advance_ms(1); + sender.tick(&clock, &mut sender_radio, &mut status); + } + + assert!(sender.stats().queue_depth as usize <= dd3_core::BATCH_QUEUE_DEPTH); +} + +#[test] +fn duplicate_batch_updates_counters_but_suppresses_publish_and_log() { + let bus = dd3_sim::fake_radio::shared_bus(FakeRadioConfig::default()); + let mut tx_radio = RadioEndpoint::new(Endpoint::Sender, bus.clone()); + let mut rx_radio = RadioEndpoint::new(Endpoint::Receiver, bus.clone()); + + let mut receiver = ReceiverPipeline::new(ReceiverConfig { + short_id: 0xBEEF, + device_id: "dd3-BEEF".to_string(), + expected_sender_ids: vec![0xF19C], + }); + + let mut clock = FakeClock::new(0, dd3_core::MIN_ACCEPTED_EPOCH_UTC + 100, true); + let mut pubsub = MockPublisher::default(); + let mut storage = MockStorage::default(); + let mut status = MockStatusSink::default(); + + let payload = include_bytes!("../../../fixtures/protocol/payload_v3/sparse_5.bin"); + let batch_id = 42u16; + let total_len = payload.len() as u16; + let chunk_size = dd3_core::BATCH_CHUNK_PAYLOAD; + let chunks = ((payload.len() + chunk_size - 1) / chunk_size) as u8; + + { + let mut offset = 0usize; + for idx in 0..chunks { + let part_len = (payload.len() - offset).min(chunk_size); + let mut pkt_payload = Vec::new(); + pkt_payload.extend_from_slice(&batch_id.to_le_bytes()); + pkt_payload.push(idx); + pkt_payload.push(chunks); + pkt_payload.extend_from_slice(&total_len.to_le_bytes()); + pkt_payload.extend_from_slice(&payload[offset..offset + part_len]); + offset += part_len; + let frame = encode_frame(MsgKind::BatchUp, 0xF19C, &pkt_payload); + assert!(tx_radio.send_frame(&frame)); + } + receiver.tick(&clock, &mut rx_radio, &mut pubsub, &mut storage, &mut status); + } + let first_state = pubsub.state_messages.len(); + let first_csv = storage.csv_lines.len(); + + { + let mut offset = 0usize; + for idx in 0..chunks { + let part_len = (payload.len() - offset).min(chunk_size); + let mut pkt_payload = Vec::new(); + pkt_payload.extend_from_slice(&batch_id.to_le_bytes()); + pkt_payload.push(idx); + pkt_payload.push(chunks); + pkt_payload.extend_from_slice(&total_len.to_le_bytes()); + pkt_payload.extend_from_slice(&payload[offset..offset + part_len]); + offset += part_len; + let frame = encode_frame(MsgKind::BatchUp, 0xF19C, &pkt_payload); + assert!(tx_radio.send_frame(&frame)); + } + receiver.tick(&clock, &mut rx_radio, &mut pubsub, &mut storage, &mut status); + } + + let statuses = receiver.sender_statuses(); + assert_eq!(2, statuses[0].rx_batches_total); + assert_eq!(1, statuses[0].rx_batches_duplicate); + + assert_eq!(first_state, pubsub.state_messages.len(), "duplicate should not republish state"); + assert_eq!(first_csv, storage.csv_lines.len(), "duplicate should not duplicate csv log"); +} diff --git a/crates/xtask/Cargo.toml b/crates/xtask/Cargo.toml new file mode 100644 index 0000000..8129c55 --- /dev/null +++ b/crates/xtask/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "xtask" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1" +regex = "1" \ No newline at end of file diff --git a/crates/xtask/src/main.rs b/crates/xtask/src/main.rs new file mode 100644 index 0000000..f27c7fb --- /dev/null +++ b/crates/xtask/src/main.rs @@ -0,0 +1,234 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Result}; +use regex::Regex; + +const BASELINE_COMMIT: &str = "a3c61f9b929fbc55bfb502b443fba2f98023b3f1"; + +fn main() -> Result<()> { + let mut args = std::env::args().skip(1); + let cmd = args.next().unwrap_or_default(); + + match cmd.as_str() { + "sync-fixtures" => sync_fixtures(), + "check-manufacturer" => check_manufacturer(), + "verify-fixture-sources" => verify_fixture_sources(), + _ => { + eprintln!("usage: cargo run -p xtask -- "); + Err(anyhow!("unknown command")) + } + } +} + +fn repo_root() -> Result { + let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + let root = manifest_dir + .parent() + .and_then(|p| p.parent()) + .ok_or_else(|| anyhow!("failed to resolve workspace root"))?; + Ok(root.to_path_buf()) +} + +fn parse_cpp_array(src: &str, name: &str) -> Result> { + let re = Regex::new(&format!( + r"(?s)static\s+const\s+uint8_t\s+{}\[\]\s*=\s*\{{(?P.*?)\}};", + name + ))?; + let caps = re + .captures(src) + .ok_or_else(|| anyhow!("array {name} not found"))?; + let body = caps.name("body").unwrap().as_str(); + let item_re = Regex::new(r"0x([0-9A-Fa-f]{1,2})")?; + let mut out = Vec::new(); + for cap in item_re.captures_iter(body) { + let byte = u8::from_str_radix(&cap[1], 16)?; + out.push(byte); + } + Ok(out) +} + +fn canonicalize_full_30_vector(mut bytes: Vec) -> Vec { + // Upstream pinned baseline vector in test_payload_codec.cpp is truncated by two + // final p3 signed-delta varints (expected +205, -195 => 0x9A 0x03 0x85 0x03). + // Keep raw provenance separately and write canonical bytes for host tests. + if bytes.len() == 183 { + bytes.extend_from_slice(&[0x9A, 0x03, 0x85, 0x03]); + } + bytes +} + +fn crc16_ccitt(data: &[u8]) -> u16 { + let mut crc: u16 = 0xFFFF; + for byte in data { + crc ^= (*byte as u16) << 8; + for _ in 0..8 { + if (crc & 0x8000) != 0 { + crc = (crc << 1) ^ 0x1021; + } else { + crc <<= 1; + } + } + } + crc +} + +fn write_bin(path: &Path, bytes: &[u8]) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + fs::write(path, bytes)?; + Ok(()) +} + +fn sync_fixtures() -> Result<()> { + let root = repo_root()?; + + let payload_test = root.join("vendor/dd3-cpp/test/test_payload_codec/test_payload_codec.cpp"); + let payload_src = fs::read_to_string(&payload_test).with_context(|| { + format!("failed reading {}", payload_test.display()) + })?; + + let sync_empty = parse_cpp_array(&payload_src, "VECTOR_SYNC_EMPTY")?; + let sparse_5 = parse_cpp_array(&payload_src, "VECTOR_SPARSE_5")?; + let full_30_raw = parse_cpp_array(&payload_src, "VECTOR_FULL_30")?; + let full_30 = canonicalize_full_30_vector(full_30_raw.clone()); + + write_bin(&root.join("fixtures/protocol/payload_v3/sync_empty.bin"), &sync_empty)?; + write_bin(&root.join("fixtures/protocol/payload_v3/sparse_5.bin"), &sparse_5)?; + write_bin(&root.join("fixtures/protocol/payload_v3/full_30.bin"), &full_30)?; + write_bin( + &root.join("fixtures/protocol/payload_v3/full_30_upstream_raw.bin"), + &full_30_raw, + )?; + + let frame_payload = [0x01u8, 0x02, 0xA5]; + let mut frame = vec![0x00, 0xF1, 0x9C]; + frame.extend_from_slice(&frame_payload); + let crc = crc16_ccitt(&frame); + frame.extend_from_slice(&crc.to_be_bytes()); + write_bin(&root.join("fixtures/protocol/frames/batchup_f19c_payload_0102a5.bin"), &frame)?; + + let mut frame_bad = frame.clone(); + let last = frame_bad.len() - 1; + frame_bad[last] ^= 0x01; + write_bin(&root.join("fixtures/protocol/frames/batchup_f19c_payload_0102a5_bad_crc.bin"), &frame_bad)?; + + let mut chunk_ok = Vec::new(); + // record format: [batch_id_le:2][idx:1][count:1][total_len_le:2][chunk_len:1][chunk_data] + chunk_ok.extend_from_slice(&77u16.to_le_bytes()); + chunk_ok.extend_from_slice(&[0, 3]); + chunk_ok.extend_from_slice(&7u16.to_le_bytes()); + chunk_ok.extend_from_slice(&[3, 1, 2, 3]); + chunk_ok.extend_from_slice(&77u16.to_le_bytes()); + chunk_ok.extend_from_slice(&[1, 3]); + chunk_ok.extend_from_slice(&7u16.to_le_bytes()); + chunk_ok.extend_from_slice(&[2, 4, 5]); + chunk_ok.extend_from_slice(&77u16.to_le_bytes()); + chunk_ok.extend_from_slice(&[2, 3]); + chunk_ok.extend_from_slice(&7u16.to_le_bytes()); + chunk_ok.extend_from_slice(&[2, 6, 7]); + write_bin(&root.join("fixtures/protocol/chunks/in_order_ok.bin"), &chunk_ok)?; + + let mut chunk_missing = Vec::new(); + chunk_missing.extend_from_slice(&10u16.to_le_bytes()); + chunk_missing.extend_from_slice(&[0, 3]); + chunk_missing.extend_from_slice(&6u16.to_le_bytes()); + chunk_missing.extend_from_slice(&[2, 9, 8]); + chunk_missing.extend_from_slice(&10u16.to_le_bytes()); + chunk_missing.extend_from_slice(&[2, 3]); + chunk_missing.extend_from_slice(&6u16.to_le_bytes()); + chunk_missing.extend_from_slice(&[2, 7, 6]); + write_bin(&root.join("fixtures/protocol/chunks/missing_chunk.bin"), &chunk_missing)?; + + let mut chunk_wrong_total = Vec::new(); + chunk_wrong_total.extend_from_slice(&55u16.to_le_bytes()); + chunk_wrong_total.extend_from_slice(&[0, 2]); + chunk_wrong_total.extend_from_slice(&5u16.to_le_bytes()); + chunk_wrong_total.extend_from_slice(&[3, 1, 2, 3]); + chunk_wrong_total.extend_from_slice(&55u16.to_le_bytes()); + chunk_wrong_total.extend_from_slice(&[1, 2]); + chunk_wrong_total.extend_from_slice(&5u16.to_le_bytes()); + chunk_wrong_total.extend_from_slice(&[3, 4, 5, 6]); + write_bin(&root.join("fixtures/protocol/chunks/wrong_total_len.bin"), &chunk_wrong_total)?; + + let sources = format!( + "# Fixture Sources\n\n- Baseline repository: C3MA/DD3-LoRa-Bridge-MultiSender\n- Baseline branch: lora-refactor\n- Baseline commit: {BASELINE_COMMIT}\n- Payload vectors: vendor/dd3-cpp/test/test_payload_codec/test_payload_codec.cpp\n- Payload note: VECTOR_FULL_30 in pinned commit is 183-byte upstream raw (stored as full_30_upstream_raw.bin); canonical full_30.bin appends final two p3 deltas `9A 03 85 03` to satisfy baseline codec semantics.\n- Frame/chunk vectors: derived from vendor/dd3-cpp/test/test_lora_transport/test_lora_transport.cpp semantics\n" + ); + fs::write(root.join("fixtures/protocol/SOURCES.md"), sources)?; + + println!("fixtures synced"); + Ok(()) +} + +fn check_manufacturer() -> Result<()> { + let root = repo_root()?; + let mut offenders = Vec::new(); + + fn walk(dir: &Path, files: &mut Vec) -> Result<()> { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + walk(&path, files)?; + } else if path.extension().and_then(|x| x.to_str()) == Some("rs") { + files.push(path); + } + } + Ok(()) + } + + let mut files = Vec::new(); + walk(&root.join("crates"), &mut files)?; + + for file in files { + let txt = fs::read_to_string(&file)?; + if txt.contains("\"AcidBurns\"") { + let file_norm = file.to_string_lossy().replace('\\', "/"); + let allow = file_norm.ends_with("/crates/dd3_contracts/src/lib.rs") + || file_norm.ends_with("/crates/dd3_contracts/tests/contracts_tests.rs"); + if !allow { + offenders.push(file); + } + } + } + + if offenders.is_empty() { + println!("manufacturer drift check passed"); + Ok(()) + } else { + Err(anyhow!( + "unexpected hardcoded manufacturer literal(s): {:?}", + offenders + )) + } +} + +fn verify_fixture_sources() -> Result<()> { + let root = repo_root()?; + let path = root.join("fixtures/protocol/SOURCES.md"); + let txt = fs::read_to_string(&path) + .with_context(|| format!("missing {}", path.display()))?; + + if !txt.contains(BASELINE_COMMIT) { + return Err(anyhow!("SOURCES.md does not contain baseline commit")); + } + + let required = [ + "fixtures/protocol/payload_v3/sync_empty.bin", + "fixtures/protocol/payload_v3/sparse_5.bin", + "fixtures/protocol/payload_v3/full_30.bin", + "fixtures/protocol/frames/batchup_f19c_payload_0102a5.bin", + "fixtures/protocol/chunks/in_order_ok.bin", + ]; + + for rel in required { + let full = root.join(rel); + if !full.exists() { + return Err(anyhow!("missing fixture {rel}")); + } + } + + println!("fixture source metadata verified"); + Ok(()) +} diff --git a/docs/INTEROP_CHECKLIST.md b/docs/INTEROP_CHECKLIST.md new file mode 100644 index 0000000..af10456 --- /dev/null +++ b/docs/INTEROP_CHECKLIST.md @@ -0,0 +1,36 @@ +# Interop Checklist + +## A) Rust Receiver <-> C++ Sender +1. Run C++ sender on `lora-refactor` baseline and Rust receiver host simulation harness. +2. Verify receiver accepts sender short-id and sender-id mapping. +3. Verify ACK payload bytes: +- `[flags:1][batch_id_be:2][epoch_utc_be:4]` +4. Verify duplicate batch handling: +- ACK still sent +- duplicate counters increment +- publish/log suppressed +5. Compare MQTT state and fault payload keys/semantics against baseline. +6. Compare CSV line/header output for shared sample windows. + +## B) Rust Sender <-> C++ Receiver +1. Run Rust sender with sync-request startup behavior. +2. Confirm C++ receiver decodes payload schema v3 and sparse mask reconstruction. +3. Confirm ACK mismatch handling does not clear inflight batch. +4. Confirm retry and catch-up behavior under injected packet loss. + +## C) Contract Comparison +1. Home Assistant discovery topics and payload fields: +- topic: `homeassistant/sensor///config` +- `unique_id`, `device.identifiers`, `device.name`, `device.model`, `device.manufacturer` +2. MQTT state payload: +- required keys present +- legacy keys absent +3. CSV output: +- exact header order +- stable numeric formatting + +## D) Port Validation Items +- Sender unsynced boot sends only sync-requests. +- ACK bootstrap gate enforced (`time_valid=1` and epoch >= `MIN_ACCEPTED_EPOCH_UTC`). +- Frame/CRC/ACK/payload golden fixtures all pass. +- All protocol encode/decode paths covered by fixture-backed tests. \ No newline at end of file diff --git a/docs/SPEC_LINKS.md b/docs/SPEC_LINKS.md new file mode 100644 index 0000000..3009ea7 --- /dev/null +++ b/docs/SPEC_LINKS.md @@ -0,0 +1,36 @@ +# SPEC Links and Constant Extraction + +Authoritative behavior spec: +- https://git.mannheim.ccc.de/C3MA/DD3-LoRa-Bridge-MultiSender/src/branch/lora-refactor/Requirements.md + +Pinned baseline commit for fixture provenance: +- `a3c61f9b929fbc55bfb502b443fba2f98023b3f1` + +Primary baseline sources used: +- `test/test_lora_transport/test_lora_transport.cpp` +- `test/test_payload_codec/test_payload_codec.cpp` +- `test/test_json_codec/test_json_codec.cpp` +- `test/test_html_escape/test_html_escape.cpp` +- `include/config.h` +- `src/sender_state_machine.cpp` +- `src/receiver_pipeline.cpp` +- `lib/dd3_transport_logic/src/lora_frame_logic.cpp` +- `lib/dd3_transport_logic/src/batch_reassembly_logic.cpp` +- `lib/dd3_legacy_core/src/payload_codec.cpp` + +Extracted constants locked in this port: +- `MIN_ACCEPTED_EPOCH_UTC = 1769904000` +- `SYNC_REQUEST_INTERVAL_MS = 15000` +- `METER_SAMPLE_INTERVAL_MS = 1000` +- `METER_SEND_INTERVAL_MS = 30000` +- `BATCH_MAX_RETRIES = 2` +- `BATCH_QUEUE_DEPTH = 10` +- `ACK_REPEAT_COUNT = 3` +- `ACK_REPEAT_DELAY_MS = 200` +- `LoraMsgKind::BatchUp = 0`, `LoraMsgKind::AckDown = 1` +- ACK payload fixed size `7` +- Home Assistant manufacturer exact string: `AcidBurns` + +Fixture provenance: +- `fixtures/protocol/SOURCES.md` +- `xtask sync-fixtures` refreshes payload/frame/chunk fixtures. \ No newline at end of file diff --git a/docs/TEST_STRATEGY.md b/docs/TEST_STRATEGY.md new file mode 100644 index 0000000..72bd777 --- /dev/null +++ b/docs/TEST_STRATEGY.md @@ -0,0 +1,63 @@ +# Test Strategy + +This repository uses four host-side test pillars to preserve compatibility and deterministic behavior. + +## 1) Byte-exact protocol compatibility +- Crate: `dd3_protocol` +- Scope: + - CRC16-CCITT vectors + - Frame encode/decode byte identity + - ACK payload byte identity + - Chunk reassembly deterministic reset semantics + - Payload schema v3 decode/re-encode golden vectors +- Fixtures: + - `fixtures/protocol/frames/*.bin` + - `fixtures/protocol/chunks/*.bin` + - `fixtures/protocol/payload_v3/*.bin` + +## 2) Contract stability +- Crate: `dd3_contracts` +- Scope: + - Home Assistant discovery JSON contract + - MQTT state JSON key/semantic stability + - CSV header and line format stability + - sanitize/html/url behavior including adversarial inputs + - manufacturer drift guard +- Fixtures: + - `fixtures/contracts/ha_discovery/*.json` + - `fixtures/contracts/mqtt_state/*.json` + - `fixtures/contracts/sd_csv/*.csv` + +## 3) Deterministic state machine behavior +- Crates: `dd3_core`, `dd3_sim` +- Scope: + - unsynced sender sync-request cadence + - time bootstrap unlock via valid ACK + - stop-and-wait retry behavior under loss/backpressure + - ACK mismatch handling + - duplicate-batch suppression on receiver publish/log paths + +## 4) Robustness fuzz/property checks +- `fuzz/` targets: + - `frame_decode` + - `chunk_stream_ingest` + - `payload_decode_v3` + - `sanitize_device_id` + - `url_encode_component` + +## Running Tests (host only) +Prerequisite: install Rust toolchain (`rustup`, `cargo`, `rustfmt`, `clippy`). + +- Workspace tests: + - `make test` + - or `cargo test --workspace` +- Lint: + - `make lint` +- Fuzz smoke (optional, skipped when `cargo-fuzz` missing): + - `make fuzz-smoke` + +## Fixture Refresh +- Ensure baseline subtree exists at `vendor/dd3-cpp` pinned to baseline commit. +- Run: + - `cargo run -p xtask -- sync-fixtures` + - `cargo run -p xtask -- verify-fixture-sources` \ No newline at end of file diff --git a/fixtures/contracts/ha_discovery/energy.json b/fixtures/contracts/ha_discovery/energy.json new file mode 100644 index 0000000..1712ce2 --- /dev/null +++ b/fixtures/contracts/ha_discovery/energy.json @@ -0,0 +1 @@ +{"name":"dd3-F19C Energy","state_topic":"smartmeter/dd3-F19C/state","unique_id":"dd3-F19C_energy","unit_of_measurement":"kWh","device_class":"energy","value_template":"{{ value_json.e_kwh }}","device":{"identifiers":["dd3-F19C"],"name":"dd3-F19C","model":"DD3-LoRa-Bridge","manufacturer":"AcidBurns"}} \ No newline at end of file diff --git a/fixtures/contracts/mqtt_state/sample.json b/fixtures/contracts/mqtt_state/sample.json new file mode 100644 index 0000000..cc97559 --- /dev/null +++ b/fixtures/contracts/mqtt_state/sample.json @@ -0,0 +1 @@ +{"id":"F19C","ts":1769905000,"e_kwh":1234.57,"p_w":322,"p1_w":100,"p2_w":110,"p3_w":111,"bat_v":3.88,"bat_pct":77,"rssi":-71,"snr":7.25,"err_m":1,"err_d":2,"err_tx":3,"err_last":2,"rx_reject":1,"rx_reject_text":"crc_fail"} \ No newline at end of file diff --git a/fixtures/contracts/sd_csv/sample.csv b/fixtures/contracts/sd_csv/sample.csv new file mode 100644 index 0000000..7d7229c --- /dev/null +++ b/fixtures/contracts/sd_csv/sample.csv @@ -0,0 +1,2 @@ +ts_utc,ts_hms_local,p_w,p1_w,p2_w,p3_w,e_kwh,bat_v,bat_pct,rssi,snr,err_m,err_d,err_tx,err_last +1769905000,12:34:56,321.6,100.4,110.4,110.8,1234.568,3.88,77,-71,7.3,1,2,3,decode diff --git a/fixtures/meter/iec_samples/README.md b/fixtures/meter/iec_samples/README.md new file mode 100644 index 0000000..6a79c40 --- /dev/null +++ b/fixtures/meter/iec_samples/README.md @@ -0,0 +1,3 @@ +Placeholder fixtures for IEC meter parser phase. + +Host-side parser tests will add representative OBIS sample text frames here. \ No newline at end of file diff --git a/fixtures/protocol/SOURCES.md b/fixtures/protocol/SOURCES.md new file mode 100644 index 0000000..c80928b --- /dev/null +++ b/fixtures/protocol/SOURCES.md @@ -0,0 +1,8 @@ +# Fixture Sources + +- Baseline repository: C3MA/DD3-LoRa-Bridge-MultiSender +- Baseline branch: lora-refactor +- Baseline commit: a3c61f9b929fbc55bfb502b443fba2f98023b3f1 +- Payload vectors: vendor/dd3-cpp/test/test_payload_codec/test_payload_codec.cpp +- Payload note: VECTOR_FULL_30 in pinned commit is 183-byte upstream raw (stored as full_30_upstream_raw.bin); canonical full_30.bin appends final two p3 deltas `9A 03 85 03` to satisfy baseline codec semantics. +- Frame/chunk vectors: derived from vendor/dd3-cpp/test/test_lora_transport/test_lora_transport.cpp semantics diff --git a/fixtures/protocol/chunks/in_order_ok.bin b/fixtures/protocol/chunks/in_order_ok.bin new file mode 100644 index 0000000..82a2f36 Binary files /dev/null and b/fixtures/protocol/chunks/in_order_ok.bin differ diff --git a/fixtures/protocol/chunks/missing_chunk.bin b/fixtures/protocol/chunks/missing_chunk.bin new file mode 100644 index 0000000..612c06d Binary files /dev/null and b/fixtures/protocol/chunks/missing_chunk.bin differ diff --git a/fixtures/protocol/chunks/out_of_order_start.bin b/fixtures/protocol/chunks/out_of_order_start.bin new file mode 100644 index 0000000..f46a403 Binary files /dev/null and b/fixtures/protocol/chunks/out_of_order_start.bin differ diff --git a/fixtures/protocol/chunks/wrong_total_len.bin b/fixtures/protocol/chunks/wrong_total_len.bin new file mode 100644 index 0000000..b9d3d68 Binary files /dev/null and b/fixtures/protocol/chunks/wrong_total_len.bin differ diff --git a/fixtures/protocol/frames/batchup_f19c_payload_0102a5.bin b/fixtures/protocol/frames/batchup_f19c_payload_0102a5.bin new file mode 100644 index 0000000..06f4150 Binary files /dev/null and b/fixtures/protocol/frames/batchup_f19c_payload_0102a5.bin differ diff --git a/fixtures/protocol/frames/batchup_f19c_payload_0102a5_bad_crc.bin b/fixtures/protocol/frames/batchup_f19c_payload_0102a5_bad_crc.bin new file mode 100644 index 0000000..8fcf703 Binary files /dev/null and b/fixtures/protocol/frames/batchup_f19c_payload_0102a5_bad_crc.bin differ diff --git a/fixtures/protocol/payload_v3/full_30.bin b/fixtures/protocol/payload_v3/full_30.bin new file mode 100644 index 0000000..de196c8 Binary files /dev/null and b/fixtures/protocol/payload_v3/full_30.bin differ diff --git a/fixtures/protocol/payload_v3/full_30_upstream_raw.bin b/fixtures/protocol/payload_v3/full_30_upstream_raw.bin new file mode 100644 index 0000000..698c16c Binary files /dev/null and b/fixtures/protocol/payload_v3/full_30_upstream_raw.bin differ diff --git a/fixtures/protocol/payload_v3/sparse_5.bin b/fixtures/protocol/payload_v3/sparse_5.bin new file mode 100644 index 0000000..60abe44 Binary files /dev/null and b/fixtures/protocol/payload_v3/sparse_5.bin differ diff --git a/fixtures/protocol/payload_v3/sync_empty.bin b/fixtures/protocol/payload_v3/sync_empty.bin new file mode 100644 index 0000000..92a77a0 Binary files /dev/null and b/fixtures/protocol/payload_v3/sync_empty.bin differ diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml new file mode 100644 index 0000000..c8846b8 --- /dev/null +++ b/fuzz/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "dd3-fuzz" +version = "0.0.0" +publish = false +edition = "2021" + +[package.metadata] +cargo-fuzz = true + +[dependencies] +libfuzzer-sys = "0.4" +dd3_protocol = { path = "../crates/dd3_protocol" } +dd3_contracts = { path = "../crates/dd3_contracts" } + +[[bin]] +name = "frame_decode" +path = "fuzz_targets/frame_decode.rs" +test = false +doc = false +bench = false + +[[bin]] +name = "chunk_stream_ingest" +path = "fuzz_targets/chunk_stream_ingest.rs" +test = false +doc = false +bench = false + +[[bin]] +name = "payload_decode_v3" +path = "fuzz_targets/payload_decode_v3.rs" +test = false +doc = false +bench = false + +[[bin]] +name = "sanitize_device_id" +path = "fuzz_targets/sanitize_device_id.rs" +test = false +doc = false +bench = false + +[[bin]] +name = "url_encode_component" +path = "fuzz_targets/url_encode_component.rs" +test = false +doc = false +bench = false \ No newline at end of file diff --git a/fuzz/fuzz_targets/chunk_stream_ingest.rs b/fuzz/fuzz_targets/chunk_stream_ingest.rs new file mode 100644 index 0000000..9ec65f7 --- /dev/null +++ b/fuzz/fuzz_targets/chunk_stream_ingest.rs @@ -0,0 +1,35 @@ +#![no_main] +use libfuzzer_sys::fuzz_target; + +fuzz_target!(|data: &[u8]| { + let mut state = dd3_protocol::ReassemblyState::default(); + let mut buffer = [0u8; 4096]; + + let mut pos = 0usize; + while pos + 7 <= data.len() { + let batch_id = u16::from_le_bytes([data[pos], data[pos + 1]]); + let idx = data[pos + 2]; + let cnt = data[pos + 3]; + let total_len = u16::from_le_bytes([data[pos + 4], data[pos + 5]]); + let chunk_len = data[pos + 6] as usize; + pos += 7; + if pos + chunk_len > data.len() { + break; + } + let chunk = &data[pos..pos + chunk_len]; + pos += chunk_len; + + let _ = dd3_protocol::push_chunk( + &mut state, + batch_id, + idx, + cnt, + total_len, + chunk, + 0, + 5000, + 4096, + &mut buffer, + ); + } +}); \ No newline at end of file diff --git a/fuzz/fuzz_targets/frame_decode.rs b/fuzz/fuzz_targets/frame_decode.rs new file mode 100644 index 0000000..bc58bbf --- /dev/null +++ b/fuzz/fuzz_targets/frame_decode.rs @@ -0,0 +1,6 @@ +#![no_main] +use libfuzzer_sys::fuzz_target; + +fuzz_target!(|data: &[u8]| { + let _ = dd3_protocol::decode_frame(data, dd3_protocol::MsgKind::AckDown as u8); +}); \ No newline at end of file diff --git a/fuzz/fuzz_targets/payload_decode_v3.rs b/fuzz/fuzz_targets/payload_decode_v3.rs new file mode 100644 index 0000000..687efb5 --- /dev/null +++ b/fuzz/fuzz_targets/payload_decode_v3.rs @@ -0,0 +1,6 @@ +#![no_main] +use libfuzzer_sys::fuzz_target; + +fuzz_target!(|data: &[u8]| { + let _ = dd3_protocol::decode_batch_v3(data); +}); \ No newline at end of file diff --git a/fuzz/fuzz_targets/sanitize_device_id.rs b/fuzz/fuzz_targets/sanitize_device_id.rs new file mode 100644 index 0000000..143c91e --- /dev/null +++ b/fuzz/fuzz_targets/sanitize_device_id.rs @@ -0,0 +1,8 @@ +#![no_main] +use libfuzzer_sys::fuzz_target; + +fuzz_target!(|data: &[u8]| { + if let Ok(s) = core::str::from_utf8(data) { + let _ = dd3_contracts::sanitize_device_id(s); + } +}); \ No newline at end of file diff --git a/fuzz/fuzz_targets/url_encode_component.rs b/fuzz/fuzz_targets/url_encode_component.rs new file mode 100644 index 0000000..35fbdea --- /dev/null +++ b/fuzz/fuzz_targets/url_encode_component.rs @@ -0,0 +1,8 @@ +#![no_main] +use libfuzzer_sys::fuzz_target; + +fuzz_target!(|data: &[u8]| { + if let Ok(s) = core::str::from_utf8(data) { + let _ = dd3_contracts::url_encode_component(s); + } +}); \ No newline at end of file diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..02d090f --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "stable" +components = ["rustfmt", "clippy"] \ No newline at end of file diff --git a/vendor/dd3-cpp b/vendor/dd3-cpp new file mode 160000 index 0000000..a3c61f9 --- /dev/null +++ b/vendor/dd3-cpp @@ -0,0 +1 @@ +Subproject commit a3c61f9b929fbc55bfb502b443fba2f98023b3f1