From 7540af7d71e58fc8ddf4404dc5ec01b0911736c0 Mon Sep 17 00:00:00 2001 From: acidburns Date: Fri, 13 Feb 2026 10:44:26 +0100 Subject: [PATCH] Decouple sender meter reads and harden meter RX robustness --- src/main.cpp | 169 +++++++++++++++++++++++++++++++++++-------- src/meter_driver.cpp | 8 +- 2 files changed, 147 insertions(+), 30 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index b3cc989..3543549 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,6 +17,9 @@ #ifdef ARDUINO_ARCH_ESP32 #include #include +#include +#include +#include #endif static DeviceRole g_role = DeviceRole::Sender; @@ -98,14 +101,26 @@ static uint32_t g_sender_rx_reject_log_ms = 0; static MeterData g_last_meter_data = {}; static bool g_last_meter_valid = false; static uint32_t g_last_meter_rx_ms = 0; -static uint32_t g_last_meter_seq = 0; -static uint32_t g_last_meter_seq_used = 0; static uint32_t g_meter_stale_seconds = 0; static bool g_time_acquired = false; static uint32_t g_last_sync_request_ms = 0; static uint32_t g_build_attempts = 0; static uint32_t g_build_valid = 0; static uint32_t g_build_invalid = 0; +static constexpr uint32_t METER_SAMPLE_MAX_AGE_MS = 15000; +struct MeterSampleEvent { + MeterData data; + uint32_t rx_ms; +}; +#ifdef ARDUINO_ARCH_ESP32 +static QueueHandle_t g_meter_sample_queue = nullptr; +static TaskHandle_t g_meter_reader_task = nullptr; +static bool g_meter_reader_task_running = false; +static constexpr UBaseType_t METER_SAMPLE_QUEUE_LEN = 8; +static constexpr uint32_t METER_READER_TASK_STACK_WORDS = 4096; +static constexpr UBaseType_t METER_READER_TASK_PRIORITY = 2; +static constexpr BaseType_t METER_READER_TASK_CORE = 0; +#endif static constexpr uint32_t DEBUG_FORCED_REBOOT_INTERVAL_MS = 3UL * 60UL * 60UL * 1000UL; enum class TxBuildError : uint8_t { @@ -130,6 +145,117 @@ static void serial_debug_printf(const char *fmt, ...) { Serial.println(buf); } +static void set_last_meter_sample(const MeterData &parsed, uint32_t rx_ms) { + g_last_meter_data = parsed; + g_last_meter_valid = true; + g_last_meter_rx_ms = rx_ms; + g_meter_stale_seconds = 0; +} + +static bool parse_meter_frame_sample(const char *frame, size_t frame_len, MeterData &parsed) { + parsed = {}; + parsed.energy_total_kwh = NAN; + parsed.total_power_w = NAN; + parsed.phase_power_w[0] = NAN; + parsed.phase_power_w[1] = NAN; + parsed.phase_power_w[2] = NAN; + parsed.valid = false; + return meter_parse_frame(frame, frame_len, parsed); +} + +#ifdef ARDUINO_ARCH_ESP32 +static void meter_queue_push_latest(const MeterSampleEvent &event) { + if (!g_meter_sample_queue) { + return; + } + if (xQueueSend(g_meter_sample_queue, &event, 0) == pdTRUE) { + return; + } + MeterSampleEvent dropped = {}; + xQueueReceive(g_meter_sample_queue, &dropped, 0); + if (xQueueSend(g_meter_sample_queue, &event, 0) != pdTRUE && SERIAL_DEBUG_MODE) { + serial_debug_printf("meter: queue push failed"); + } +} + +static void meter_reader_task_entry(void *arg) { + (void)arg; + for (;;) { + const char *frame = nullptr; + size_t frame_len = 0; + if (!meter_poll_frame(frame, frame_len)) { + vTaskDelay(pdMS_TO_TICKS(5)); + continue; + } + + MeterData parsed = {}; + if (parse_meter_frame_sample(frame, frame_len, parsed)) { + MeterSampleEvent event = {}; + event.data = parsed; + event.rx_ms = millis(); + meter_queue_push_latest(event); + } + } +} + +static bool meter_reader_start() { + if (g_meter_reader_task_running) { + return true; + } + if (!g_meter_sample_queue) { + g_meter_sample_queue = xQueueCreate(METER_SAMPLE_QUEUE_LEN, sizeof(MeterSampleEvent)); + if (!g_meter_sample_queue) { + if (SERIAL_DEBUG_MODE) { + serial_debug_printf("meter: queue alloc failed"); + } + return false; + } + } + + BaseType_t rc = xTaskCreatePinnedToCore( + meter_reader_task_entry, + "meter_reader", + METER_READER_TASK_STACK_WORDS, + nullptr, + METER_READER_TASK_PRIORITY, + &g_meter_reader_task, + METER_READER_TASK_CORE); + if (rc != pdPASS) { + if (SERIAL_DEBUG_MODE) { + serial_debug_printf("meter: task start failed rc=%ld", static_cast(rc)); + } + return false; + } + g_meter_reader_task_running = true; + serial_debug_printf("meter: reader task core=%ld queue=%u", + static_cast(METER_READER_TASK_CORE), + static_cast(METER_SAMPLE_QUEUE_LEN)); + return true; +} +#endif + +static void meter_reader_pump(uint32_t now_ms) { +#ifdef ARDUINO_ARCH_ESP32 + if (g_meter_reader_task_running && g_meter_sample_queue) { + MeterSampleEvent event = {}; + while (xQueueReceive(g_meter_sample_queue, &event, 0) == pdTRUE) { + set_last_meter_sample(event.data, event.rx_ms); + } + return; + } +#endif + + const char *frame = nullptr; + size_t frame_len = 0; + if (!meter_poll_frame(frame, frame_len)) { + return; + } + MeterData parsed = {}; + if (parse_meter_frame_sample(frame, frame_len, parsed)) { + set_last_meter_sample(parsed, now_ms); + } +} + static uint16_t g_last_batch_id_rx[NUM_SENDERS] = {}; struct BatchRxState { @@ -831,6 +957,11 @@ void setup() { power_sender_init(); power_configure_unused_pins_sender(); meter_init(); +#ifdef ARDUINO_ARCH_ESP32 + if (!meter_reader_start()) { + serial_debug_printf("meter: using inline polling fallback"); + } +#endif g_last_sample_ms = millis() - METER_SAMPLE_INTERVAL_MS; g_last_send_ms = millis(); g_last_sync_request_ms = millis() - SYNC_REQUEST_INTERVAL_MS; @@ -888,28 +1019,9 @@ static void sender_loop() { g_batch_retry_count); } - if (g_time_acquired) { - const char *frame = nullptr; - size_t frame_len = 0; - if (meter_poll_frame(frame, frame_len)) { - MeterData parsed = {}; - parsed.energy_total_kwh = NAN; - parsed.total_power_w = NAN; - parsed.phase_power_w[0] = NAN; - parsed.phase_power_w[1] = NAN; - parsed.phase_power_w[2] = NAN; - parsed.valid = false; - if (meter_parse_frame(frame, frame_len, parsed)) { - g_last_meter_data = parsed; - g_last_meter_valid = true; - g_last_meter_rx_ms = now_ms; - g_meter_stale_seconds = 0; - g_last_meter_seq++; - } else { - g_last_meter_valid = false; - } - } + meter_reader_pump(now_ms); + if (g_time_acquired) { if (now_ms - g_last_sample_ms >= METER_SAMPLE_INTERVAL_MS) { g_last_sample_ms = now_ms; MeterData data = {}; @@ -922,19 +1034,18 @@ static void sender_loop() { data.phase_power_w[2] = NAN; g_build_attempts++; - // Avoid reusing stale meter frames: only accept new parsed data once per sample. - bool meter_ok = g_last_meter_valid && (g_last_meter_seq != g_last_meter_seq_used); + uint32_t meter_age_ms = g_last_meter_valid ? (now_ms - g_last_meter_rx_ms) : UINT32_MAX; + // Reuse recent good samples to bridge short parser gaps without accepting stale data forever. + bool meter_ok = g_last_meter_valid && meter_age_ms <= METER_SAMPLE_MAX_AGE_MS; if (meter_ok) { data.energy_total_kwh = g_last_meter_data.energy_total_kwh; data.total_power_w = g_last_meter_data.total_power_w; data.phase_power_w[0] = g_last_meter_data.phase_power_w[0]; data.phase_power_w[1] = g_last_meter_data.phase_power_w[1]; data.phase_power_w[2] = g_last_meter_data.phase_power_w[2]; - uint32_t age_ms = now_ms - g_last_meter_rx_ms; - g_meter_stale_seconds = age_ms >= 1000 ? (age_ms / 1000) : 0; - g_last_meter_seq_used = g_last_meter_seq; + g_meter_stale_seconds = meter_age_ms >= 1000 ? (meter_age_ms / 1000) : 0; } else { - g_meter_stale_seconds++; + g_meter_stale_seconds = g_last_meter_valid ? (meter_age_ms / 1000) : (g_meter_stale_seconds + 1); } if (!meter_ok) { note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::MeterRead); diff --git a/src/meter_driver.cpp b/src/meter_driver.cpp index 665c4a1..50c9f00 100644 --- a/src/meter_driver.cpp +++ b/src/meter_driver.cpp @@ -4,7 +4,9 @@ #include #include -static constexpr uint32_t METER_FRAME_TIMEOUT_MS = 1500; +// LoRa TX/RX windows can block the main loop for several seconds at SF12. +// Keep partial frame state long enough so valid telegrams are not dropped. +static constexpr uint32_t METER_FRAME_TIMEOUT_MS = 20000; static constexpr size_t METER_FRAME_MAX = 512; enum class MeterRxState : uint8_t { @@ -24,6 +26,10 @@ static uint32_t g_rx_timeout = 0; static uint32_t g_last_log_ms = 0; void meter_init() { +#ifdef ARDUINO_ARCH_ESP32 + // Buffer enough serial data to survive long LoRa blocking sections. + Serial2.setRxBufferSize(8192); +#endif Serial2.begin(9600, SERIAL_7E1, PIN_METER_RX, -1); }