#include #include "config.h" #include "data_model.h" #include "json_codec.h" #include "compressor.h" #include "lora_transport.h" #include "meter_driver.h" #include "power_manager.h" #include "time_manager.h" #include "wifi_manager.h" #include "mqtt_client.h" #include "web_server.h" #include "display_ui.h" #include "test_mode.h" #include #include #ifdef ARDUINO_ARCH_ESP32 #include #include #endif static DeviceRole g_role = DeviceRole::Sender; static uint16_t g_short_id = 0; static char g_device_id[16] = ""; static SenderStatus g_sender_statuses[NUM_SENDERS]; static bool g_ap_mode = false; static WifiMqttConfig g_cfg; static uint32_t g_last_timesync_ms = 0; static constexpr uint32_t TIME_SYNC_OFFSET_MS = 15000; static uint32_t g_boot_ms = 0; static FaultCounters g_sender_faults = {}; static FaultCounters g_receiver_faults = {}; static FaultCounters g_receiver_faults_published = {}; static FaultCounters g_sender_faults_remote[NUM_SENDERS] = {}; static FaultCounters g_sender_faults_remote_published[NUM_SENDERS] = {}; static FaultType g_sender_last_error = FaultType::None; static FaultType g_receiver_last_error = FaultType::None; static FaultType g_sender_last_error_remote[NUM_SENDERS] = {}; static FaultType g_sender_last_error_remote_published[NUM_SENDERS] = {}; static FaultType g_receiver_last_error_published = FaultType::None; static uint32_t g_sender_last_error_utc = 0; static uint32_t g_sender_last_error_ms = 0; static uint32_t g_receiver_last_error_utc = 0; static uint32_t g_receiver_last_error_ms = 0; static uint32_t g_sender_last_error_remote_utc[NUM_SENDERS] = {}; static uint32_t g_sender_last_error_remote_ms[NUM_SENDERS] = {}; static bool g_sender_discovery_sent[NUM_SENDERS] = {}; static bool g_receiver_discovery_sent = false; static constexpr size_t BATCH_HEADER_SIZE = 6; static constexpr size_t BATCH_CHUNK_PAYLOAD = LORA_MAX_PAYLOAD - BATCH_HEADER_SIZE; static constexpr size_t BATCH_MAX_COMPRESSED = 4096; static constexpr size_t BATCH_MAX_DECOMPRESSED = 8192; static constexpr uint32_t BATCH_RX_MARGIN_MS = 800; struct BatchBuffer { uint16_t batch_id; bool batch_id_valid; uint8_t count; MeterData samples[METER_BATCH_MAX_SAMPLES]; }; static BatchBuffer g_batch_queue[BATCH_QUEUE_DEPTH]; static uint8_t g_batch_head = 0; static uint8_t g_batch_tail = 0; static uint8_t g_batch_count = 0; static MeterData g_build_samples[METER_BATCH_MAX_SAMPLES]; static uint8_t g_build_count = 0; static uint32_t g_last_sample_ms = 0; static uint32_t g_last_sample_ts_utc = 0; static uint32_t g_last_send_ms = 0; static uint32_t g_last_batch_send_ms = 0; static float g_last_battery_voltage_v = NAN; static uint8_t g_last_battery_percent = 0; static uint32_t g_last_battery_ms = 0; static uint16_t g_batch_id = 1; static uint16_t g_last_sent_batch_id = 0; static uint16_t g_last_acked_batch_id = 0; static uint8_t g_batch_retry_count = 0; static bool g_batch_ack_pending = false; static MeterData g_inflight_samples[METER_BATCH_MAX_SAMPLES]; static uint8_t g_inflight_count = 0; static uint16_t g_inflight_batch_id = 0; static bool g_inflight_active = false; static uint32_t g_last_debug_log_ms = 0; static void watchdog_kick(); static void serial_debug_printf(const char *fmt, ...) { if (!SERIAL_DEBUG_MODE) { return; } char buf[256]; va_list args; va_start(args, fmt); vsnprintf(buf, sizeof(buf), fmt, args); va_end(args); Serial.println(buf); } static void serial_debug_print_json(const String &json) { if (!SERIAL_DEBUG_MODE || !SERIAL_DEBUG_DUMP_JSON) { return; } const char *data = json.c_str(); size_t len = json.length(); const size_t chunk = 128; for (size_t i = 0; i < len; i += chunk) { size_t n = len - i; if (n > chunk) { n = chunk; } Serial.write(reinterpret_cast(data + i), n); watchdog_kick(); delay(0); } Serial.write('\n'); } static uint16_t g_last_batch_id_rx[NUM_SENDERS] = {}; struct BatchRxState { bool active; uint16_t batch_id; uint8_t next_index; uint8_t expected_chunks; uint16_t total_len; uint16_t received_len; uint32_t last_rx_ms; uint32_t timeout_ms; uint8_t buffer[BATCH_MAX_COMPRESSED]; }; static BatchRxState g_batch_rx = {}; static void init_sender_statuses() { for (uint8_t i = 0; i < NUM_SENDERS; ++i) { g_sender_statuses[i] = {}; g_sender_statuses[i].has_data = false; g_sender_statuses[i].last_update_ts_utc = 0; g_sender_statuses[i].last_data.short_id = EXPECTED_SENDER_IDS[i]; snprintf(g_sender_statuses[i].last_data.device_id, sizeof(g_sender_statuses[i].last_data.device_id), "dd3-%04X", EXPECTED_SENDER_IDS[i]); g_sender_faults_remote[i] = {}; g_sender_faults_remote_published[i] = {}; g_sender_last_error_remote[i] = FaultType::None; g_sender_last_error_remote_published[i] = FaultType::None; g_sender_last_error_remote_utc[i] = 0; g_sender_last_error_remote_ms[i] = 0; g_sender_discovery_sent[i] = false; } } static void update_battery_cache() { MeterData tmp = {}; read_battery(tmp); g_last_battery_voltage_v = tmp.battery_voltage_v; g_last_battery_percent = tmp.battery_percent; g_last_battery_ms = millis(); } static bool batch_queue_drop_oldest() { if (g_batch_count == 0) { return false; } bool dropped_inflight = g_inflight_active && g_batch_queue[g_batch_tail].batch_id_valid && g_inflight_batch_id == g_batch_queue[g_batch_tail].batch_id; if (dropped_inflight) { g_batch_ack_pending = false; g_batch_retry_count = 0; g_inflight_active = false; g_inflight_count = 0; g_inflight_batch_id = 0; } g_batch_tail = (g_batch_tail + 1) % BATCH_QUEUE_DEPTH; g_batch_count--; return dropped_inflight; } static BatchBuffer *batch_queue_peek() { if (g_batch_count == 0) { return nullptr; } return &g_batch_queue[g_batch_tail]; } static void batch_queue_enqueue(const MeterData *samples, uint8_t count) { if (!samples || count == 0) { return; } if (g_batch_count >= BATCH_QUEUE_DEPTH) { if (batch_queue_drop_oldest()) { g_batch_id++; } } BatchBuffer &slot = g_batch_queue[g_batch_head]; slot.batch_id = 0; slot.batch_id_valid = false; slot.count = count; for (uint8_t i = 0; i < count; ++i) { slot.samples[i] = samples[i]; } g_batch_head = (g_batch_head + 1) % BATCH_QUEUE_DEPTH; g_batch_count++; } static uint32_t last_sample_ts() { if (g_last_sample_ts_utc == 0) { uint32_t now_utc = time_get_utc(); return now_utc > 0 ? now_utc : millis() / 1000; } return g_last_sample_ts_utc; } static void note_fault(FaultCounters &counters, FaultType &last_type, uint32_t &last_ts_utc, uint32_t &last_ts_ms, FaultType type) { if (type == FaultType::MeterRead) { counters.meter_read_fail++; } else if (type == FaultType::Decode) { counters.decode_fail++; } else if (type == FaultType::LoraTx) { counters.lora_tx_fail++; } last_type = type; last_ts_utc = time_get_utc(); last_ts_ms = millis(); } static uint32_t age_seconds(uint32_t ts_utc, uint32_t ts_ms) { if (time_is_synced() && ts_utc > 0) { uint32_t now = time_get_utc(); return now > ts_utc ? now - ts_utc : 0; } return (millis() - ts_ms) / 1000; } static bool counters_changed(const FaultCounters &a, const FaultCounters &b) { return a.meter_read_fail != b.meter_read_fail || a.decode_fail != b.decode_fail || a.lora_tx_fail != b.lora_tx_fail; } static void publish_faults_if_needed(const char *device_id, const FaultCounters &counters, FaultCounters &last_published, FaultType last_error, FaultType &last_error_published, uint32_t last_error_utc, uint32_t last_error_ms) { if (!mqtt_is_connected()) { return; } if (!counters_changed(counters, last_published) && last_error == last_error_published) { return; } uint32_t age = last_error != FaultType::None ? age_seconds(last_error_utc, last_error_ms) : 0; if (mqtt_publish_faults(device_id, counters, last_error, age)) { last_published = counters; last_error_published = last_error; } } #ifdef ARDUINO_ARCH_ESP32 static void watchdog_init() { esp_task_wdt_deinit(); esp_task_wdt_config_t config = {}; config.timeout_ms = WATCHDOG_TIMEOUT_SEC * 1000; config.idle_core_mask = 0; config.trigger_panic = true; esp_task_wdt_init(&config); esp_task_wdt_add(nullptr); } static void watchdog_kick() { esp_task_wdt_reset(); } #else static void watchdog_init() {} static void watchdog_kick() {} #endif static void write_u16_le(uint8_t *dst, uint16_t value) { dst[0] = static_cast(value & 0xFF); dst[1] = static_cast((value >> 8) & 0xFF); } static uint16_t read_u16_le(const uint8_t *src) { return static_cast(src[0]) | (static_cast(src[1]) << 8); } static uint32_t compute_batch_rx_timeout_ms(uint16_t total_len, uint8_t chunk_count) { if (total_len == 0 || chunk_count == 0) { return 10000; } size_t max_chunk_payload = total_len > BATCH_CHUNK_PAYLOAD ? BATCH_CHUNK_PAYLOAD : total_len; size_t payload_len = BATCH_HEADER_SIZE + max_chunk_payload; size_t packet_len = 5 + payload_len + 2; uint32_t per_chunk_toa_ms = lora_airtime_ms(packet_len); uint32_t timeout_ms = static_cast(chunk_count) * per_chunk_toa_ms + BATCH_RX_MARGIN_MS; return timeout_ms < 10000 ? 10000 : timeout_ms; } static bool inject_batch_meta(String &json, int16_t rssi_dbm, float snr_db, uint32_t rx_ts_utc) { DynamicJsonDocument doc(8192); DeserializationError err = deserializeJson(doc, json); if (err) { return false; } JsonObject meta = doc.createNestedObject("meta"); meta["rssi"] = rssi_dbm; meta["snr"] = snr_db; meta["rx_ts"] = rx_ts_utc; json = ""; return serializeJson(doc, json) > 0; } static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_display, uint16_t batch_id) { if (!data || len == 0 || len > BATCH_MAX_COMPRESSED) { return false; } uint8_t chunk_count = static_cast((len + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD); if (chunk_count == 0) { return false; } bool all_ok = true; size_t offset = 0; for (uint8_t i = 0; i < chunk_count; ++i) { size_t chunk_len = len - offset; if (chunk_len > BATCH_CHUNK_PAYLOAD) { chunk_len = BATCH_CHUNK_PAYLOAD; } LoraPacket pkt = {}; pkt.protocol_version = PROTOCOL_VERSION; pkt.role = DeviceRole::Sender; pkt.device_id_short = g_short_id; pkt.payload_type = PayloadType::MeterBatch; pkt.payload_len = chunk_len + BATCH_HEADER_SIZE; uint8_t *payload = pkt.payload; write_u16_le(&payload[0], batch_id); payload[2] = i; payload[3] = chunk_count; write_u16_le(&payload[4], static_cast(len)); memcpy(&payload[BATCH_HEADER_SIZE], data + offset, chunk_len); watchdog_kick(); uint32_t tx_start = millis(); bool ok = lora_send(pkt); uint32_t tx_ms = millis() - tx_start; all_ok = all_ok && ok; if (!ok) { note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx); display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms); } if (SERIAL_DEBUG_MODE && tx_ms > 500) { serial_debug_printf("tx: chunk %u/%u took %lums ok=%u", static_cast(i + 1), static_cast(chunk_count), static_cast(tx_ms), ok ? 1 : 0); } offset += chunk_len; delay(10); } display_set_last_tx(all_ok, ts_for_display); return all_ok; } static void send_batch_ack(uint16_t batch_id, uint16_t sender_id) { LoraPacket ack = {}; ack.protocol_version = PROTOCOL_VERSION; ack.role = DeviceRole::Receiver; ack.device_id_short = g_short_id; ack.payload_type = PayloadType::Ack; ack.payload_len = 6; write_u16_le(&ack.payload[0], batch_id); write_u16_le(&ack.payload[2], sender_id); write_u16_le(&ack.payload[4], g_short_id); lora_send(ack); } static bool prepare_inflight_from_queue() { if (g_inflight_active) { return true; } BatchBuffer *batch = batch_queue_peek(); if (!batch || batch->count == 0) { return false; } if (!batch->batch_id_valid) { batch->batch_id = g_batch_id; batch->batch_id_valid = true; } g_inflight_count = batch->count; g_inflight_batch_id = batch->batch_id; for (uint8_t i = 0; i < g_inflight_count; ++i) { g_inflight_samples[i] = batch->samples[i]; } g_inflight_active = true; return true; } static bool send_inflight_batch(uint32_t ts_for_display) { if (!g_inflight_active || g_inflight_count == 0) { return false; } uint32_t json_start = millis(); String json; if (!meterBatchToJson(g_inflight_samples, g_inflight_count, g_inflight_batch_id, json, &g_sender_faults, g_sender_last_error)) { return false; } uint32_t json_ms = millis() - json_start; if (SERIAL_DEBUG_MODE) { serial_debug_printf("tx: batch_id=%u count=%u json_len=%u", g_inflight_batch_id, g_inflight_count, static_cast(json.length())); if (json_ms > 200) { serial_debug_printf("tx: json encode took %lums", static_cast(json_ms)); } serial_debug_print_json(json); } static uint8_t compressed[BATCH_MAX_COMPRESSED]; size_t compressed_len = 0; uint32_t compress_start = millis(); if (!compressBuffer(reinterpret_cast(json.c_str()), json.length(), compressed, sizeof(compressed), compressed_len)) { return false; } uint32_t compress_ms = millis() - compress_start; if (SERIAL_DEBUG_MODE && compress_ms > 200) { serial_debug_printf("tx: compress took %lums", static_cast(compress_ms)); } uint32_t send_start = millis(); bool ok = send_batch_payload(compressed, compressed_len, ts_for_display, g_inflight_batch_id); uint32_t send_ms = millis() - send_start; if (SERIAL_DEBUG_MODE && send_ms > 1000) { serial_debug_printf("tx: send batch took %lums", static_cast(send_ms)); } if (ok) { g_last_batch_send_ms = millis(); serial_debug_printf("tx: sent batch_id=%u len=%u", g_inflight_batch_id, static_cast(compressed_len)); } else { serial_debug_printf("tx: send failed batch_id=%u", g_inflight_batch_id); } return ok; } static bool send_meter_batch(uint32_t ts_for_display) { if (!prepare_inflight_from_queue()) { return false; } bool ok = send_inflight_batch(ts_for_display); if (ok) { g_last_sent_batch_id = g_inflight_batch_id; g_batch_ack_pending = true; } else { g_inflight_active = false; g_inflight_count = 0; g_inflight_batch_id = 0; } return ok; } static bool resend_inflight_batch(uint32_t ts_for_display) { if (!g_batch_ack_pending || !g_inflight_active || g_inflight_count == 0) { return false; } return send_inflight_batch(ts_for_display); } static void finish_inflight_batch() { if (g_batch_count > 0) { batch_queue_drop_oldest(); } g_batch_ack_pending = false; g_batch_retry_count = 0; g_inflight_active = false; g_inflight_count = 0; g_inflight_batch_id = 0; g_batch_id++; } static void reset_batch_rx() { g_batch_rx.active = false; g_batch_rx.batch_id = 0; g_batch_rx.next_index = 0; g_batch_rx.expected_chunks = 0; g_batch_rx.total_len = 0; g_batch_rx.received_len = 0; g_batch_rx.last_rx_ms = 0; g_batch_rx.timeout_ms = 0; } static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool &decode_error, uint16_t &out_batch_id) { decode_error = false; if (pkt.payload_len < BATCH_HEADER_SIZE) { return false; } uint16_t batch_id = read_u16_le(&pkt.payload[0]); uint8_t chunk_index = pkt.payload[2]; uint8_t chunk_count = pkt.payload[3]; uint16_t total_len = read_u16_le(&pkt.payload[4]); const uint8_t *chunk_data = &pkt.payload[BATCH_HEADER_SIZE]; size_t chunk_len = pkt.payload_len - BATCH_HEADER_SIZE; uint32_t now_ms = millis(); if (!g_batch_rx.active || batch_id != g_batch_rx.batch_id || (now_ms - g_batch_rx.last_rx_ms > g_batch_rx.timeout_ms)) { if (chunk_index != 0) { reset_batch_rx(); return false; } if (total_len == 0 || total_len > BATCH_MAX_COMPRESSED || chunk_count == 0) { reset_batch_rx(); return false; } g_batch_rx.active = true; g_batch_rx.batch_id = batch_id; g_batch_rx.expected_chunks = chunk_count; g_batch_rx.total_len = total_len; g_batch_rx.received_len = 0; g_batch_rx.next_index = 0; g_batch_rx.timeout_ms = compute_batch_rx_timeout_ms(total_len, chunk_count); } if (!g_batch_rx.active || chunk_index != g_batch_rx.next_index || chunk_count != g_batch_rx.expected_chunks) { reset_batch_rx(); return false; } if (g_batch_rx.received_len + chunk_len > g_batch_rx.total_len || g_batch_rx.received_len + chunk_len > BATCH_MAX_COMPRESSED) { reset_batch_rx(); return false; } memcpy(&g_batch_rx.buffer[g_batch_rx.received_len], chunk_data, chunk_len); g_batch_rx.received_len += static_cast(chunk_len); g_batch_rx.next_index++; g_batch_rx.last_rx_ms = now_ms; if (g_batch_rx.next_index == g_batch_rx.expected_chunks && g_batch_rx.received_len == g_batch_rx.total_len) { static uint8_t decompressed[BATCH_MAX_DECOMPRESSED]; size_t decompressed_len = 0; if (!decompressBuffer(g_batch_rx.buffer, g_batch_rx.received_len, decompressed, sizeof(decompressed) - 1, decompressed_len)) { decode_error = true; reset_batch_rx(); return false; } if (decompressed_len >= sizeof(decompressed)) { decode_error = true; reset_batch_rx(); return false; } decompressed[decompressed_len] = '\0'; out_json = String(reinterpret_cast(decompressed)); out_batch_id = batch_id; reset_batch_rx(); return true; } return false; } void setup() { Serial.begin(115200); delay(200); watchdog_init(); g_boot_ms = millis(); g_role = detect_role(); init_device_ids(g_short_id, g_device_id, sizeof(g_device_id)); if (SERIAL_DEBUG_MODE) { #ifdef ARDUINO_ARCH_ESP32 serial_debug_printf("boot: reset_reason=%d", static_cast(esp_reset_reason())); #endif serial_debug_printf("boot: role=%s short_id=%04X dev=%s", g_role == DeviceRole::Sender ? "sender" : "receiver", g_short_id, g_device_id); } lora_init(); display_init(); time_rtc_init(); time_try_load_from_rtc(); display_set_role(g_role); display_set_self_ids(g_short_id, g_device_id); if (g_role == DeviceRole::Sender) { power_sender_init(); meter_init(); g_last_sample_ms = millis() - METER_SAMPLE_INTERVAL_MS; g_last_send_ms = millis(); update_battery_cache(); } else { power_receiver_init(); wifi_manager_init(); init_sender_statuses(); display_set_sender_statuses(g_sender_statuses, NUM_SENDERS); bool has_cfg = wifi_load_config(g_cfg); if (has_cfg && wifi_connect_sta(g_cfg)) { g_ap_mode = false; time_receiver_init(g_cfg.ntp_server_1.c_str(), g_cfg.ntp_server_2.c_str()); mqtt_init(g_cfg, g_device_id); web_server_set_config(g_cfg); web_server_set_sender_faults(g_sender_faults_remote, g_sender_last_error_remote); web_server_begin_sta(g_sender_statuses, NUM_SENDERS); } else { g_ap_mode = true; char ap_ssid[32]; snprintf(ap_ssid, sizeof(ap_ssid), "DD3-Bridge-%04X", g_short_id); wifi_start_ap(ap_ssid, "changeme123"); if (g_cfg.ntp_server_1.isEmpty()) { g_cfg.ntp_server_1 = "pool.ntp.org"; } if (g_cfg.ntp_server_2.isEmpty()) { g_cfg.ntp_server_2 = "time.nist.gov"; } web_server_set_config(g_cfg); web_server_set_sender_faults(g_sender_faults_remote, g_sender_last_error_remote); web_server_begin_ap(g_sender_statuses, NUM_SENDERS); } } } static void sender_loop() { watchdog_kick(); uint32_t now_ms = millis(); display_set_sender_queue(g_batch_count, g_build_count > 0); display_set_sender_batches(g_last_acked_batch_id, g_batch_id); if (SERIAL_DEBUG_MODE && now_ms - g_last_debug_log_ms >= 5000) { g_last_debug_log_ms = now_ms; serial_debug_printf("state: Q=%u%s A=%u C=%u inflight=%u ack_pending=%u retries=%u", g_batch_count, g_build_count > 0 ? "+" : "", g_last_acked_batch_id, g_batch_id, g_inflight_count, g_batch_ack_pending ? 1 : 0, g_batch_retry_count); } if (now_ms - g_last_sample_ms >= METER_SAMPLE_INTERVAL_MS) { g_last_sample_ms = now_ms; MeterData data = {}; data.short_id = g_short_id; strncpy(data.device_id, g_device_id, sizeof(data.device_id)); bool meter_ok = meter_read(data); if (!meter_ok) { note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::MeterRead); display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms); } if (g_build_count == 0) { update_battery_cache(); } data.battery_voltage_v = g_last_battery_voltage_v; data.battery_percent = g_last_battery_percent; uint32_t now_utc = time_get_utc(); data.ts_utc = now_utc > 0 ? now_utc : millis() / 1000; data.valid = meter_ok; g_last_sample_ts_utc = data.ts_utc; g_build_samples[g_build_count++] = data; if (g_build_count >= METER_BATCH_MAX_SAMPLES) { batch_queue_enqueue(g_build_samples, g_build_count); g_build_count = 0; } display_set_last_meter(data); display_set_last_read(meter_ok, data.ts_utc); } if (!g_batch_ack_pending && now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) { g_last_send_ms = now_ms; send_meter_batch(last_sample_ts()); } LoraPacket rx = {}; if (lora_receive(rx, 0) && rx.protocol_version == PROTOCOL_VERSION) { if (rx.payload_type == PayloadType::TimeSync) { time_handle_timesync_payload(rx.payload, rx.payload_len); } else if (rx.payload_type == PayloadType::Ack && rx.payload_len >= 6 && rx.role == DeviceRole::Receiver) { uint16_t ack_id = read_u16_le(rx.payload); uint16_t ack_sender = read_u16_le(&rx.payload[2]); uint16_t ack_receiver = read_u16_le(&rx.payload[4]); if (ack_sender == g_short_id && ack_receiver == rx.device_id_short && g_batch_ack_pending && ack_id == g_last_sent_batch_id) { g_last_acked_batch_id = ack_id; serial_debug_printf("ack: ok batch_id=%u", ack_id); finish_inflight_batch(); } } } if (g_batch_ack_pending && (now_ms - g_last_batch_send_ms >= BATCH_ACK_TIMEOUT_MS)) { if (g_batch_retry_count < BATCH_MAX_RETRIES) { g_batch_retry_count++; serial_debug_printf("ack: timeout batch_id=%u retry=%u", g_inflight_batch_id, g_batch_retry_count); resend_inflight_batch(last_sample_ts()); } else { serial_debug_printf("ack: failed batch_id=%u policy=%s", g_inflight_batch_id, BATCH_RETRY_POLICY == BatchRetryPolicy::Drop ? "drop" : "keep"); if (BATCH_RETRY_POLICY == BatchRetryPolicy::Drop) { finish_inflight_batch(); } else { g_batch_ack_pending = false; g_batch_retry_count = 0; g_inflight_active = false; g_inflight_count = 0; g_inflight_batch_id = 0; } note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx); display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms); } } display_tick(); uint32_t next_sample_due = g_last_sample_ms + METER_SAMPLE_INTERVAL_MS; uint32_t next_send_due = g_last_send_ms + METER_SEND_INTERVAL_MS; uint32_t next_due = next_sample_due < next_send_due ? next_sample_due : next_send_due; if (!g_batch_ack_pending && next_due > now_ms) { watchdog_kick(); light_sleep_ms(next_due - now_ms); } } static void receiver_loop() { watchdog_kick(); if (g_last_timesync_ms == 0) { g_last_timesync_ms = millis() - (TIME_SYNC_INTERVAL_SEC * 1000UL - TIME_SYNC_OFFSET_MS); } LoraPacket pkt = {}; if (lora_receive(pkt, 0) && pkt.protocol_version == PROTOCOL_VERSION) { if (pkt.payload_type == PayloadType::MeterData) { uint8_t decompressed[256]; size_t decompressed_len = 0; if (!decompressBuffer(pkt.payload, pkt.payload_len, decompressed, sizeof(decompressed) - 1, decompressed_len)) { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode); display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms); } else { if (decompressed_len >= sizeof(decompressed)) { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode); display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms); return; } decompressed[decompressed_len] = '\0'; MeterData data = {}; if (jsonToMeterData(String(reinterpret_cast(decompressed)), data)) { data.link_valid = true; data.link_rssi_dbm = pkt.rssi_dbm; data.link_snr_db = pkt.snr_db; for (uint8_t i = 0; i < NUM_SENDERS; ++i) { if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) { data.short_id = pkt.device_id_short; g_sender_statuses[i].last_data = data; g_sender_statuses[i].last_update_ts_utc = data.ts_utc; g_sender_statuses[i].has_data = true; g_sender_faults_remote[i].meter_read_fail = data.err_meter_read; g_sender_faults_remote[i].lora_tx_fail = data.err_lora_tx; g_sender_last_error_remote[i] = data.last_error; g_sender_last_error_remote_utc[i] = time_get_utc(); g_sender_last_error_remote_ms[i] = millis(); mqtt_publish_state(data); if (ENABLE_HA_DISCOVERY && !g_sender_discovery_sent[i]) { g_sender_discovery_sent[i] = mqtt_publish_discovery(data.device_id); } publish_faults_if_needed(data.device_id, g_sender_faults_remote[i], g_sender_faults_remote_published[i], g_sender_last_error_remote[i], g_sender_last_error_remote_published[i], g_sender_last_error_remote_utc[i], g_sender_last_error_remote_ms[i]); break; } } } else { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode); display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms); } } } else if (pkt.payload_type == PayloadType::MeterBatch) { String json; bool decode_error = false; uint16_t batch_id = 0; if (process_batch_packet(pkt, json, decode_error, batch_id)) { uint32_t rx_ts_utc = time_get_utc(); if (rx_ts_utc == 0) { rx_ts_utc = millis() / 1000; } inject_batch_meta(json, pkt.rssi_dbm, pkt.snr_db, rx_ts_utc); MeterData samples[METER_BATCH_MAX_SAMPLES]; size_t count = 0; int8_t sender_idx = -1; for (uint8_t i = 0; i < NUM_SENDERS; ++i) { if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) { sender_idx = static_cast(i); break; } } bool duplicate = sender_idx >= 0 && g_last_batch_id_rx[sender_idx] == batch_id; if (duplicate) { send_batch_ack(batch_id, pkt.device_id_short); } else if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) { if (sender_idx >= 0) { web_server_set_last_batch(static_cast(sender_idx), samples, count); for (size_t s = 0; s < count; ++s) { samples[s].link_valid = true; samples[s].link_rssi_dbm = pkt.rssi_dbm; samples[s].link_snr_db = pkt.snr_db; samples[s].short_id = pkt.device_id_short; mqtt_publish_state(samples[s]); } if (count > 0) { g_sender_statuses[sender_idx].last_data = samples[count - 1]; g_sender_statuses[sender_idx].last_update_ts_utc = samples[count - 1].ts_utc; g_sender_statuses[sender_idx].has_data = true; g_sender_faults_remote[sender_idx].meter_read_fail = samples[count - 1].err_meter_read; g_sender_faults_remote[sender_idx].lora_tx_fail = samples[count - 1].err_lora_tx; g_sender_last_error_remote[sender_idx] = samples[count - 1].last_error; g_sender_last_error_remote_utc[sender_idx] = time_get_utc(); g_sender_last_error_remote_ms[sender_idx] = millis(); if (ENABLE_HA_DISCOVERY && !g_sender_discovery_sent[sender_idx]) { g_sender_discovery_sent[sender_idx] = mqtt_publish_discovery(samples[count - 1].device_id); } publish_faults_if_needed(samples[count - 1].device_id, g_sender_faults_remote[sender_idx], g_sender_faults_remote_published[sender_idx], g_sender_last_error_remote[sender_idx], g_sender_last_error_remote_published[sender_idx], g_sender_last_error_remote_utc[sender_idx], g_sender_last_error_remote_ms[sender_idx]); } g_last_batch_id_rx[sender_idx] = batch_id; send_batch_ack(batch_id, pkt.device_id_short); } } else { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode); display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms); } } else if (decode_error) { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode); display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms); } } } uint32_t interval_sec = TIME_SYNC_INTERVAL_SEC; if (time_rtc_present() && millis() - g_boot_ms >= TIME_SYNC_FAST_WINDOW_MS) { interval_sec = TIME_SYNC_SLOW_INTERVAL_SEC; } if (!g_ap_mode && millis() - g_last_timesync_ms > interval_sec * 1000UL) { g_last_timesync_ms = millis(); if (!time_send_timesync(g_short_id)) { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::LoraTx); display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms); } } mqtt_loop(); web_server_loop(); if (ENABLE_HA_DISCOVERY && !g_receiver_discovery_sent) { g_receiver_discovery_sent = mqtt_publish_discovery(g_device_id); } publish_faults_if_needed(g_device_id, g_receiver_faults, g_receiver_faults_published, g_receiver_last_error, g_receiver_last_error_published, g_receiver_last_error_utc, g_receiver_last_error_ms); display_set_receiver_status(g_ap_mode, wifi_is_connected() ? wifi_get_ssid().c_str() : "AP", mqtt_is_connected()); display_tick(); watchdog_kick(); } void loop() { #ifdef ENABLE_TEST_MODE if (g_role == DeviceRole::Sender) { test_sender_loop(g_short_id, g_device_id); display_tick(); watchdog_kick(); delay(50); } else { test_receiver_loop(g_sender_statuses, NUM_SENDERS, g_short_id); mqtt_loop(); web_server_loop(); display_set_receiver_status(g_ap_mode, wifi_is_connected() ? wifi_get_ssid().c_str() : "AP", mqtt_is_connected()); display_tick(); watchdog_kick(); delay(50); } return; #endif if (g_role == DeviceRole::Sender) { sender_loop(); } else { receiver_loop(); } }