#include #include "config.h" #include "data_model.h" #include "json_codec.h" #include "compressor.h" #include "lora_transport.h" #include "meter_driver.h" #include "power_manager.h" #include "time_manager.h" #include "wifi_manager.h" #include "mqtt_client.h" #include "web_server.h" #include "display_ui.h" #include "test_mode.h" static DeviceRole g_role = DeviceRole::Sender; static uint16_t g_short_id = 0; static char g_device_id[16] = ""; static SenderStatus g_sender_statuses[NUM_SENDERS]; static bool g_ap_mode = false; static WifiMqttConfig g_cfg; static uint32_t g_last_timesync_ms = 0; static constexpr uint32_t TIME_SYNC_OFFSET_MS = 15000; static uint32_t g_boot_ms = 0; static constexpr size_t BATCH_HEADER_SIZE = 6; static constexpr size_t BATCH_CHUNK_PAYLOAD = LORA_MAX_PAYLOAD - BATCH_HEADER_SIZE; static constexpr size_t BATCH_MAX_COMPRESSED = 4096; static constexpr size_t BATCH_MAX_DECOMPRESSED = 8192; static constexpr uint32_t BATCH_RX_TIMEOUT_MS = 2000; static MeterData g_meter_samples[METER_BATCH_MAX_SAMPLES]; static uint8_t g_meter_sample_count = 0; static uint8_t g_meter_sample_head = 0; static uint32_t g_last_sample_ms = 0; static uint32_t g_last_send_ms = 0; static uint16_t g_batch_id = 1; struct BatchRxState { bool active; uint16_t batch_id; uint8_t next_index; uint8_t expected_chunks; uint16_t total_len; uint16_t received_len; uint32_t last_rx_ms; uint8_t buffer[BATCH_MAX_COMPRESSED]; }; static BatchRxState g_batch_rx = {}; static void init_sender_statuses() { for (uint8_t i = 0; i < NUM_SENDERS; ++i) { g_sender_statuses[i] = {}; g_sender_statuses[i].has_data = false; g_sender_statuses[i].last_update_ts_utc = 0; g_sender_statuses[i].last_data.short_id = EXPECTED_SENDER_IDS[i]; snprintf(g_sender_statuses[i].last_data.device_id, sizeof(g_sender_statuses[i].last_data.device_id), "dd3-%04X", EXPECTED_SENDER_IDS[i]); } } static void push_meter_sample(const MeterData &data) { g_meter_samples[g_meter_sample_head] = data; g_meter_sample_head = (g_meter_sample_head + 1) % METER_BATCH_MAX_SAMPLES; if (g_meter_sample_count < METER_BATCH_MAX_SAMPLES) { g_meter_sample_count++; } } static size_t copy_meter_samples(MeterData *out, size_t max_count) { if (!out || max_count == 0 || g_meter_sample_count == 0) { return 0; } size_t count = g_meter_sample_count < max_count ? g_meter_sample_count : max_count; size_t start = (g_meter_sample_head + METER_BATCH_MAX_SAMPLES - count) % METER_BATCH_MAX_SAMPLES; for (size_t i = 0; i < count; ++i) { out[i] = g_meter_samples[(start + i) % METER_BATCH_MAX_SAMPLES]; } return count; } static uint32_t last_sample_ts() { if (g_meter_sample_count == 0) { uint32_t now_utc = time_get_utc(); return now_utc > 0 ? now_utc : millis() / 1000; } size_t idx = (g_meter_sample_head + METER_BATCH_MAX_SAMPLES - 1) % METER_BATCH_MAX_SAMPLES; return g_meter_samples[idx].ts_utc; } static void write_u16_le(uint8_t *dst, uint16_t value) { dst[0] = static_cast(value & 0xFF); dst[1] = static_cast((value >> 8) & 0xFF); } static uint16_t read_u16_le(const uint8_t *src) { return static_cast(src[0]) | (static_cast(src[1]) << 8); } static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_display) { if (!data || len == 0 || len > BATCH_MAX_COMPRESSED) { return false; } uint8_t chunk_count = static_cast((len + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD); if (chunk_count == 0) { return false; } bool all_ok = true; size_t offset = 0; for (uint8_t i = 0; i < chunk_count; ++i) { size_t chunk_len = len - offset; if (chunk_len > BATCH_CHUNK_PAYLOAD) { chunk_len = BATCH_CHUNK_PAYLOAD; } LoraPacket pkt = {}; pkt.protocol_version = PROTOCOL_VERSION; pkt.role = DeviceRole::Sender; pkt.device_id_short = g_short_id; pkt.payload_type = PayloadType::MeterBatch; pkt.payload_len = chunk_len + BATCH_HEADER_SIZE; uint8_t *payload = pkt.payload; write_u16_le(&payload[0], g_batch_id); payload[2] = i; payload[3] = chunk_count; write_u16_le(&payload[4], static_cast(len)); memcpy(&payload[BATCH_HEADER_SIZE], data + offset, chunk_len); bool ok = lora_send(pkt); all_ok = all_ok && ok; offset += chunk_len; delay(10); } if (all_ok) { g_batch_id++; } display_set_last_tx(all_ok, ts_for_display); return all_ok; } static bool send_meter_batch(uint32_t ts_for_display) { MeterData ordered[METER_BATCH_MAX_SAMPLES]; size_t count = copy_meter_samples(ordered, METER_BATCH_MAX_SAMPLES); if (count == 0) { return false; } String json; if (!meterBatchToJson(ordered, count, json)) { return false; } static uint8_t compressed[BATCH_MAX_COMPRESSED]; size_t compressed_len = 0; if (!compressBuffer(reinterpret_cast(json.c_str()), json.length(), compressed, sizeof(compressed), compressed_len)) { return false; } bool ok = send_batch_payload(compressed, compressed_len, ts_for_display); if (ok) { g_meter_sample_count = 0; g_meter_sample_head = 0; } return ok; } static void reset_batch_rx() { g_batch_rx.active = false; g_batch_rx.batch_id = 0; g_batch_rx.next_index = 0; g_batch_rx.expected_chunks = 0; g_batch_rx.total_len = 0; g_batch_rx.received_len = 0; g_batch_rx.last_rx_ms = 0; } static bool process_batch_packet(const LoraPacket &pkt, String &out_json) { if (pkt.payload_len < BATCH_HEADER_SIZE) { return false; } uint16_t batch_id = read_u16_le(&pkt.payload[0]); uint8_t chunk_index = pkt.payload[2]; uint8_t chunk_count = pkt.payload[3]; uint16_t total_len = read_u16_le(&pkt.payload[4]); const uint8_t *chunk_data = &pkt.payload[BATCH_HEADER_SIZE]; size_t chunk_len = pkt.payload_len - BATCH_HEADER_SIZE; uint32_t now_ms = millis(); if (!g_batch_rx.active || batch_id != g_batch_rx.batch_id || (now_ms - g_batch_rx.last_rx_ms > BATCH_RX_TIMEOUT_MS)) { if (chunk_index != 0) { reset_batch_rx(); return false; } if (total_len == 0 || total_len > BATCH_MAX_COMPRESSED || chunk_count == 0) { reset_batch_rx(); return false; } g_batch_rx.active = true; g_batch_rx.batch_id = batch_id; g_batch_rx.expected_chunks = chunk_count; g_batch_rx.total_len = total_len; g_batch_rx.received_len = 0; g_batch_rx.next_index = 0; } if (!g_batch_rx.active || chunk_index != g_batch_rx.next_index || chunk_count != g_batch_rx.expected_chunks) { reset_batch_rx(); return false; } if (g_batch_rx.received_len + chunk_len > g_batch_rx.total_len || g_batch_rx.received_len + chunk_len > BATCH_MAX_COMPRESSED) { reset_batch_rx(); return false; } memcpy(&g_batch_rx.buffer[g_batch_rx.received_len], chunk_data, chunk_len); g_batch_rx.received_len += static_cast(chunk_len); g_batch_rx.next_index++; g_batch_rx.last_rx_ms = now_ms; if (g_batch_rx.next_index == g_batch_rx.expected_chunks && g_batch_rx.received_len == g_batch_rx.total_len) { static uint8_t decompressed[BATCH_MAX_DECOMPRESSED]; size_t decompressed_len = 0; if (!decompressBuffer(g_batch_rx.buffer, g_batch_rx.received_len, decompressed, sizeof(decompressed) - 1, decompressed_len)) { reset_batch_rx(); return false; } if (decompressed_len >= sizeof(decompressed)) { reset_batch_rx(); return false; } decompressed[decompressed_len] = '\0'; out_json = String(reinterpret_cast(decompressed)); reset_batch_rx(); return true; } return false; } void setup() { Serial.begin(115200); delay(200); g_boot_ms = millis(); g_role = detect_role(); init_device_ids(g_short_id, g_device_id, sizeof(g_device_id)); lora_init(); display_init(); time_rtc_init(); time_try_load_from_rtc(); display_set_role(g_role); display_set_self_ids(g_short_id, g_device_id); if (g_role == DeviceRole::Sender) { power_sender_init(); meter_init(); g_last_sample_ms = millis() - METER_SAMPLE_INTERVAL_MS; g_last_send_ms = millis(); } else { power_receiver_init(); wifi_manager_init(); init_sender_statuses(); display_set_sender_statuses(g_sender_statuses, NUM_SENDERS); bool has_cfg = wifi_load_config(g_cfg); if (has_cfg && wifi_connect_sta(g_cfg)) { g_ap_mode = false; time_receiver_init(g_cfg.ntp_server_1.c_str(), g_cfg.ntp_server_2.c_str()); mqtt_init(g_cfg, g_device_id); web_server_set_config(g_cfg); web_server_begin_sta(g_sender_statuses, NUM_SENDERS); } else { g_ap_mode = true; char ap_ssid[32]; snprintf(ap_ssid, sizeof(ap_ssid), "DD3-Bridge-%04X", g_short_id); wifi_start_ap(ap_ssid, "changeme123"); if (g_cfg.ntp_server_1.isEmpty()) { g_cfg.ntp_server_1 = "pool.ntp.org"; } if (g_cfg.ntp_server_2.isEmpty()) { g_cfg.ntp_server_2 = "time.nist.gov"; } web_server_set_config(g_cfg); web_server_begin_ap(g_sender_statuses, NUM_SENDERS); } } } static void sender_loop() { uint32_t now_ms = millis(); if (now_ms - g_last_sample_ms >= METER_SAMPLE_INTERVAL_MS) { g_last_sample_ms = now_ms; MeterData data = {}; data.short_id = g_short_id; strncpy(data.device_id, g_device_id, sizeof(data.device_id)); bool meter_ok = meter_read(data); read_battery(data); uint32_t now_utc = time_get_utc(); data.ts_utc = now_utc > 0 ? now_utc : millis() / 1000; data.valid = meter_ok; push_meter_sample(data); display_set_last_meter(data); display_set_last_read(meter_ok, data.ts_utc); } if (now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) { g_last_send_ms = now_ms; send_meter_batch(last_sample_ts()); } LoraPacket rx = {}; if (lora_receive(rx, 0) && rx.protocol_version == PROTOCOL_VERSION && rx.payload_type == PayloadType::TimeSync) { time_handle_timesync_payload(rx.payload, rx.payload_len); } display_tick(); uint32_t next_sample_due = g_last_sample_ms + METER_SAMPLE_INTERVAL_MS; uint32_t next_send_due = g_last_send_ms + METER_SEND_INTERVAL_MS; uint32_t next_due = next_sample_due < next_send_due ? next_sample_due : next_send_due; if (next_due > now_ms) { light_sleep_ms(next_due - now_ms); } } static void receiver_loop() { if (g_last_timesync_ms == 0) { g_last_timesync_ms = millis() - (TIME_SYNC_INTERVAL_SEC * 1000UL - TIME_SYNC_OFFSET_MS); } LoraPacket pkt = {}; if (lora_receive(pkt, 0) && pkt.protocol_version == PROTOCOL_VERSION) { if (pkt.payload_type == PayloadType::MeterData) { uint8_t decompressed[256]; size_t decompressed_len = 0; if (decompressBuffer(pkt.payload, pkt.payload_len, decompressed, sizeof(decompressed) - 1, decompressed_len)) { if (decompressed_len >= sizeof(decompressed)) { return; } decompressed[decompressed_len] = '\0'; MeterData data = {}; if (jsonToMeterData(String(reinterpret_cast(decompressed)), data)) { for (uint8_t i = 0; i < NUM_SENDERS; ++i) { if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) { data.short_id = pkt.device_id_short; g_sender_statuses[i].last_data = data; g_sender_statuses[i].last_update_ts_utc = data.ts_utc; g_sender_statuses[i].has_data = true; mqtt_publish_state(data); break; } } } } } else if (pkt.payload_type == PayloadType::MeterBatch) { String json; if (process_batch_packet(pkt, json)) { MeterData samples[METER_BATCH_MAX_SAMPLES]; size_t count = 0; if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) { for (uint8_t i = 0; i < NUM_SENDERS; ++i) { if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) { for (size_t s = 0; s < count; ++s) { samples[s].short_id = pkt.device_id_short; mqtt_publish_state(samples[s]); } if (count > 0) { g_sender_statuses[i].last_data = samples[count - 1]; g_sender_statuses[i].last_update_ts_utc = samples[count - 1].ts_utc; g_sender_statuses[i].has_data = true; } break; } } } } } } uint32_t interval_sec = TIME_SYNC_INTERVAL_SEC; if (time_rtc_present() && millis() - g_boot_ms >= TIME_SYNC_FAST_WINDOW_MS) { interval_sec = TIME_SYNC_SLOW_INTERVAL_SEC; } if (!g_ap_mode && millis() - g_last_timesync_ms > interval_sec * 1000UL) { g_last_timesync_ms = millis(); time_send_timesync(g_short_id); } mqtt_loop(); web_server_loop(); display_set_receiver_status(g_ap_mode, wifi_is_connected() ? wifi_get_ssid().c_str() : "AP", mqtt_is_connected()); display_tick(); } void loop() { #ifdef ENABLE_TEST_MODE if (g_role == DeviceRole::Sender) { test_sender_loop(g_short_id, g_device_id); display_tick(); delay(50); } else { test_receiver_loop(g_sender_statuses, NUM_SENDERS, g_short_id); mqtt_loop(); web_server_loop(); display_set_receiver_status(g_ap_mode, wifi_is_connected() ? wifi_get_ssid().c_str() : "AP", mqtt_is_connected()); display_tick(); delay(50); } return; #endif if (g_role == DeviceRole::Sender) { sender_loop(); } else { receiver_loop(); } }