Fix sender stale sample reuse and add append helper

This commit is contained in:
2026-02-05 01:01:03 +01:00
parent 595a31f278
commit 3a1de36b75

View File

@@ -55,6 +55,10 @@ struct BatchBuffer {
uint16_t batch_id; uint16_t batch_id;
bool batch_id_valid; bool batch_id_valid;
uint8_t count; uint8_t count;
uint16_t attempt_count;
uint16_t valid_count;
uint16_t invalid_count;
FaultType last_error;
MeterData samples[METER_BATCH_MAX_SAMPLES]; MeterData samples[METER_BATCH_MAX_SAMPLES];
}; };
@@ -93,9 +97,14 @@ static uint32_t g_sender_rx_reject_log_ms = 0;
static MeterData g_last_meter_data = {}; static MeterData g_last_meter_data = {};
static bool g_last_meter_valid = false; static bool g_last_meter_valid = false;
static uint32_t g_last_meter_rx_ms = 0; static uint32_t g_last_meter_rx_ms = 0;
static uint32_t g_last_meter_seq = 0;
static uint32_t g_last_meter_seq_used = 0;
static uint32_t g_meter_stale_seconds = 0; static uint32_t g_meter_stale_seconds = 0;
static bool g_time_acquired = false; static bool g_time_acquired = false;
static uint32_t g_last_sync_request_ms = 0; static uint32_t g_last_sync_request_ms = 0;
static uint32_t g_build_attempts = 0;
static uint32_t g_build_valid = 0;
static uint32_t g_build_invalid = 0;
static void watchdog_kick(); static void watchdog_kick();
@@ -207,6 +216,10 @@ static void batch_queue_enqueue(const MeterData *samples, uint8_t count) {
slot.batch_id = 0; slot.batch_id = 0;
slot.batch_id_valid = false; slot.batch_id_valid = false;
slot.count = count; slot.count = count;
slot.attempt_count = static_cast<uint16_t>(g_build_attempts);
slot.valid_count = static_cast<uint16_t>(g_build_valid);
slot.invalid_count = static_cast<uint16_t>(g_build_invalid);
slot.last_error = g_sender_last_error;
for (uint8_t i = 0; i < count; ++i) { for (uint8_t i = 0; i < count; ++i) {
slot.samples[i] = samples[i]; slot.samples[i] = samples[i];
} }
@@ -214,6 +227,28 @@ static void batch_queue_enqueue(const MeterData *samples, uint8_t count) {
g_batch_count++; g_batch_count++;
} }
static void reset_build_counters() {
g_build_attempts = 0;
g_build_valid = 0;
g_build_invalid = 0;
}
static bool append_meter_sample(const MeterData &data, bool meter_ok) {
if (!meter_ok) {
g_build_invalid++;
return false;
}
g_last_sample_ts_utc = data.ts_utc;
g_build_samples[g_build_count++] = data;
g_build_valid++;
if (g_build_count >= METER_BATCH_MAX_SAMPLES) {
batch_queue_enqueue(g_build_samples, g_build_count);
g_build_count = 0;
reset_build_counters();
}
return true;
}
static uint32_t last_sample_ts() { static uint32_t last_sample_ts() {
if (g_last_sample_ts_utc == 0) { if (g_last_sample_ts_utc == 0) {
uint32_t now_utc = time_get_utc(); uint32_t now_utc = time_get_utc();
@@ -482,6 +517,15 @@ static bool prepare_inflight_from_queue() {
batch->batch_id = g_batch_id; batch->batch_id = g_batch_id;
batch->batch_id_valid = true; batch->batch_id_valid = true;
} }
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("batch: id=%u desired=%u attempts=%u valid=%u invalid=%u err_last=%u",
batch->batch_id,
static_cast<unsigned>(METER_BATCH_MAX_SAMPLES),
static_cast<unsigned>(batch->attempt_count),
static_cast<unsigned>(batch->valid_count),
static_cast<unsigned>(batch->invalid_count),
static_cast<unsigned>(batch->last_error));
}
g_inflight_count = batch->count; g_inflight_count = batch->count;
g_inflight_batch_id = batch->batch_id; g_inflight_batch_id = batch->batch_id;
for (uint8_t i = 0; i < g_inflight_count; ++i) { for (uint8_t i = 0; i < g_inflight_count; ++i) {
@@ -587,6 +631,15 @@ static bool send_sync_request() {
g_inflight_sync_request = true; g_inflight_sync_request = true;
g_inflight_count = 0; g_inflight_count = 0;
g_inflight_batch_id = g_batch_id; g_inflight_batch_id = g_batch_id;
if (SERIAL_DEBUG_MODE && g_build_attempts > 0) {
serial_debug_printf("batch: id=%u desired=%u attempts=%u valid=%u invalid=%u err_last=%u sync=1",
g_inflight_batch_id,
static_cast<unsigned>(METER_BATCH_MAX_SAMPLES),
static_cast<unsigned>(g_build_attempts),
static_cast<unsigned>(g_build_valid),
static_cast<unsigned>(g_build_invalid),
static_cast<unsigned>(g_sender_last_error));
}
bool ok = send_inflight_batch(time_get_utc()); bool ok = send_inflight_batch(time_get_utc());
if (ok) { if (ok) {
g_last_sent_batch_id = g_inflight_batch_id; g_last_sent_batch_id = g_inflight_batch_id;
@@ -791,6 +844,9 @@ static void sender_loop() {
g_last_meter_valid = true; g_last_meter_valid = true;
g_last_meter_rx_ms = now_ms; g_last_meter_rx_ms = now_ms;
g_meter_stale_seconds = 0; g_meter_stale_seconds = 0;
g_last_meter_seq++;
} else {
g_last_meter_valid = false;
} }
} }
@@ -799,8 +855,15 @@ static void sender_loop() {
MeterData data = {}; MeterData data = {};
data.short_id = g_short_id; data.short_id = g_short_id;
strncpy(data.device_id, g_device_id, sizeof(data.device_id)); strncpy(data.device_id, g_device_id, sizeof(data.device_id));
data.energy_total_kwh = NAN;
data.total_power_w = NAN;
data.phase_power_w[0] = NAN;
data.phase_power_w[1] = NAN;
data.phase_power_w[2] = NAN;
bool meter_ok = g_last_meter_valid; g_build_attempts++;
// Avoid reusing stale meter frames: only accept new parsed data once per sample.
bool meter_ok = g_last_meter_valid && (g_last_meter_seq != g_last_meter_seq_used);
if (meter_ok) { if (meter_ok) {
data.energy_total_kwh = g_last_meter_data.energy_total_kwh; data.energy_total_kwh = g_last_meter_data.energy_total_kwh;
data.total_power_w = g_last_meter_data.total_power_w; data.total_power_w = g_last_meter_data.total_power_w;
@@ -809,6 +872,7 @@ static void sender_loop() {
data.phase_power_w[2] = g_last_meter_data.phase_power_w[2]; data.phase_power_w[2] = g_last_meter_data.phase_power_w[2];
uint32_t age_ms = now_ms - g_last_meter_rx_ms; uint32_t age_ms = now_ms - g_last_meter_rx_ms;
g_meter_stale_seconds = age_ms >= 1000 ? (age_ms / 1000) : 0; g_meter_stale_seconds = age_ms >= 1000 ? (age_ms / 1000) : 0;
g_last_meter_seq_used = g_last_meter_seq;
} else { } else {
g_meter_stale_seconds++; g_meter_stale_seconds++;
} }
@@ -825,11 +889,17 @@ static void sender_loop() {
data.ts_utc = time_get_utc(); data.ts_utc = time_get_utc();
data.valid = meter_ok; data.valid = meter_ok;
g_last_sample_ts_utc = data.ts_utc; bool appended = append_meter_sample(data, meter_ok);
g_build_samples[g_build_count++] = data; if (SERIAL_DEBUG_MODE) {
if (g_build_count >= METER_BATCH_MAX_SAMPLES) { serial_debug_printf("sample: i=%lu ok=%u appended=%u e_kwh=%.3f p1=%.1f p2=%.1f p3=%.1f ms=%lu",
batch_queue_enqueue(g_build_samples, g_build_count); static_cast<unsigned long>(g_build_attempts),
g_build_count = 0; meter_ok ? 1U : 0U,
appended ? 1U : 0U,
static_cast<double>(data.energy_total_kwh),
static_cast<double>(data.phase_power_w[0]),
static_cast<double>(data.phase_power_w[1]),
static_cast<double>(data.phase_power_w[2]),
static_cast<unsigned long>(now_ms));
} }
display_set_last_meter(data); display_set_last_meter(data);
display_set_last_read(meter_ok, data.ts_utc); display_set_last_read(meter_ok, data.ts_utc);
@@ -837,7 +907,18 @@ static void sender_loop() {
if (!g_batch_ack_pending && now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) { if (!g_batch_ack_pending && now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) {
g_last_send_ms = now_ms; g_last_send_ms = now_ms;
if (g_build_count > 0) {
batch_queue_enqueue(g_build_samples, g_build_count);
g_build_count = 0;
reset_build_counters();
}
if (g_batch_count > 0) {
send_meter_batch(last_sample_ts()); send_meter_batch(last_sample_ts());
} else if (g_build_attempts > 0) {
if (send_sync_request()) {
reset_build_counters();
}
}
} }
} else { } else {
if (!g_batch_ack_pending && now_ms - g_last_sync_request_ms >= SYNC_REQUEST_INTERVAL_MS) { if (!g_batch_ack_pending && now_ms - g_last_sync_request_ms >= SYNC_REQUEST_INTERVAL_MS) {