From 8fba67fcf3fab3bb39df9697f82bcd35ab6afea4 Mon Sep 17 00:00:00 2001 From: acidburns Date: Sat, 31 Jan 2026 01:53:02 +0100 Subject: [PATCH] Update batch schema and add ACK handling --- README.md | 52 +++++++++-- include/config.h | 5 +- include/data_model.h | 1 - include/json_codec.h | 3 +- src/display_ui.cpp | 12 +-- src/json_codec.cpp | 214 +++++++++++++++++++++++++++---------------- src/main.cpp | 151 ++++++++++++++++++++++-------- src/meter_driver.cpp | 185 +------------------------------------ src/mqtt_client.cpp | 3 - 9 files changed, 305 insertions(+), 321 deletions(-) diff --git a/README.md b/README.md index 3a72b2c..300b8c8 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,6 @@ Variants: - Energy total: 1-0:1.8.0*255 - Total power: 1-0:16.7.0*255 - Phase power: 36.7 / 56.7 / 76.7 - - Phase voltage: 32.7 / 52.7 / 72.7 - Reads battery voltage and estimates SoC. - Builds JSON payload, compresses, wraps in LoRa packet, transmits. - Light sleeps between meter reads; batches are sent every 30s. @@ -56,11 +55,11 @@ Variants: **Sender flow (pseudo-code)**: ```cpp void sender_loop() { - meter_read_every_second(); // SML/OBIS -> MeterData samples + meter_read_every_second(); // OBIS -> MeterData samples read_battery(data); // VBAT + SoC if (time_to_send_batch()) { - json = meterBatchToJson(samples); + json = meterBatchToJson(samples, batch_id); compressed = compressBuffer(json); lora_send(packet(MeterBatch, compressed)); } @@ -77,7 +76,7 @@ void sender_loop() { **Key sender functions**: ```cpp -bool meter_read(MeterData &data); // parse SML frame, set OBIS fields +bool meter_read(MeterData &data); // parse OBIS fields void read_battery(MeterData &data); // ADC -> volts + percent bool meterDataToJson(const MeterData&, String&); bool compressBuffer(const uint8_t*, size_t, uint8_t*, size_t, size_t&); @@ -89,6 +88,7 @@ bool lora_send(const LoraPacket &pkt); // add header + CRC16 and transmit - NTP sync (UTC) and local display in Europe/Berlin. - Receives LoRa packets, verifies CRC16, decompresses, parses JSON. - Publishes meter JSON to MQTT. +- Sends ACKs for MeterBatch packets and de-duplicates by batch_id. - Web UI: - AP mode: status + WiFi/MQTT config. - STA mode: status + per-sender pages. @@ -169,7 +169,7 @@ Packet layout: [0] protocol_version (1) [1] role (0=sender, 1=receiver) [2..3] device_id_short (uint16) -[4] payload_type (0=meter, 1=test, 2=time_sync, 3=meter_batch) +[4] payload_type (0=meter, 1=test, 2=time_sync, 3=meter_batch, 4=ack) [5..N-3] compressed payload [N-2..N-1] CRC16 (bytes 0..N-3) ``` @@ -190,14 +190,38 @@ JSON payload (sender + MQTT): "p1_w": 500.00, "p2_w": 450.00, "p3_w": 0.00, - "v1_v": 230.10, - "v2_v": 229.80, - "v3_v": 231.00, "bat_v": 3.92, "bat_pct": 78 } ``` +MeterBatch JSON (compressed over LoRa) uses per-field arrays with integer units for easier ingestion: + +```json +{ + "schema": 1, + "sender": "s01", + "batch_id": 1842, + "t0": 1738288000, + "dt_s": 1, + "n": 3, + "energy_wh": [123456700, 123456701, 123456701], + "p_w": [930, 940, 950], + "p1_w": [480, 490, 500], + "p2_w": [450, 450, 450], + "p3_w": [0, 0, 0], + "meta": { + "rssi": -92, + "snr": 7.5, + "rx_ts": 1738288031 + } +} +``` + +Notes: +- `sender` maps to `EXPECTED_SENDER_IDS` order (`s01` = first sender). +- `meta` is injected by the receiver after batch reassembly. + ## Device IDs - Derived from WiFi STA MAC. - `short_id = (MAC[4] << 8) | MAC[5]` @@ -253,12 +277,20 @@ inline constexpr uint16_t EXPECTED_SENDER_IDS[NUM_SENDERS] = { 0xF19C }; - `lilygo-t3-v1-6-1-868`: production build for 868 MHz modules - `lilygo-t3-v1-6-1-868-test`: test build for 868 MHz modules +## Config Knobs +Key timing settings in `include/config.h`: +- `METER_SAMPLE_INTERVAL_MS` +- `METER_SEND_INTERVAL_MS` +- `BATCH_ACK_TIMEOUT_MS` +- `BATCH_MAX_RETRIES` + ## Limits & Known Constraints - **Compression**: uses lightweight RLE (good for JSON but not optimal). -- **OBIS parsing**: supports IEC 62056-21 ASCII (Mode D) and SML; may need tuning for some meters. +- **OBIS parsing**: supports IEC 62056-21 ASCII (Mode D); may need tuning for some meters. - **Payload size**: single JSON frames < 256 bytes (ArduinoJson static doc); batch frames are chunked and reassembled. - **Battery ADC**: uses simple linear calibration constant in `power_manager.cpp`. - **OLED**: no hardware reset line is used (matches working reference). +- **Batch ACKs**: sender waits for ACK after a batch and retries up to `BATCH_MAX_RETRIES` with `BATCH_ACK_TIMEOUT_MS` between attempts. ## Files & Modules - `include/config.h`, `src/config.cpp`: pins, radio settings, sender IDs @@ -266,7 +298,7 @@ inline constexpr uint16_t EXPECTED_SENDER_IDS[NUM_SENDERS] = { 0xF19C }; - `include/json_codec.h`, `src/json_codec.cpp`: JSON encode/decode - `include/compressor.h`, `src/compressor.cpp`: RLE compression - `include/lora_transport.h`, `src/lora_transport.cpp`: LoRa packet + CRC -- `include/meter_driver.h`, `src/meter_driver.cpp`: IEC 62056-21 ASCII + SML parse +- `include/meter_driver.h`, `src/meter_driver.cpp`: IEC 62056-21 ASCII parse - `include/power_manager.h`, `src/power_manager.cpp`: ADC + sleep - `include/time_manager.h`, `src/time_manager.cpp`: NTP + time sync - `include/wifi_manager.h`, `src/wifi_manager.cpp`: NVS config + WiFi diff --git a/include/config.h b/include/config.h index 6018d03..0a25faa 100644 --- a/include/config.h +++ b/include/config.h @@ -11,7 +11,8 @@ enum class PayloadType : uint8_t { MeterData = 0, TestCode = 1, TimeSync = 2, - MeterBatch = 3 + MeterBatch = 3, + Ack = 4 }; constexpr uint8_t PROTOCOL_VERSION = 1; @@ -59,6 +60,8 @@ constexpr uint32_t OLED_AUTO_OFF_MS = 10UL * 60UL * 1000UL; constexpr uint32_t SENDER_OLED_READ_MS = 10000; constexpr uint32_t METER_SAMPLE_INTERVAL_MS = 1000; constexpr uint32_t METER_SEND_INTERVAL_MS = 30000; +constexpr uint32_t BATCH_ACK_TIMEOUT_MS = 3000; +constexpr uint8_t BATCH_MAX_RETRIES = 2; constexpr uint8_t METER_BATCH_MAX_SAMPLES = 30; constexpr uint32_t WATCHDOG_TIMEOUT_SEC = 120; constexpr bool ENABLE_HA_DISCOVERY = true; diff --git a/include/data_model.h b/include/data_model.h index 027eb44..94ed471 100644 --- a/include/data_model.h +++ b/include/data_model.h @@ -22,7 +22,6 @@ struct MeterData { float energy_total_kwh; float phase_power_w[3]; float total_power_w; - float phase_voltage_v[3]; float battery_voltage_v; uint8_t battery_percent; bool valid; diff --git a/include/json_codec.h b/include/json_codec.h index 48397be..86a0fd7 100644 --- a/include/json_codec.h +++ b/include/json_codec.h @@ -5,5 +5,6 @@ bool meterDataToJson(const MeterData &data, String &out_json); bool jsonToMeterData(const String &json, MeterData &data); -bool meterBatchToJson(const MeterData *samples, size_t count, String &out_json, const FaultCounters *faults = nullptr, FaultType last_error = FaultType::None); +bool meterBatchToJson(const MeterData *samples, size_t count, uint16_t batch_id, String &out_json, + const FaultCounters *faults = nullptr, FaultType last_error = FaultType::None); bool jsonToMeterBatch(const String &json, MeterData *out_samples, size_t max_count, size_t &out_count); diff --git a/src/display_ui.cpp b/src/display_ui.cpp index 7cde378..e0ba429 100644 --- a/src/display_ui.cpp +++ b/src/display_ui.cpp @@ -219,11 +219,11 @@ static void render_sender_measurement() { display.setCursor(0, 12); display.printf("P %.0fW", g_last_meter.total_power_w); display.setCursor(0, 24); - display.printf("L1 %.0fV %.0fW", g_last_meter.phase_voltage_v[0], g_last_meter.phase_power_w[0]); + display.printf("L1 %.0fW", g_last_meter.phase_power_w[0]); display.setCursor(0, 36); - display.printf("L2 %.0fV %.0fW", g_last_meter.phase_voltage_v[1], g_last_meter.phase_power_w[1]); + display.printf("L2 %.0fW", g_last_meter.phase_power_w[1]); display.setCursor(0, 48); - display.printf("L3 %.0fV %.0fW", g_last_meter.phase_voltage_v[2], g_last_meter.phase_power_w[2]); + display.printf("L3 %.0fW", g_last_meter.phase_power_w[2]); display.display(); } @@ -318,14 +318,14 @@ static void render_receiver_sender(uint8_t index) { display.setCursor(0, 24); display.printf("P %.0fW", status.last_data.total_power_w); display.setCursor(0, 36); - display.printf("L1 %.0fV %.0fW", status.last_data.phase_voltage_v[0], status.last_data.phase_power_w[0]); + display.printf("L1 %.0fW", status.last_data.phase_power_w[0]); display.setCursor(0, 48); - display.printf("L2 %.0fV %.0fW", status.last_data.phase_voltage_v[1], status.last_data.phase_power_w[1]); + display.printf("L2 %.0fW", status.last_data.phase_power_w[1]); display.setCursor(0, 56); if (status.last_data.link_valid) { display.printf("R:%d S:%.1f", status.last_data.link_rssi_dbm, status.last_data.link_snr_db); } else { - display.printf("L3 %.0fV %.0fW", status.last_data.phase_voltage_v[2], status.last_data.phase_power_w[2]); + display.printf("L3 %.0fW", status.last_data.phase_power_w[2]); } display.display(); } diff --git a/src/json_codec.cpp b/src/json_codec.cpp index cc4371e..f57694e 100644 --- a/src/json_codec.cpp +++ b/src/json_codec.cpp @@ -1,6 +1,8 @@ #include "json_codec.h" #include +#include #include +#include "config.h" #include "power_manager.h" static float round2(float value) { @@ -10,6 +12,34 @@ static float round2(float value) { return roundf(value * 100.0f) / 100.0f; } +static uint32_t kwh_to_wh(float value) { + if (isnan(value)) { + return 0; + } + double wh = static_cast(value) * 1000.0; + if (wh < 0.0) { + wh = 0.0; + } + if (wh > static_cast(UINT32_MAX)) { + wh = static_cast(UINT32_MAX); + } + return static_cast(llround(wh)); +} + +static int32_t round_to_i32(float value) { + if (isnan(value)) { + return 0; + } + long rounded = lroundf(value); + if (rounded > INT32_MAX) { + return INT32_MAX; + } + if (rounded < INT32_MIN) { + return INT32_MIN; + } + return static_cast(rounded); +} + static const char *short_id_from_device_id(const char *device_id) { if (!device_id) { return ""; @@ -21,6 +51,31 @@ static const char *short_id_from_device_id(const char *device_id) { return device_id; } +static void sender_label_from_short_id(uint16_t short_id, char *out, size_t out_len) { + if (!out || out_len == 0) { + return; + } + for (uint8_t i = 0; i < NUM_SENDERS; ++i) { + if (EXPECTED_SENDER_IDS[i] == short_id) { + snprintf(out, out_len, "s%02u", static_cast(i + 1)); + return; + } + } + snprintf(out, out_len, "s00"); +} + +static uint16_t short_id_from_sender_label(const char *sender_label) { + if (!sender_label || strlen(sender_label) < 2 || sender_label[0] != 's') { + return 0; + } + char *end = nullptr; + long idx = strtol(sender_label + 1, &end, 10); + if (end == sender_label + 1 || idx <= 0 || idx > NUM_SENDERS) { + return 0; + } + return EXPECTED_SENDER_IDS[idx - 1]; +} + static void format_float_2(char *buf, size_t buf_len, float value) { if (!buf || buf_len == 0) { return; @@ -47,12 +102,6 @@ bool meterDataToJson(const MeterData &data, String &out_json) { doc["p2_w"] = serialized(buf); format_float_2(buf, sizeof(buf), data.phase_power_w[2]); doc["p3_w"] = serialized(buf); - format_float_2(buf, sizeof(buf), data.phase_voltage_v[0]); - doc["v1_v"] = serialized(buf); - format_float_2(buf, sizeof(buf), data.phase_voltage_v[1]); - doc["v2_v"] = serialized(buf); - format_float_2(buf, sizeof(buf), data.phase_voltage_v[2]); - doc["v3_v"] = serialized(buf); format_float_2(buf, sizeof(buf), data.battery_voltage_v); doc["bat_v"] = serialized(buf); doc["bat_pct"] = data.battery_percent; @@ -78,13 +127,6 @@ bool meterDataToJson(const MeterData &data, String &out_json) { return len > 0 && len < 256; } -static float read_float_or_legacy(JsonDocument &doc, const char *key, const char *legacy_key) { - if (doc[key].isNull()) { - return doc[legacy_key] | NAN; - } - return doc[key] | NAN; -} - bool jsonToMeterData(const String &json, MeterData &data) { StaticJsonDocument<256> doc; DeserializationError err = deserializeJson(doc, json); @@ -101,14 +143,11 @@ bool jsonToMeterData(const String &json, MeterData &data) { data.device_id[sizeof(data.device_id) - 1] = '\0'; data.ts_utc = doc["ts"] | 0; - data.energy_total_kwh = read_float_or_legacy(doc, "e_kwh", "energy_kwh"); - data.total_power_w = read_float_or_legacy(doc, "p_w", "p_total_w"); + data.energy_total_kwh = doc["e_kwh"] | NAN; + data.total_power_w = doc["p_w"] | NAN; data.phase_power_w[0] = doc["p1_w"] | NAN; data.phase_power_w[1] = doc["p2_w"] | NAN; data.phase_power_w[2] = doc["p3_w"] | NAN; - data.phase_voltage_v[0] = doc["v1_v"] | NAN; - data.phase_voltage_v[1] = doc["v2_v"] | NAN; - data.phase_voltage_v[2] = doc["v3_v"] | NAN; data.battery_voltage_v = doc["bat_v"] | NAN; if (doc["bat_pct"].isNull() && !isnan(data.battery_voltage_v)) { data.battery_percent = battery_percent_from_voltage(data.battery_voltage_v); @@ -132,15 +171,21 @@ bool jsonToMeterData(const String &json, MeterData &data) { return true; } -bool meterBatchToJson(const MeterData *samples, size_t count, String &out_json, const FaultCounters *faults, FaultType last_error) { +bool meterBatchToJson(const MeterData *samples, size_t count, uint16_t batch_id, String &out_json, const FaultCounters *faults, FaultType last_error) { if (!samples || count == 0) { return false; } DynamicJsonDocument doc(8192); - doc["id"] = short_id_from_device_id(samples[count - 1].device_id); - doc["bat_v"] = round2(samples[count - 1].battery_voltage_v); - doc["bat_pct"] = samples[count - 1].battery_percent; + doc["schema"] = 1; + char sender_label[8] = {}; + sender_label_from_short_id(samples[count - 1].short_id, sender_label, sizeof(sender_label)); + doc["sender"] = sender_label; + doc["batch_id"] = batch_id; + doc["t0"] = samples[0].ts_utc; + uint32_t dt_s = METER_SAMPLE_INTERVAL_MS / 1000; + doc["dt_s"] = dt_s > 0 ? dt_s : 1; + doc["n"] = static_cast(count); if (faults) { if (faults->meter_read_fail > 0) { doc["err_m"] = faults->meter_read_fail; @@ -152,19 +197,18 @@ bool meterBatchToJson(const MeterData *samples, size_t count, String &out_json, if (last_error != FaultType::None) { doc["err_last"] = static_cast(last_error); } - JsonArray arr = doc.createNestedArray("s"); + + JsonArray energy = doc.createNestedArray("energy_wh"); + JsonArray p_w = doc.createNestedArray("p_w"); + JsonArray p1_w = doc.createNestedArray("p1_w"); + JsonArray p2_w = doc.createNestedArray("p2_w"); + JsonArray p3_w = doc.createNestedArray("p3_w"); for (size_t i = 0; i < count; ++i) { - JsonArray row = arr.createNestedArray(); - row.add(samples[i].ts_utc); - row.add(round2(samples[i].energy_total_kwh)); - row.add(round2(samples[i].total_power_w)); - row.add(round2(samples[i].phase_power_w[0])); - row.add(round2(samples[i].phase_power_w[1])); - row.add(round2(samples[i].phase_power_w[2])); - row.add(round2(samples[i].phase_voltage_v[0])); - row.add(round2(samples[i].phase_voltage_v[1])); - row.add(round2(samples[i].phase_voltage_v[2])); - row.add(samples[i].valid ? 1 : 0); + energy.add(kwh_to_wh(samples[i].energy_total_kwh)); + p_w.add(round_to_i32(samples[i].total_power_w)); + p1_w.add(round_to_i32(samples[i].phase_power_w[0])); + p2_w.add(round_to_i32(samples[i].phase_power_w[1])); + p3_w.add(round_to_i32(samples[i].phase_power_w[2])); } out_json = ""; @@ -184,62 +228,72 @@ bool jsonToMeterBatch(const String &json, MeterData *out_samples, size_t max_cou return false; } - JsonArray arr = doc["s"].as(); - if (arr.isNull()) { - return false; - } - const char *id = doc["id"] | ""; - float bat_v = doc["bat_v"] | NAN; - uint8_t bat_pct = doc["bat_pct"] | 0; + const char *sender = doc["sender"] | ""; uint32_t err_m = doc["err_m"] | 0; uint32_t err_tx = doc["err_tx"] | 0; FaultType last_error = static_cast(doc["err_last"] | 0); - size_t idx = 0; - for (JsonArray row : arr) { - if (idx >= max_count) { - break; + if (!doc["schema"].isNull()) { + if ((doc["schema"] | 0) != 1) { + return false; } - MeterData &data = out_samples[idx]; - data = {}; - if (strlen(id) == 4) { - snprintf(data.device_id, sizeof(data.device_id), "dd3-%s", id); - } else { - strncpy(data.device_id, id, sizeof(data.device_id)); + size_t count = doc["n"] | 0; + if (count == 0) { + return false; } - data.device_id[sizeof(data.device_id) - 1] = '\0'; - data.ts_utc = row[0] | 0; - data.energy_total_kwh = row[1] | NAN; - data.total_power_w = row[2] | NAN; - data.phase_power_w[0] = row[3] | NAN; - data.phase_power_w[1] = row[4] | NAN; - data.phase_power_w[2] = row[5] | NAN; - data.phase_voltage_v[0] = row[6] | NAN; - data.phase_voltage_v[1] = row[7] | NAN; - data.phase_voltage_v[2] = row[8] | NAN; - data.valid = (row[9] | 1) != 0; - data.battery_voltage_v = bat_v; - if (doc["bat_pct"].isNull() && !isnan(bat_v)) { - data.battery_percent = battery_percent_from_voltage(bat_v); - } else { - data.battery_percent = bat_pct; + if (count > max_count) { + count = max_count; } - data.link_valid = false; - data.link_rssi_dbm = 0; - data.link_snr_db = NAN; - data.err_meter_read = err_m; - data.err_decode = 0; - data.err_lora_tx = err_tx; - data.last_error = last_error; - if (strlen(data.device_id) >= 8) { - const char *suffix = data.device_id + strlen(data.device_id) - 4; - data.short_id = static_cast(strtoul(suffix, nullptr, 16)); + uint32_t t0 = doc["t0"] | 0; + uint32_t dt_s = doc["dt_s"] | 1; + JsonArray energy = doc["energy_wh"].as(); + JsonArray p_w = doc["p_w"].as(); + JsonArray p1_w = doc["p1_w"].as(); + JsonArray p2_w = doc["p2_w"].as(); + JsonArray p3_w = doc["p3_w"].as(); + + for (size_t idx = 0; idx < count; ++idx) { + MeterData &data = out_samples[idx]; + data = {}; + uint16_t short_id = short_id_from_sender_label(sender); + if (short_id != 0) { + snprintf(data.device_id, sizeof(data.device_id), "dd3-%04X", short_id); + data.short_id = short_id; + } else if (id[0] != '\0') { + strncpy(data.device_id, id, sizeof(data.device_id)); + data.device_id[sizeof(data.device_id) - 1] = '\0'; + } else { + snprintf(data.device_id, sizeof(data.device_id), "dd3-0000"); + } + + data.ts_utc = t0 + static_cast(idx) * dt_s; + data.energy_total_kwh = static_cast((energy[idx] | 0)) / 1000.0f; + data.total_power_w = static_cast(p_w[idx] | 0); + data.phase_power_w[0] = static_cast(p1_w[idx] | 0); + data.phase_power_w[1] = static_cast(p2_w[idx] | 0); + data.phase_power_w[2] = static_cast(p3_w[idx] | 0); + data.battery_voltage_v = NAN; + data.battery_percent = 0; + data.valid = true; + data.link_valid = false; + data.link_rssi_dbm = 0; + data.link_snr_db = NAN; + data.err_meter_read = err_m; + data.err_decode = 0; + data.err_lora_tx = err_tx; + data.last_error = last_error; + + if (data.short_id == 0 && strlen(data.device_id) >= 8) { + const char *suffix = data.device_id + strlen(data.device_id) - 4; + data.short_id = static_cast(strtoul(suffix, nullptr, 16)); + } } - idx++; + + out_count = count; + return count > 0; } - out_count = idx; - return idx > 0; + return false; } diff --git a/src/main.cpp b/src/main.cpp index c3b87f3..4d86cf7 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -12,6 +12,7 @@ #include "web_server.h" #include "display_ui.h" #include "test_mode.h" +#include #ifdef ARDUINO_ARCH_ESP32 #include #endif @@ -56,7 +57,13 @@ static uint8_t g_meter_sample_count = 0; static uint8_t g_meter_sample_head = 0; static uint32_t g_last_sample_ms = 0; static uint32_t g_last_send_ms = 0; +static uint32_t g_last_batch_send_ms = 0; static uint16_t g_batch_id = 1; +static uint16_t g_last_sent_batch_id = 0; +static uint8_t g_batch_retry_count = 0; +static bool g_batch_ack_pending = false; + +static uint16_t g_last_batch_id_rx[NUM_SENDERS] = {}; struct BatchRxState { bool active; @@ -159,7 +166,11 @@ static void publish_faults_if_needed(const char *device_id, const FaultCounters #ifdef ARDUINO_ARCH_ESP32 static void watchdog_init() { - esp_task_wdt_init(WATCHDOG_TIMEOUT_SEC, true); + esp_task_wdt_config_t config = {}; + config.timeout_ms = WATCHDOG_TIMEOUT_SEC * 1000; + config.idle_core_mask = 0; + config.trigger_panic = true; + esp_task_wdt_init(&config); esp_task_wdt_add(nullptr); } @@ -180,6 +191,22 @@ static uint16_t read_u16_le(const uint8_t *src) { return static_cast(src[0]) | (static_cast(src[1]) << 8); } +static bool inject_batch_meta(String &json, int16_t rssi_dbm, float snr_db, uint32_t rx_ts_utc) { + DynamicJsonDocument doc(8192); + DeserializationError err = deserializeJson(doc, json); + if (err) { + return false; + } + + JsonObject meta = doc.createNestedObject("meta"); + meta["rssi"] = rssi_dbm; + meta["snr"] = snr_db; + meta["rx_ts"] = rx_ts_utc; + + json = ""; + return serializeJson(doc, json) > 0; +} + static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_display) { if (!data || len == 0 || len > BATCH_MAX_COMPRESSED) { return false; @@ -227,6 +254,17 @@ static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_ return all_ok; } +static void send_batch_ack(uint16_t batch_id) { + LoraPacket ack = {}; + ack.protocol_version = PROTOCOL_VERSION; + ack.role = DeviceRole::Receiver; + ack.device_id_short = g_short_id; + ack.payload_type = PayloadType::Ack; + ack.payload_len = 2; + write_u16_le(ack.payload, batch_id); + lora_send(ack); +} + static bool send_meter_batch(uint32_t ts_for_display) { MeterData ordered[METER_BATCH_MAX_SAMPLES]; size_t count = copy_meter_samples(ordered, METER_BATCH_MAX_SAMPLES); @@ -235,7 +273,7 @@ static bool send_meter_batch(uint32_t ts_for_display) { } String json; - if (!meterBatchToJson(ordered, count, json, &g_sender_faults, g_sender_last_error)) { + if (!meterBatchToJson(ordered, count, g_batch_id, json, &g_sender_faults, g_sender_last_error)) { return false; } @@ -246,9 +284,10 @@ static bool send_meter_batch(uint32_t ts_for_display) { } bool ok = send_batch_payload(compressed, compressed_len, ts_for_display); + g_last_batch_send_ms = millis(); if (ok) { - g_meter_sample_count = 0; - g_meter_sample_head = 0; + g_last_sent_batch_id = static_cast(g_batch_id - 1); + g_batch_ack_pending = true; } return ok; } @@ -263,7 +302,7 @@ static void reset_batch_rx() { g_batch_rx.last_rx_ms = 0; } -static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool &decode_error) { +static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool &decode_error, uint16_t &out_batch_id) { decode_error = false; if (pkt.payload_len < BATCH_HEADER_SIZE) { return false; @@ -323,6 +362,7 @@ static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool & } decompressed[decompressed_len] = '\0'; out_json = String(reinterpret_cast(decompressed)); + out_batch_id = batch_id; reset_batch_rx(); return true; } @@ -407,14 +447,38 @@ static void sender_loop() { display_set_last_read(meter_ok, data.ts_utc); } - if (now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) { + if (!g_batch_ack_pending && now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) { g_last_send_ms = now_ms; send_meter_batch(last_sample_ts()); } LoraPacket rx = {}; - if (lora_receive(rx, 0) && rx.protocol_version == PROTOCOL_VERSION && rx.payload_type == PayloadType::TimeSync) { - time_handle_timesync_payload(rx.payload, rx.payload_len); + if (lora_receive(rx, 0) && rx.protocol_version == PROTOCOL_VERSION) { + if (rx.payload_type == PayloadType::TimeSync) { + time_handle_timesync_payload(rx.payload, rx.payload_len); + } else if (rx.payload_type == PayloadType::Ack && rx.payload_len >= 2) { + uint16_t ack_id = read_u16_le(rx.payload); + if (g_batch_ack_pending && ack_id == g_last_sent_batch_id) { + g_batch_ack_pending = false; + g_batch_retry_count = 0; + g_meter_sample_count = 0; + g_meter_sample_head = 0; + } + } + } + + if (g_batch_ack_pending && (now_ms - g_last_batch_send_ms >= BATCH_ACK_TIMEOUT_MS)) { + if (g_batch_retry_count < BATCH_MAX_RETRIES) { + g_batch_retry_count++; + send_meter_batch(last_sample_ts()); + } else { + g_batch_ack_pending = false; + g_batch_retry_count = 0; + g_meter_sample_count = 0; + g_meter_sample_head = 0; + note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx); + display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms); + } } display_tick(); @@ -482,37 +546,52 @@ static void receiver_loop() { } else if (pkt.payload_type == PayloadType::MeterBatch) { String json; bool decode_error = false; - if (process_batch_packet(pkt, json, decode_error)) { + uint16_t batch_id = 0; + if (process_batch_packet(pkt, json, decode_error, batch_id)) { + uint32_t rx_ts_utc = time_get_utc(); + if (rx_ts_utc == 0) { + rx_ts_utc = millis() / 1000; + } + inject_batch_meta(json, pkt.rssi_dbm, pkt.snr_db, rx_ts_utc); MeterData samples[METER_BATCH_MAX_SAMPLES]; size_t count = 0; - if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) { - for (uint8_t i = 0; i < NUM_SENDERS; ++i) { - if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) { - for (size_t s = 0; s < count; ++s) { - samples[s].link_valid = true; - samples[s].link_rssi_dbm = pkt.rssi_dbm; - samples[s].link_snr_db = pkt.snr_db; - samples[s].short_id = pkt.device_id_short; - mqtt_publish_state(samples[s]); - } - if (count > 0) { - g_sender_statuses[i].last_data = samples[count - 1]; - g_sender_statuses[i].last_update_ts_utc = samples[count - 1].ts_utc; - g_sender_statuses[i].has_data = true; - g_sender_faults_remote[i].meter_read_fail = samples[count - 1].err_meter_read; - g_sender_faults_remote[i].lora_tx_fail = samples[count - 1].err_lora_tx; - g_sender_last_error_remote[i] = samples[count - 1].last_error; - g_sender_last_error_remote_utc[i] = time_get_utc(); - g_sender_last_error_remote_ms[i] = millis(); - if (ENABLE_HA_DISCOVERY && !g_sender_discovery_sent[i]) { - g_sender_discovery_sent[i] = mqtt_publish_discovery(samples[count - 1].device_id); - } - publish_faults_if_needed(samples[count - 1].device_id, g_sender_faults_remote[i], g_sender_faults_remote_published[i], - g_sender_last_error_remote[i], g_sender_last_error_remote_published[i], - g_sender_last_error_remote_utc[i], g_sender_last_error_remote_ms[i]); - } - break; + int8_t sender_idx = -1; + for (uint8_t i = 0; i < NUM_SENDERS; ++i) { + if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) { + sender_idx = static_cast(i); + break; + } + } + bool duplicate = sender_idx >= 0 && g_last_batch_id_rx[sender_idx] == batch_id; + if (duplicate) { + send_batch_ack(batch_id); + } else if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) { + if (sender_idx >= 0) { + for (size_t s = 0; s < count; ++s) { + samples[s].link_valid = true; + samples[s].link_rssi_dbm = pkt.rssi_dbm; + samples[s].link_snr_db = pkt.snr_db; + samples[s].short_id = pkt.device_id_short; + mqtt_publish_state(samples[s]); } + if (count > 0) { + g_sender_statuses[sender_idx].last_data = samples[count - 1]; + g_sender_statuses[sender_idx].last_update_ts_utc = samples[count - 1].ts_utc; + g_sender_statuses[sender_idx].has_data = true; + g_sender_faults_remote[sender_idx].meter_read_fail = samples[count - 1].err_meter_read; + g_sender_faults_remote[sender_idx].lora_tx_fail = samples[count - 1].err_lora_tx; + g_sender_last_error_remote[sender_idx] = samples[count - 1].last_error; + g_sender_last_error_remote_utc[sender_idx] = time_get_utc(); + g_sender_last_error_remote_ms[sender_idx] = millis(); + if (ENABLE_HA_DISCOVERY && !g_sender_discovery_sent[sender_idx]) { + g_sender_discovery_sent[sender_idx] = mqtt_publish_discovery(samples[count - 1].device_id); + } + publish_faults_if_needed(samples[count - 1].device_id, g_sender_faults_remote[sender_idx], g_sender_faults_remote_published[sender_idx], + g_sender_last_error_remote[sender_idx], g_sender_last_error_remote_published[sender_idx], + g_sender_last_error_remote_utc[sender_idx], g_sender_last_error_remote_ms[sender_idx]); + } + g_last_batch_id_rx[sender_idx] = batch_id; + send_batch_ack(batch_id); } } else { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode); diff --git a/src/meter_driver.cpp b/src/meter_driver.cpp index 1816501..507f3bc 100644 --- a/src/meter_driver.cpp +++ b/src/meter_driver.cpp @@ -5,167 +5,11 @@ #include static constexpr uint32_t METER_READ_TIMEOUT_MS = 2000; -static constexpr size_t SML_BUFFER_SIZE = 2048; - -static const uint8_t OBIS_ENERGY_TOTAL[6] = {0x01, 0x00, 0x01, 0x08, 0x00, 0xFF}; -static const uint8_t OBIS_TOTAL_POWER[6] = {0x01, 0x00, 0x10, 0x07, 0x00, 0xFF}; -static const uint8_t OBIS_P1[6] = {0x01, 0x00, 0x24, 0x07, 0x00, 0xFF}; -static const uint8_t OBIS_P2[6] = {0x01, 0x00, 0x38, 0x07, 0x00, 0xFF}; -static const uint8_t OBIS_P3[6] = {0x01, 0x00, 0x4C, 0x07, 0x00, 0xFF}; -static const uint8_t OBIS_V1[6] = {0x01, 0x00, 0x20, 0x07, 0x00, 0xFF}; -static const uint8_t OBIS_V2[6] = {0x01, 0x00, 0x34, 0x07, 0x00, 0xFF}; -static const uint8_t OBIS_V3[6] = {0x01, 0x00, 0x48, 0x07, 0x00, 0xFF}; - -static bool find_obis_value(const uint8_t *buf, size_t len, const uint8_t *obis, float &out_value) { - for (size_t i = 0; i + 6 < len; ++i) { - if (memcmp(&buf[i], obis, 6) == 0) { - int8_t scaler = 0; - bool scaler_found = false; - bool value_found = false; - int64_t value = 0; - size_t cursor = i + 6; - size_t limit = (i + 6 + 120 < len) ? i + 6 + 120 : len; - - while (cursor < limit) { - uint8_t tl = buf[cursor++]; - if (tl == 0x00) { - continue; - } - uint8_t type = (tl >> 4) & 0x0F; - uint8_t tlen = tl & 0x0F; - if (tlen == 0 || cursor + tlen > len) { - continue; - } - - if (type == 0x05 || type == 0x06) { - int64_t val = 0; - for (uint8_t b = 0; b < tlen; ++b) { - val = (val << 8) | buf[cursor + b]; - } - if (type == 0x05) { - int64_t sign_bit = 1LL << ((tlen * 8) - 1); - if (val & sign_bit) { - int64_t mask = (1LL << (tlen * 8)) - 1; - val = -((~val + 1) & mask); - } - } - - if (!scaler_found && tlen <= 2 && val >= -6 && val <= 6) { - scaler = static_cast(val); - scaler_found = true; - } else if (!value_found) { - value = val; - value_found = true; - } - } - cursor += tlen; - if (value_found && scaler_found) { - break; - } - } - - if (value_found) { - out_value = static_cast(value) * powf(10.0f, scaler); - return true; - } - } - } - return false; -} void meter_init() { Serial2.begin(9600, SERIAL_7E1, PIN_METER_RX, -1); } -static bool meter_read_sml(MeterData &data) { - uint8_t buffer[SML_BUFFER_SIZE]; - size_t len = 0; - bool started = false; - uint32_t start = millis(); - const uint8_t start_seq[] = {0x1B, 0x1B, 0x1B, 0x1B, 0x01, 0x01, 0x01, 0x01}; - const uint8_t end_seq[] = {0x1B, 0x1B, 0x1B, 0x1B, 0x1A}; - - while (millis() - start < METER_READ_TIMEOUT_MS) { - while (Serial2.available()) { - uint8_t b = Serial2.read(); - if (!started) { - buffer[len++] = b; - if (len >= sizeof(start_seq)) { - if (memcmp(&buffer[len - sizeof(start_seq)], start_seq, sizeof(start_seq)) == 0) { - started = true; - len = 0; - } - } - if (len >= sizeof(buffer)) { - len = 0; - } - } else { - if (len < sizeof(buffer)) { - buffer[len++] = b; - if (len >= sizeof(end_seq)) { - if (memcmp(&buffer[len - sizeof(end_seq)], end_seq, sizeof(end_seq)) == 0) { - start = millis(); - goto parse_frame; - } - } - } - } - } - delay(5); - } - -parse_frame: - if (!started || len == 0) { - return false; - } - - data.energy_total_kwh = NAN; - data.total_power_w = NAN; - data.phase_power_w[0] = NAN; - data.phase_power_w[1] = NAN; - data.phase_power_w[2] = NAN; - data.phase_voltage_v[0] = NAN; - data.phase_voltage_v[1] = NAN; - data.phase_voltage_v[2] = NAN; - - bool ok = true; - float value = 0.0f; - - if (find_obis_value(buffer, len, OBIS_ENERGY_TOTAL, value)) { - data.energy_total_kwh = value; - } else { - ok = false; - } - - if (find_obis_value(buffer, len, OBIS_TOTAL_POWER, value)) { - data.total_power_w = value; - } else { - ok = false; - } - - if (find_obis_value(buffer, len, OBIS_P1, value)) { - data.phase_power_w[0] = value; - } - if (find_obis_value(buffer, len, OBIS_P2, value)) { - data.phase_power_w[1] = value; - } - if (find_obis_value(buffer, len, OBIS_P3, value)) { - data.phase_power_w[2] = value; - } - if (find_obis_value(buffer, len, OBIS_V1, value)) { - data.phase_voltage_v[0] = value; - } - if (find_obis_value(buffer, len, OBIS_V2, value)) { - data.phase_voltage_v[1] = value; - } - if (find_obis_value(buffer, len, OBIS_V3, value)) { - data.phase_voltage_v[2] = value; - } - - data.valid = ok; - return ok; -} - static bool parse_obis_ascii_value(const char *line, const char *obis, float &out_value) { const char *p = strstr(line, obis); if (!p) { @@ -243,10 +87,6 @@ static bool meter_read_ascii(MeterData &data) { bool p1_ok = false; bool p2_ok = false; bool p3_ok = false; - bool v1_ok = false; - bool v2_ok = false; - bool v3_ok = false; - char line[128]; size_t line_len = 0; @@ -298,21 +138,6 @@ static bool meter_read_ascii(MeterData &data) { p3_ok = true; got_any = true; } - if (parse_obis_ascii_value(line, "1-0:32.7.0", value)) { - data.phase_voltage_v[0] = value; - v1_ok = true; - got_any = true; - } - if (parse_obis_ascii_value(line, "1-0:52.7.0", value)) { - data.phase_voltage_v[1] = value; - v2_ok = true; - got_any = true; - } - if (parse_obis_ascii_value(line, "1-0:72.7.0", value)) { - data.phase_voltage_v[2] = value; - v3_ok = true; - got_any = true; - } line_len = 0; continue; @@ -324,7 +149,7 @@ static bool meter_read_ascii(MeterData &data) { delay(5); } - data.valid = energy_ok || total_p_ok || p1_ok || p2_ok || p3_ok || v1_ok || v2_ok || v3_ok; + data.valid = energy_ok || total_p_ok || p1_ok || p2_ok || p3_ok; return data.valid; } @@ -334,13 +159,7 @@ bool meter_read(MeterData &data) { data.phase_power_w[0] = NAN; data.phase_power_w[1] = NAN; data.phase_power_w[2] = NAN; - data.phase_voltage_v[0] = NAN; - data.phase_voltage_v[1] = NAN; - data.phase_voltage_v[2] = NAN; data.valid = false; - if (meter_read_ascii(data)) { - return true; - } - return meter_read_sml(data); + return meter_read_ascii(data); } diff --git a/src/mqtt_client.cpp b/src/mqtt_client.cpp index cf81cf6..d08c616 100644 --- a/src/mqtt_client.cpp +++ b/src/mqtt_client.cpp @@ -126,9 +126,6 @@ bool mqtt_publish_discovery(const char *device_id) { bool ok = true; ok = ok && publish_discovery_sensor(device_id, "energy", "Energy", "kWh", "energy", state_topic.c_str(), "{{ value_json.e_kwh }}"); ok = ok && publish_discovery_sensor(device_id, "power", "Power", "W", "power", state_topic.c_str(), "{{ value_json.p_w }}"); - ok = ok && publish_discovery_sensor(device_id, "v1", "Voltage L1", "V", "voltage", state_topic.c_str(), "{{ value_json.v1_v }}"); - ok = ok && publish_discovery_sensor(device_id, "v2", "Voltage L2", "V", "voltage", state_topic.c_str(), "{{ value_json.v2_v }}"); - ok = ok && publish_discovery_sensor(device_id, "v3", "Voltage L3", "V", "voltage", state_topic.c_str(), "{{ value_json.v3_v }}"); ok = ok && publish_discovery_sensor(device_id, "p1", "Power L1", "W", "power", state_topic.c_str(), "{{ value_json.p1_w }}"); ok = ok && publish_discovery_sensor(device_id, "p2", "Power L2", "W", "power", state_topic.c_str(), "{{ value_json.p2_w }}"); ok = ok && publish_discovery_sensor(device_id, "p3", "Power L3", "W", "power", state_topic.c_str(), "{{ value_json.p3_w }}");