Update batch schema and add ACK handling

This commit is contained in:
2026-01-31 01:53:02 +01:00
parent 8ba7675a1c
commit 8fba67fcf3
9 changed files with 305 additions and 321 deletions

View File

@@ -12,6 +12,7 @@
#include "web_server.h"
#include "display_ui.h"
#include "test_mode.h"
#include <ArduinoJson.h>
#ifdef ARDUINO_ARCH_ESP32
#include <esp_task_wdt.h>
#endif
@@ -56,7 +57,13 @@ static uint8_t g_meter_sample_count = 0;
static uint8_t g_meter_sample_head = 0;
static uint32_t g_last_sample_ms = 0;
static uint32_t g_last_send_ms = 0;
static uint32_t g_last_batch_send_ms = 0;
static uint16_t g_batch_id = 1;
static uint16_t g_last_sent_batch_id = 0;
static uint8_t g_batch_retry_count = 0;
static bool g_batch_ack_pending = false;
static uint16_t g_last_batch_id_rx[NUM_SENDERS] = {};
struct BatchRxState {
bool active;
@@ -159,7 +166,11 @@ static void publish_faults_if_needed(const char *device_id, const FaultCounters
#ifdef ARDUINO_ARCH_ESP32
static void watchdog_init() {
esp_task_wdt_init(WATCHDOG_TIMEOUT_SEC, true);
esp_task_wdt_config_t config = {};
config.timeout_ms = WATCHDOG_TIMEOUT_SEC * 1000;
config.idle_core_mask = 0;
config.trigger_panic = true;
esp_task_wdt_init(&config);
esp_task_wdt_add(nullptr);
}
@@ -180,6 +191,22 @@ static uint16_t read_u16_le(const uint8_t *src) {
return static_cast<uint16_t>(src[0]) | (static_cast<uint16_t>(src[1]) << 8);
}
static bool inject_batch_meta(String &json, int16_t rssi_dbm, float snr_db, uint32_t rx_ts_utc) {
DynamicJsonDocument doc(8192);
DeserializationError err = deserializeJson(doc, json);
if (err) {
return false;
}
JsonObject meta = doc.createNestedObject("meta");
meta["rssi"] = rssi_dbm;
meta["snr"] = snr_db;
meta["rx_ts"] = rx_ts_utc;
json = "";
return serializeJson(doc, json) > 0;
}
static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_display) {
if (!data || len == 0 || len > BATCH_MAX_COMPRESSED) {
return false;
@@ -227,6 +254,17 @@ static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_
return all_ok;
}
static void send_batch_ack(uint16_t batch_id) {
LoraPacket ack = {};
ack.protocol_version = PROTOCOL_VERSION;
ack.role = DeviceRole::Receiver;
ack.device_id_short = g_short_id;
ack.payload_type = PayloadType::Ack;
ack.payload_len = 2;
write_u16_le(ack.payload, batch_id);
lora_send(ack);
}
static bool send_meter_batch(uint32_t ts_for_display) {
MeterData ordered[METER_BATCH_MAX_SAMPLES];
size_t count = copy_meter_samples(ordered, METER_BATCH_MAX_SAMPLES);
@@ -235,7 +273,7 @@ static bool send_meter_batch(uint32_t ts_for_display) {
}
String json;
if (!meterBatchToJson(ordered, count, json, &g_sender_faults, g_sender_last_error)) {
if (!meterBatchToJson(ordered, count, g_batch_id, json, &g_sender_faults, g_sender_last_error)) {
return false;
}
@@ -246,9 +284,10 @@ static bool send_meter_batch(uint32_t ts_for_display) {
}
bool ok = send_batch_payload(compressed, compressed_len, ts_for_display);
g_last_batch_send_ms = millis();
if (ok) {
g_meter_sample_count = 0;
g_meter_sample_head = 0;
g_last_sent_batch_id = static_cast<uint16_t>(g_batch_id - 1);
g_batch_ack_pending = true;
}
return ok;
}
@@ -263,7 +302,7 @@ static void reset_batch_rx() {
g_batch_rx.last_rx_ms = 0;
}
static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool &decode_error) {
static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool &decode_error, uint16_t &out_batch_id) {
decode_error = false;
if (pkt.payload_len < BATCH_HEADER_SIZE) {
return false;
@@ -323,6 +362,7 @@ static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool &
}
decompressed[decompressed_len] = '\0';
out_json = String(reinterpret_cast<const char *>(decompressed));
out_batch_id = batch_id;
reset_batch_rx();
return true;
}
@@ -407,14 +447,38 @@ static void sender_loop() {
display_set_last_read(meter_ok, data.ts_utc);
}
if (now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) {
if (!g_batch_ack_pending && now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) {
g_last_send_ms = now_ms;
send_meter_batch(last_sample_ts());
}
LoraPacket rx = {};
if (lora_receive(rx, 0) && rx.protocol_version == PROTOCOL_VERSION && rx.payload_type == PayloadType::TimeSync) {
time_handle_timesync_payload(rx.payload, rx.payload_len);
if (lora_receive(rx, 0) && rx.protocol_version == PROTOCOL_VERSION) {
if (rx.payload_type == PayloadType::TimeSync) {
time_handle_timesync_payload(rx.payload, rx.payload_len);
} else if (rx.payload_type == PayloadType::Ack && rx.payload_len >= 2) {
uint16_t ack_id = read_u16_le(rx.payload);
if (g_batch_ack_pending && ack_id == g_last_sent_batch_id) {
g_batch_ack_pending = false;
g_batch_retry_count = 0;
g_meter_sample_count = 0;
g_meter_sample_head = 0;
}
}
}
if (g_batch_ack_pending && (now_ms - g_last_batch_send_ms >= BATCH_ACK_TIMEOUT_MS)) {
if (g_batch_retry_count < BATCH_MAX_RETRIES) {
g_batch_retry_count++;
send_meter_batch(last_sample_ts());
} else {
g_batch_ack_pending = false;
g_batch_retry_count = 0;
g_meter_sample_count = 0;
g_meter_sample_head = 0;
note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx);
display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
}
}
display_tick();
@@ -482,37 +546,52 @@ static void receiver_loop() {
} else if (pkt.payload_type == PayloadType::MeterBatch) {
String json;
bool decode_error = false;
if (process_batch_packet(pkt, json, decode_error)) {
uint16_t batch_id = 0;
if (process_batch_packet(pkt, json, decode_error, batch_id)) {
uint32_t rx_ts_utc = time_get_utc();
if (rx_ts_utc == 0) {
rx_ts_utc = millis() / 1000;
}
inject_batch_meta(json, pkt.rssi_dbm, pkt.snr_db, rx_ts_utc);
MeterData samples[METER_BATCH_MAX_SAMPLES];
size_t count = 0;
if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) {
for (uint8_t i = 0; i < NUM_SENDERS; ++i) {
if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) {
for (size_t s = 0; s < count; ++s) {
samples[s].link_valid = true;
samples[s].link_rssi_dbm = pkt.rssi_dbm;
samples[s].link_snr_db = pkt.snr_db;
samples[s].short_id = pkt.device_id_short;
mqtt_publish_state(samples[s]);
}
if (count > 0) {
g_sender_statuses[i].last_data = samples[count - 1];
g_sender_statuses[i].last_update_ts_utc = samples[count - 1].ts_utc;
g_sender_statuses[i].has_data = true;
g_sender_faults_remote[i].meter_read_fail = samples[count - 1].err_meter_read;
g_sender_faults_remote[i].lora_tx_fail = samples[count - 1].err_lora_tx;
g_sender_last_error_remote[i] = samples[count - 1].last_error;
g_sender_last_error_remote_utc[i] = time_get_utc();
g_sender_last_error_remote_ms[i] = millis();
if (ENABLE_HA_DISCOVERY && !g_sender_discovery_sent[i]) {
g_sender_discovery_sent[i] = mqtt_publish_discovery(samples[count - 1].device_id);
}
publish_faults_if_needed(samples[count - 1].device_id, g_sender_faults_remote[i], g_sender_faults_remote_published[i],
g_sender_last_error_remote[i], g_sender_last_error_remote_published[i],
g_sender_last_error_remote_utc[i], g_sender_last_error_remote_ms[i]);
}
break;
int8_t sender_idx = -1;
for (uint8_t i = 0; i < NUM_SENDERS; ++i) {
if (pkt.device_id_short == EXPECTED_SENDER_IDS[i]) {
sender_idx = static_cast<int8_t>(i);
break;
}
}
bool duplicate = sender_idx >= 0 && g_last_batch_id_rx[sender_idx] == batch_id;
if (duplicate) {
send_batch_ack(batch_id);
} else if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) {
if (sender_idx >= 0) {
for (size_t s = 0; s < count; ++s) {
samples[s].link_valid = true;
samples[s].link_rssi_dbm = pkt.rssi_dbm;
samples[s].link_snr_db = pkt.snr_db;
samples[s].short_id = pkt.device_id_short;
mqtt_publish_state(samples[s]);
}
if (count > 0) {
g_sender_statuses[sender_idx].last_data = samples[count - 1];
g_sender_statuses[sender_idx].last_update_ts_utc = samples[count - 1].ts_utc;
g_sender_statuses[sender_idx].has_data = true;
g_sender_faults_remote[sender_idx].meter_read_fail = samples[count - 1].err_meter_read;
g_sender_faults_remote[sender_idx].lora_tx_fail = samples[count - 1].err_lora_tx;
g_sender_last_error_remote[sender_idx] = samples[count - 1].last_error;
g_sender_last_error_remote_utc[sender_idx] = time_get_utc();
g_sender_last_error_remote_ms[sender_idx] = millis();
if (ENABLE_HA_DISCOVERY && !g_sender_discovery_sent[sender_idx]) {
g_sender_discovery_sent[sender_idx] = mqtt_publish_discovery(samples[count - 1].device_id);
}
publish_faults_if_needed(samples[count - 1].device_id, g_sender_faults_remote[sender_idx], g_sender_faults_remote_published[sender_idx],
g_sender_last_error_remote[sender_idx], g_sender_last_error_remote_published[sender_idx],
g_sender_last_error_remote_utc[sender_idx], g_sender_last_error_remote_ms[sender_idx]);
}
g_last_batch_id_rx[sender_idx] = batch_id;
send_batch_ack(batch_id);
}
} else {
note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);