Files
DD3-LoRa-Bridge-MultiSender/src/sender_state_machine.cpp
T
acidburns def09160d0 refactor lora payload timing
Bump the batch payload codec to schema v4 with separate meter-time and UTC anchors, then use meter seconds for sparse batch slotting and receiver reconstruction.

Update the current 868 MHz bench configuration, allow ACKs from configured receiver short IDs, improve AP-to-STA recovery, quiet the test build, and document the changed protocol in the README.
2026-06-30 12:19:27 +02:00

1701 lines
59 KiB
C++

#include "sender_state_machine.h"
#include <Arduino.h>
#include "config.h"
#include "data_model.h"
#include "payload_codec.h"
#include "lora_transport.h"
#include "meter_driver.h"
#include "power_manager.h"
#include "time_manager.h"
#include "wifi_manager.h"
#include "display_ui.h"
#include <stdarg.h>
#include <math.h>
#ifdef ARDUINO_ARCH_ESP32
#include <esp_task_wdt.h>
#include <freertos/FreeRTOS.h>
#include <freertos/queue.h>
#include <freertos/task.h>
#endif
namespace {
// Compile-time constant that is true exactly when the DD3_DEBUG macro is
// defined for this build, so code can test it as an ordinary bool.
#if defined(DD3_DEBUG)
static constexpr bool DD3_DEBUG_ENABLED = true;
#else
static constexpr bool DD3_DEBUG_ENABLED = false;
#endif
// Phase of the sender state machine (held in g_sender_phase, which starts
// out as Syncing).
// NOTE(review): the meaning of each phase is inferred from its name only; the
// transitions are not visible in this part of the file.
enum class SenderPhase : uint8_t {
Syncing = 0,
Normal = 1,
Catchup = 2,
WaitAck = 3
};
// --- State machine phase and device identity/configuration -----------------
static SenderPhase g_sender_phase = SenderPhase::Syncing;
static DeviceRole g_role = DeviceRole::Sender;
// Short device id stamped into every outgoing LoRa packet (device_id_short).
static uint16_t g_short_id = 0;
static char g_device_id[16] = "";
static SenderStatus g_sender_statuses[NUM_SENDERS];
static bool g_ap_mode = false;
static WifiMqttConfig g_cfg;
// --- Fault bookkeeping ------------------------------------------------------
// Counters plus "last error" type and timestamps (UTC seconds and millis()),
// updated through note_fault() and cleared through clear_faults().
// NOTE(review): the receiver_* / *_remote / *_published variants are not
// touched by the code visible here; presumably they serve the receiver role.
static FaultCounters g_sender_faults = {};
static FaultCounters g_receiver_faults = {};
static FaultCounters g_receiver_faults_published = {};
static FaultCounters g_sender_faults_remote[NUM_SENDERS] = {};
static FaultCounters g_sender_faults_remote_published[NUM_SENDERS] = {};
static FaultType g_sender_last_error = FaultType::None;
static FaultType g_receiver_last_error = FaultType::None;
static FaultType g_sender_last_error_remote[NUM_SENDERS] = {};
static FaultType g_sender_last_error_remote_published[NUM_SENDERS] = {};
static FaultType g_receiver_last_error_published = FaultType::None;
static uint32_t g_sender_last_error_utc = 0;
static uint32_t g_sender_last_error_ms = 0;
static uint32_t g_receiver_last_error_utc = 0;
static uint32_t g_receiver_last_error_ms = 0;
static uint32_t g_sender_last_error_remote_utc[NUM_SENDERS] = {};
static uint32_t g_sender_last_error_remote_ms[NUM_SENDERS] = {};
static bool g_sender_discovery_sent[NUM_SENDERS] = {};
static bool g_receiver_discovery_sent = false;
// --- Batch chunk framing ----------------------------------------------------
// Per-chunk header written by send_batch_payload(): batch id (2 bytes),
// chunk index (1), chunk count (1), total payload length (2) = 6 bytes.
static constexpr size_t BATCH_HEADER_SIZE = 6;
// Payload bytes that fit into one LoRa packet after the chunk header.
static constexpr size_t BATCH_CHUNK_PAYLOAD = LORA_MAX_PAYLOAD - BATCH_HEADER_SIZE;
// Upper bound on an encoded batch; also the size of the in-flight encode cache.
static constexpr size_t BATCH_MAX_COMPRESSED = 4096;
// Slack added on top of the estimated airtime when computing batch timeouts.
static constexpr uint32_t BATCH_RX_MARGIN_MS = 800;
// One queued batch of meter samples waiting to be transmitted, together with
// the build statistics captured when it was enqueued (batch_queue_enqueue).
struct BatchBuffer {
// Wire batch id; 0 until prepare_inflight_from_queue() assigns g_batch_id.
uint16_t batch_id;
// True once batch_id has been assigned.
bool batch_id_valid;
// Number of used entries in samples[].
uint8_t count;
// Snapshots of g_build_attempts / g_build_valid / g_build_invalid at enqueue
// time, narrowed to 16 bits.
uint16_t attempt_count;
uint16_t valid_count;
uint16_t invalid_count;
// Value of g_sender_last_error at enqueue time.
FaultType last_error;
MeterData samples[METER_BATCH_MAX_SAMPLES];
};
// --- Batch queue ------------------------------------------------------------
// Fixed-size ring of batches: new entries are written at g_batch_head, the
// oldest entry sits at g_batch_tail, g_batch_count is the fill level.
static BatchBuffer g_batch_queue[BATCH_QUEUE_DEPTH];
static uint8_t g_batch_head = 0;
static uint8_t g_batch_tail = 0;
static uint8_t g_batch_count = 0;
// Samples collected for the batch currently being built (append_meter_sample).
static MeterData g_build_samples[METER_BATCH_MAX_SAMPLES];
static uint8_t g_build_count = 0;
static uint32_t g_last_sample_ms = 0;
// ts_utc of the most recently appended sample (see last_sample_ts()).
static uint32_t g_last_sample_ts_utc = 0;
static uint32_t g_last_send_ms = 0;
static uint32_t g_last_batch_send_ms = 0;
// --- Battery cache (update_battery_cache / battery_sample_due) --------------
static float g_last_battery_voltage_v = NAN;
static uint8_t g_last_battery_percent = 0;
// millis() of the last battery read; 0 means "never sampled".
static uint32_t g_last_battery_ms = 0;
// --- Batch id / ACK tracking ------------------------------------------------
// Id handed to the next batch that becomes in-flight.
static uint16_t g_batch_id = 1;
static uint16_t g_last_sent_batch_id = 0;
static uint16_t g_last_acked_batch_id = 0;
static uint8_t g_batch_retry_count = 0;
static bool g_batch_ack_pending = false;
static uint32_t g_batch_ack_timeout_ms = BATCH_ACK_TIMEOUT_MS;
// --- In-flight batch --------------------------------------------------------
// Copy of the batch currently being transmitted (prepare_inflight_from_queue).
static MeterData g_inflight_samples[METER_BATCH_MAX_SAMPLES];
static uint8_t g_inflight_count = 0;
static uint16_t g_inflight_batch_id = 0;
static bool g_inflight_active = false;
static bool g_inflight_sync_request = false;
// Cached encoded payload of the in-flight batch, reused on resend when batch
// id and sync-request flag still match (reset by
// invalidate_inflight_encode_cache()).
static uint8_t g_inflight_encoded_payload[BATCH_MAX_COMPRESSED];
static size_t g_inflight_encoded_payload_len = 0;
static uint16_t g_inflight_encoded_batch_id = 0;
static bool g_inflight_encoded_sync_request = false;
static bool g_inflight_encoded_valid = false;
// --- Diagnostics (reported by sender_log_diagnostics) ------------------------
static uint32_t g_last_debug_log_ms = 0;
static uint32_t g_sender_rx_window_ms = 0;
static uint32_t g_sender_sleep_ms = 0;
static uint32_t g_sender_power_log_ms = 0;
static uint32_t g_meter_queue_high_water = 0;
static uint32_t g_meter_queue_drop_count = 0;
static uint32_t g_sender_ack_timeout_total = 0;
static uint32_t g_sender_ack_retry_total = 0;
static uint32_t g_sender_ack_rtt_last_ms = 0;
static uint32_t g_sender_ack_rtt_ewma_ms = 0;
static uint32_t g_sender_ack_miss_streak = 0;
static uint32_t g_last_ack_window_log_ms = 0;
// Last RX reject reason per role and the millis() of the last reject log line
// (used to throttle logging to one line per second).
static RxRejectReason g_sender_rx_reject_reason = RxRejectReason::None;
static uint32_t g_sender_rx_reject_log_ms = 0;
static RxRejectReason g_receiver_rx_reject_reason = RxRejectReason::None;
static uint32_t g_receiver_rx_reject_log_ms = 0;
// --- Latest meter snapshot (set_last_meter_sample) ---------------------------
static MeterData g_last_meter_data = {};
// Rate-limit: track ACK accept timestamps to detect replay floods.
static uint32_t g_ack_accept_last_ms = 0;
static constexpr uint32_t ACK_MIN_INTERVAL_MS = 500;
static bool g_last_meter_valid = false;
static uint32_t g_last_meter_rx_ms = 0;
static uint32_t g_meter_stale_seconds = 0;
// --- Meter time anchoring (meter_time_update_snapshot) -----------------------
// g_meter_epoch_offset = UTC epoch seconds minus meter seconds, valid only
// while g_meter_time_anchor_valid is true.
static bool g_meter_time_anchor_valid = false;
static int64_t g_meter_epoch_offset = 0;
// Previous meter-seconds reading and its receive time, used for jump detection.
static bool g_meter_time_prev_valid = false;
static uint32_t g_meter_time_prev_seconds = 0;
static uint32_t g_meter_time_prev_rx_ms = 0;
// Set when a meter time jump was detected; not cleared in the code visible here.
static bool g_meter_time_jump_pending = false;
static bool g_time_acquired = false;
// --- Periodic fault-stat reset ------------------------------------------------
static bool g_sender_faults_reset_after_first_sync = false;
// UTC hour index (utc / 3600) of the last reset; UINT32_MAX means "not set".
static uint32_t g_sender_faults_reset_hour_utc = UINT32_MAX;
static uint32_t g_last_sync_request_ms = 0;
// Per-batch build statistics, reset by reset_build_counters().
static uint32_t g_build_attempts = 0;
static uint32_t g_build_valid = 0;
static uint32_t g_build_invalid = 0;
static constexpr uint32_t METER_SAMPLE_MAX_AGE_MS = 15000;
// Allowed mismatch between meter-seconds delta and wall-clock delta.
static constexpr uint32_t METER_TIME_DELTA_TOLERANCE_S = 2;
// Allowed drift of the meter/UTC offset before the anchor is replaced.
static constexpr int64_t METER_TIME_ANCHOR_DRIFT_TOLERANCE_S = 2;
#ifdef ENABLE_TEST_MODE
// State of the synthetic sample generator (generate_test_meter_sample).
static uint32_t g_test_meter_last_emit_ms = 0;
static uint32_t g_test_meter_tick = 0;
#endif
// Queue element passed from the meter reader task to meter_reader_pump():
// one parsed meter sample plus the millis() timestamp recorded for it.
struct MeterSampleEvent {
MeterData data;
uint32_t rx_ms;
};
#ifdef ARDUINO_ARCH_ESP32
// Queue of MeterSampleEvent filled by the reader task and drained by
// meter_reader_pump(); created lazily in meter_reader_start().
static QueueHandle_t g_meter_sample_queue = nullptr;
static TaskHandle_t g_meter_reader_task = nullptr;
static bool g_meter_reader_task_running = false;
static constexpr UBaseType_t METER_SAMPLE_QUEUE_LEN = 32;
// Passed as the stack-depth argument of xTaskCreatePinnedToCore().
// NOTE(review): ESP-IDF documents that argument in bytes, not words; confirm
// that 4096 bytes is the intended stack size despite the "_WORDS" name.
static constexpr uint32_t METER_READER_TASK_STACK_WORDS = 4096;
static constexpr UBaseType_t METER_READER_TASK_PRIORITY = 2;
static constexpr BaseType_t METER_READER_TASK_CORE = 0;
#endif
// Reason why building the payload for the in-flight batch failed
// (stored in g_last_tx_build_error by send_inflight_batch()).
enum class TxBuildError : uint8_t {
None = 0,
Encode = 1
};
// Result of the most recent payload build attempt in send_inflight_batch().
static TxBuildError g_last_tx_build_error = TxBuildError::None;
// Forward declarations for helpers that are used before their definitions.
static void watchdog_kick();
static void finish_inflight_batch();
static void invalidate_inflight_encode_cache();
// printf-style debug logger. Does nothing unless SERIAL_DEBUG_MODE is set;
// otherwise formats the message into a 256-byte stack buffer (vsnprintf
// truncates anything longer) and prints it as one line on Serial.
static void serial_debug_printf(const char *fmt, ...) {
  if (SERIAL_DEBUG_MODE) {
    char line[256];
    va_list ap;
    va_start(ap, fmt);
    vsnprintf(line, sizeof(line), fmt, ap);
    va_end(ap);
    Serial.println(line);
  }
}
#ifdef ARDUINO_ARCH_ESP32
// Raises g_meter_queue_high_water to the current fill level of the meter
// sample queue when that level exceeds the recorded maximum. No-op while the
// queue has not been created.
static void update_meter_queue_high_water() {
  if (g_meter_sample_queue) {
    const uint32_t waiting = static_cast<uint32_t>(uxQueueMessagesWaiting(g_meter_sample_queue));
    if (g_meter_queue_high_water < waiting) {
      g_meter_queue_high_water = waiting;
    }
  }
}
#endif
// Emits a periodic diagnostics line (queue depths, ACK statistics, meter
// driver counters, RX window / sleep durations) on the debug serial port.
//
// now_ms: current millis() value supplied by the caller.
//
// Does nothing when SERIAL_DEBUG_MODE is off or when less than
// SENDER_DIAG_LOG_INTERVAL_MS has passed since the previous line.
static void sender_log_diagnostics(uint32_t now_ms) {
if (!SERIAL_DEBUG_MODE) {
return;
}
// Unsigned subtraction keeps the interval check valid across millis() wrap.
if (now_ms - g_last_debug_log_ms < SENDER_DIAG_LOG_INTERVAL_MS) {
return;
}
g_last_debug_log_ms = now_ms;
MeterDriverStats meter_stats = {};
meter_get_stats(meter_stats);
uint32_t queue_depth = 0;
#ifdef ARDUINO_ARCH_ESP32
if (g_meter_sample_queue) {
queue_depth = static_cast<uint32_t>(uxQueueMessagesWaiting(g_meter_sample_queue));
}
#endif
// Age of the last good meter frame: UINT32_MAX when no frame has been seen
// yet (timestamp 0); left at 0 when the frame timestamp is newer than now_ms.
uint32_t meter_age_ms = 0;
if (meter_stats.last_good_frame_ms > 0 && now_ms >= meter_stats.last_good_frame_ms) {
meter_age_ms = now_ms - meter_stats.last_good_frame_ms;
} else if (meter_stats.last_good_frame_ms == 0) {
meter_age_ms = UINT32_MAX;
}
// The arguments below must stay in exactly the order of the format string.
serial_debug_printf(
"diag: q_depth=%lu q_hi=%lu q_drop=%lu batch_q=%u build=%u ack_pending=%u ack_retry_cur=%u ack_retry_total=%lu ack_timeout_total=%lu ack_rtt_last_ms=%lu ack_rtt_ewma_ms=%lu ack_miss_streak=%lu meter_ok=%lu meter_fail=%lu meter_ovf=%lu meter_timeout=%lu meter_age_ms=%lu rx_win_ms=%lu sleep_ms=%lu",
static_cast<unsigned long>(queue_depth),
static_cast<unsigned long>(g_meter_queue_high_water),
static_cast<unsigned long>(g_meter_queue_drop_count),
static_cast<unsigned>(g_batch_count),
static_cast<unsigned>(g_build_count),
g_batch_ack_pending ? 1U : 0U,
static_cast<unsigned>(g_batch_retry_count),
static_cast<unsigned long>(g_sender_ack_retry_total),
static_cast<unsigned long>(g_sender_ack_timeout_total),
static_cast<unsigned long>(g_sender_ack_rtt_last_ms),
static_cast<unsigned long>(g_sender_ack_rtt_ewma_ms),
static_cast<unsigned long>(g_sender_ack_miss_streak),
static_cast<unsigned long>(meter_stats.frames_ok),
static_cast<unsigned long>(meter_stats.frames_parse_fail),
static_cast<unsigned long>(meter_stats.rx_overflow),
static_cast<unsigned long>(meter_stats.rx_timeout),
static_cast<unsigned long>(meter_age_ms),
static_cast<unsigned long>(g_sender_rx_window_ms),
static_cast<unsigned long>(g_sender_sleep_ms));
#ifdef DEBUG_METER_DIAG
// Optional second line with fault counters and batch build statistics.
serial_debug_printf(
"meter_diag: err_m=%lu err_d=%lu err_tx=%lu build_att=%lu build_ok=%lu build_fail=%lu stale_s=%lu",
static_cast<unsigned long>(g_sender_faults.meter_read_fail),
static_cast<unsigned long>(g_sender_faults.decode_fail),
static_cast<unsigned long>(g_sender_faults.lora_tx_fail),
static_cast<unsigned long>(g_build_attempts),
static_cast<unsigned long>(g_build_valid),
static_cast<unsigned long>(g_build_invalid),
static_cast<unsigned long>(g_meter_stale_seconds));
#endif
}
// Marks the cached encoded payload of the in-flight batch as unusable and
// resets its metadata, so the next send has to encode the batch again.
static void invalidate_inflight_encode_cache() {
  g_inflight_encoded_valid = false;
  g_inflight_encoded_sync_request = false;
  g_inflight_encoded_batch_id = 0;
  g_inflight_encoded_payload_len = 0;
}
// Returns the number of set bits in `value`. Each iteration clears the lowest
// set bit, so the loop runs once per set bit.
static uint8_t bit_count32(uint32_t value) {
  uint8_t ones = 0;
  for (uint32_t rest = value; rest != 0; rest &= rest - 1) {
    ++ones;
  }
  return ones;
}
// Returns |a - b| for unsigned operands without wrapping.
static uint32_t abs_diff_u32(uint32_t a, uint32_t b) {
  if (a < b) {
    return b - a;
  }
  return a - b;
}
// Tracks the meter's own seconds counter against wall-clock time and, when a
// UTC anchor is available, fills parsed.ts_utc from the meter seconds.
//
// parsed: sample to inspect; only parsed.ts_utc may be modified.
// rx_ms:  millis() timestamp associated with the sample.
//
// Side effects: may update the meter/UTC anchor (g_meter_epoch_offset,
// g_meter_time_anchor_valid), sets g_meter_time_jump_pending when a jump is
// detected, and always records the sample as the new "previous" reading.
// Samples without valid meter seconds are ignored entirely.
static void meter_time_update_snapshot(MeterData &parsed, uint32_t rx_ms) {
if (!parsed.meter_seconds_valid) {
return;
}
bool jump = false;
const char *jump_reason = nullptr;
uint32_t delta_meter_s = 0;
uint32_t delta_wall_s = 0;
// Jump detection against the previous reading: either the counter went
// backwards, or it advanced by an amount that disagrees with elapsed
// wall-clock time by more than METER_TIME_DELTA_TOLERANCE_S.
if (g_meter_time_prev_valid) {
if (parsed.meter_seconds < g_meter_time_prev_seconds) {
jump = true;
jump_reason = "rollback";
} else {
delta_meter_s = parsed.meter_seconds - g_meter_time_prev_seconds;
// Unsigned subtraction stays correct across millis() wrap-around.
uint32_t delta_wall_ms = rx_ms - g_meter_time_prev_rx_ms;
// Round elapsed milliseconds to the nearest whole second.
delta_wall_s = (delta_wall_ms + 500) / 1000;
if (abs_diff_u32(delta_meter_s, delta_wall_s) > METER_TIME_DELTA_TOLERANCE_S) {
jump = true;
jump_reason = "delta";
}
}
}
// With a plausible synced UTC, (re)establish the meter->UTC offset: on first
// use, after a detected jump, or when the offset drifted beyond tolerance.
if (time_is_synced()) {
uint32_t epoch_now = time_get_utc();
if (epoch_now >= MIN_ACCEPTED_EPOCH_UTC) {
int64_t new_offset = static_cast<int64_t>(epoch_now) - static_cast<int64_t>(parsed.meter_seconds);
if (!g_meter_time_anchor_valid || jump) {
g_meter_epoch_offset = new_offset;
g_meter_time_anchor_valid = true;
} else {
int64_t drift_s = new_offset - g_meter_epoch_offset;
if (drift_s > METER_TIME_ANCHOR_DRIFT_TOLERANCE_S || drift_s < -METER_TIME_ANCHOR_DRIFT_TOLERANCE_S) {
jump = true;
jump_reason = jump_reason ? jump_reason : "anchor";
g_meter_epoch_offset = new_offset;
}
}
}
}
// Derive the sample's UTC timestamp from the anchor; ts_utc is left as it was
// when no anchor exists or the result does not fit into a positive uint32_t.
if (g_meter_time_anchor_valid) {
int64_t epoch64 = static_cast<int64_t>(parsed.meter_seconds) + g_meter_epoch_offset;
if (epoch64 > 0 && epoch64 <= static_cast<int64_t>(UINT32_MAX)) {
parsed.ts_utc = static_cast<uint32_t>(epoch64);
}
}
if (jump) {
g_meter_time_jump_pending = true;
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("meter_time: jump reason=%s sec=%lu prev=%lu d_meter=%lu d_wall=%lu",
jump_reason ? jump_reason : "unknown",
static_cast<unsigned long>(parsed.meter_seconds),
static_cast<unsigned long>(g_meter_time_prev_seconds),
static_cast<unsigned long>(delta_meter_s),
static_cast<unsigned long>(delta_wall_s));
}
}
// Remember this reading as the reference for the next call.
g_meter_time_prev_seconds = parsed.meter_seconds;
g_meter_time_prev_rx_ms = rx_ms;
g_meter_time_prev_valid = true;
}
static void set_last_meter_sample(const MeterData &parsed_in, uint32_t rx_ms) {
MeterData parsed = parsed_in;
meter_time_update_snapshot(parsed, rx_ms);
g_last_meter_data = parsed;
g_last_meter_valid = true;
g_last_meter_rx_ms = rx_ms;
g_meter_stale_seconds = 0;
}
// Resets `parsed` to a known "no data" state (numeric readings NaN, valid
// flag false) and then lets meter_parse_frame() fill it from the raw frame.
// Returns whatever meter_parse_frame() returns.
static bool parse_meter_frame_sample(const char *frame, size_t frame_len, MeterData &parsed) {
  parsed = {};
  parsed.valid = false;
  for (uint8_t phase = 0; phase < 3; ++phase) {
    parsed.phase_power_w[phase] = NAN;
  }
  parsed.total_power_w = NAN;
  parsed.energy_total_kwh = NAN;
  return meter_parse_frame(frame, frame_len, parsed);
}
#ifdef ENABLE_TEST_MODE
// Test-mode replacement for a real meter: produces one synthetic sample per
// METER_SAMPLE_INTERVAL_MS.
//
// Returns false (leaving `parsed` untouched) while the interval since the last
// emitted sample has not elapsed; otherwise advances the tick counter, fills
// `parsed` with values derived from it and returns true.
static bool generate_test_meter_sample(uint32_t now_ms, MeterData &parsed) {
  const bool emitted_before = g_test_meter_last_emit_ms != 0;
  if (emitted_before && (now_ms - g_test_meter_last_emit_ms) < METER_SAMPLE_INTERVAL_MS) {
    return false;
  }
  g_test_meter_last_emit_ms = now_ms;
  ++g_test_meter_tick;

  parsed = {};
  parsed.valid = true;
  parsed.meter_seconds = MIN_ACCEPTED_EPOCH_UTC + g_test_meter_tick;
  parsed.meter_seconds_valid = true;
  // 1 Wh step per sample.
  parsed.energy_total_kwh = static_cast<float>(g_test_meter_tick) / 1000.0f;
  for (uint8_t phase = 0; phase < 3; ++phase) {
    parsed.phase_power_w[phase] = static_cast<float>(g_test_meter_tick);
  }
  parsed.total_power_w = parsed.phase_power_w[0] + parsed.phase_power_w[1] + parsed.phase_power_w[2];
  return true;
}
#endif
#ifdef ARDUINO_ARCH_ESP32
static void meter_queue_push_latest(const MeterSampleEvent &event) {
if (!g_meter_sample_queue) {
return;
}
if (xQueueSend(g_meter_sample_queue, &event, 0) == pdTRUE) {
update_meter_queue_high_water();
return;
}
g_meter_queue_drop_count++;
MeterSampleEvent dropped = {};
xQueueReceive(g_meter_sample_queue, &dropped, 0);
if (xQueueSend(g_meter_sample_queue, &event, 0) == pdTRUE) {
update_meter_queue_high_water();
return;
}
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("meter: queue push failed");
}
}
// Entry point of the meter reader task (started by meter_reader_start()).
// Loops forever: obtains a sample, wraps it in a MeterSampleEvent stamped
// with millis(), and pushes it onto the meter sample queue.
//
// arg: unused.
static void meter_reader_task_entry(void *arg) {
(void)arg;
uint32_t consecutive_fails = 0;
for (;;) {
#ifdef ENABLE_TEST_MODE
// Test mode: samples come from the synthetic generator. Every path through
// this section ends in `continue`, so the real-meter code below is never
// reached in test builds.
MeterData test_sample = {};
uint32_t now_ms = millis();
if (!generate_test_meter_sample(now_ms, test_sample)) {
vTaskDelay(pdMS_TO_TICKS(5));
continue;
}
MeterSampleEvent event = {};
event.data = test_sample;
event.rx_ms = now_ms;
meter_queue_push_latest(event);
consecutive_fails = 0;
continue;
#endif
const char *frame = nullptr;
size_t frame_len = 0;
if (!meter_poll_frame(frame, frame_len)) {
// Exponential backoff: 5→10→20→…→METER_FAIL_BACKOFF_MAX_MS on consecutive
// poll misses. Reduces CPU wake-ups when the meter is unresponsive.
uint32_t backoff_ms = METER_FAIL_BACKOFF_BASE_MS;
// The shift is only applied for fewer than 16 consecutive misses; beyond
// that the base value is used before the min/max limits below apply.
if (consecutive_fails < 16) {
backoff_ms = METER_FAIL_BACKOFF_BASE_MS << consecutive_fails;
}
if (backoff_ms < 5) {
backoff_ms = 5;
}
if (backoff_ms > METER_FAIL_BACKOFF_MAX_MS) {
backoff_ms = METER_FAIL_BACKOFF_MAX_MS;
}
vTaskDelay(pdMS_TO_TICKS(backoff_ms));
// Saturating increment so the miss counter never wraps.
if (consecutive_fails < UINT32_MAX) {
consecutive_fails++;
}
continue;
}
// A frame arrived: reset the backoff even if parsing fails below.
consecutive_fails = 0;
MeterData parsed = {};
if (parse_meter_frame_sample(frame, frame_len, parsed)) {
MeterSampleEvent event = {};
event.data = parsed;
event.rx_ms = millis();
meter_queue_push_latest(event);
}
}
}
static bool meter_reader_start() {
if (g_meter_reader_task_running) {
return true;
}
if (!g_meter_sample_queue) {
g_meter_sample_queue = xQueueCreate(METER_SAMPLE_QUEUE_LEN, sizeof(MeterSampleEvent));
if (!g_meter_sample_queue) {
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("meter: queue alloc failed");
}
return false;
}
}
BaseType_t rc = xTaskCreatePinnedToCore(
meter_reader_task_entry,
"meter_reader",
METER_READER_TASK_STACK_WORDS,
nullptr,
METER_READER_TASK_PRIORITY,
&g_meter_reader_task,
METER_READER_TASK_CORE);
if (rc != pdPASS) {
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("meter: task start failed rc=%ld", static_cast<long>(rc));
}
return false;
}
g_meter_reader_task_running = true;
serial_debug_printf("meter: reader task core=%ld queue=%u",
static_cast<long>(METER_READER_TASK_CORE),
static_cast<unsigned>(METER_SAMPLE_QUEUE_LEN));
return true;
}
#endif
// Moves newly available meter samples into the "latest sample" state.
//
// now_ms: current millis(); used as the receive timestamp only on the paths
//         that read a sample directly (no reader task).
//
// Three mutually exclusive sources, chosen at build/run time:
//  1. ESP32 with the reader task running: drain the whole sample queue, so the
//     last queued event ends up as the latest sample.
//  2. Test mode without the task: take a synthetic sample if one is due.
//  3. Otherwise: poll and parse one frame from the meter driver inline.
static void meter_reader_pump(uint32_t now_ms) {
#ifdef ARDUINO_ARCH_ESP32
if (g_meter_reader_task_running && g_meter_sample_queue) {
MeterSampleEvent event = {};
// Non-blocking receive (timeout 0) until the queue is empty.
while (xQueueReceive(g_meter_sample_queue, &event, 0) == pdTRUE) {
set_last_meter_sample(event.data, event.rx_ms);
}
return;
}
#endif
#ifdef ENABLE_TEST_MODE
MeterData test_sample = {};
if (generate_test_meter_sample(now_ms, test_sample)) {
set_last_meter_sample(test_sample, now_ms);
}
// In test builds the real-meter path below is never reached.
return;
#endif
const char *frame = nullptr;
size_t frame_len = 0;
if (!meter_poll_frame(frame, frame_len)) {
return;
}
MeterData parsed = {};
if (parse_meter_frame_sample(frame, frame_len, parsed)) {
set_last_meter_sample(parsed, now_ms);
}
}
static void update_battery_cache() {
MeterData tmp = {};
read_battery(tmp);
g_last_battery_voltage_v = tmp.battery_voltage_v;
g_last_battery_percent = tmp.battery_percent;
g_last_battery_ms = millis();
}
// True when the battery has never been sampled (timestamp 0) or when at least
// BATTERY_SAMPLE_INTERVAL_MS has elapsed since the cached reading.
static bool battery_sample_due(uint32_t now_ms) {
  if (g_last_battery_ms == 0) {
    return true;
  }
  const uint32_t elapsed_ms = now_ms - g_last_battery_ms;
  return elapsed_ms >= BATTERY_SAMPLE_INTERVAL_MS;
}
static bool batch_queue_drop_oldest() {
if (g_batch_count == 0) {
return false;
}
bool dropped_inflight = g_inflight_active && g_batch_queue[g_batch_tail].batch_id_valid &&
g_inflight_batch_id == g_batch_queue[g_batch_tail].batch_id;
if (dropped_inflight) {
g_batch_ack_pending = false;
g_batch_retry_count = 0;
g_inflight_active = false;
g_inflight_count = 0;
g_inflight_batch_id = 0;
g_inflight_sync_request = false;
invalidate_inflight_encode_cache();
}
g_batch_tail = (g_batch_tail + 1) % BATCH_QUEUE_DEPTH;
g_batch_count--;
return dropped_inflight;
}
// Records the most recent RX reject reason for the sender role and, in debug
// mode, logs it at most once per second. A reason of None is ignored.
static void sender_note_rx_reject(RxRejectReason reason, const char *context) {
  if (reason != RxRejectReason::None) {
    g_sender_rx_reject_reason = reason;
    const uint32_t stamp_ms = millis();
    const bool log_due = (stamp_ms - g_sender_rx_reject_log_ms) >= 1000;
    if (SERIAL_DEBUG_MODE && log_due) {
      g_sender_rx_reject_log_ms = stamp_ms;
      serial_debug_printf("rx_reject: %s reason=%s", context, rx_reject_reason_text(reason));
    }
  }
}
// Records the most recent RX reject reason for the receiver role and, in debug
// mode, logs it at most once per second. A reason of None is ignored.
static void receiver_note_rx_reject(RxRejectReason reason, const char *context) {
  if (reason != RxRejectReason::None) {
    g_receiver_rx_reject_reason = reason;
    const uint32_t stamp_ms = millis();
    const bool log_due = (stamp_ms - g_receiver_rx_reject_log_ms) >= 1000;
    if (SERIAL_DEBUG_MODE && log_due) {
      g_receiver_rx_reject_log_ms = stamp_ms;
      serial_debug_printf("rx_reject: %s reason=%s", context, rx_reject_reason_text(reason));
    }
  }
}
// Returns a pointer to the oldest queued batch without removing it, or nullptr
// when the queue is empty.
static BatchBuffer *batch_queue_peek() {
  return g_batch_count != 0 ? &g_batch_queue[g_batch_tail] : nullptr;
}
static void batch_queue_enqueue(const MeterData *samples, uint8_t count) {
if (!samples || count == 0) {
return;
}
if (g_batch_count >= BATCH_QUEUE_DEPTH) {
if (batch_queue_drop_oldest()) {
g_batch_id++;
}
}
BatchBuffer &slot = g_batch_queue[g_batch_head];
slot.batch_id = 0;
slot.batch_id_valid = false;
slot.count = count;
slot.attempt_count = static_cast<uint16_t>(g_build_attempts);
slot.valid_count = static_cast<uint16_t>(g_build_valid);
slot.invalid_count = static_cast<uint16_t>(g_build_invalid);
slot.last_error = g_sender_last_error;
for (uint8_t i = 0; i < count; ++i) {
slot.samples[i] = samples[i];
}
g_batch_head = (g_batch_head + 1) % BATCH_QUEUE_DEPTH;
g_batch_count++;
}
// Zeroes the per-batch build statistics (attempts / valid / invalid).
static void reset_build_counters() {
  g_build_invalid = 0;
  g_build_valid = 0;
  g_build_attempts = 0;
}
// Adds one sample to the batch under construction.
//
// data:         sample to store.
// meter_ok:     whether the sample counts as valid in the build statistics.
// has_snapshot: false means there is nothing to store; the attempt is counted
//               as invalid and false is returned.
//
// Once METER_BATCH_MAX_SAMPLES samples have been collected the batch is handed
// to the batch queue and the build buffer and counters start over.
// Returns true when the sample was stored.
static bool append_meter_sample(const MeterData &data, bool meter_ok, bool has_snapshot) {
  if (!has_snapshot) {
    g_build_invalid++;
    return false;
  }

  g_last_sample_ts_utc = data.ts_utc;
  g_build_samples[g_build_count] = data;
  g_build_count++;

  uint32_t &tally = meter_ok ? g_build_valid : g_build_invalid;
  tally++;

  const bool batch_full = g_build_count >= METER_BATCH_MAX_SAMPLES;
  if (batch_full) {
    batch_queue_enqueue(g_build_samples, g_build_count);
    g_build_count = 0;
    reset_build_counters();
  }
  return true;
}
// Timestamp to associate with the most recent sample: the stored sample UTC
// time if one exists, otherwise the current UTC time, otherwise (UTC reported
// as 0) uptime in seconds.
static uint32_t last_sample_ts() {
  if (g_last_sample_ts_utc != 0) {
    return g_last_sample_ts_utc;
  }
  const uint32_t utc_now = time_get_utc();
  if (utc_now > 0) {
    return utc_now;
  }
  return millis() / 1000;
}
// Registers a fault: bumps the counter matching `type` (MeterRead, Decode or
// LoraTx; other types touch no counter) and records `type` as the last error
// together with the current UTC time and millis().
static void note_fault(FaultCounters &counters, FaultType &last_type, uint32_t &last_ts_utc, uint32_t &last_ts_ms, FaultType type) {
  switch (type) {
    case FaultType::MeterRead:
      counters.meter_read_fail++;
      break;
    case FaultType::Decode:
      counters.decode_fail++;
      break;
    case FaultType::LoraTx:
      counters.lora_tx_fail++;
      break;
    default:
      break;
  }
  last_type = type;
  last_ts_utc = time_get_utc();
  last_ts_ms = millis();
}
// Resets a set of fault statistics: all counters to zero, last error to None,
// and both last-error timestamps to 0.
static void clear_faults(FaultCounters &counters, FaultType &last_type, uint32_t &last_ts_utc, uint32_t &last_ts_ms) {
  last_ts_ms = 0;
  last_ts_utc = 0;
  last_type = FaultType::None;
  counters = {};
}
// Clears the sender's fault statistics and RX reject reason, pushes the cleared
// last-error state to the display, and logs the reset in debug mode.
//
// reason:  short tag for the log line; logged as "unknown" when null.
// now_utc: UTC timestamp reported in the log line.
static void sender_reset_fault_stats(const char *reason, uint32_t now_utc) {
  clear_faults(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
  g_sender_rx_reject_reason = RxRejectReason::None;
  display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
  if (!SERIAL_DEBUG_MODE) {
    return;
  }
  const char *reason_text = reason ? reason : "unknown";
  serial_debug_printf("faults: reset scope=sender reason=%s ts_utc=%lu",
                      reason_text,
                      static_cast<unsigned long>(now_utc));
}
// One-shot reset of the sender fault statistics on the first plausible time
// sync. Ignored when it already happened or when `synced_utc` is below
// MIN_ACCEPTED_EPOCH_UTC. Also seeds the hour index used by the hourly reset.
static void sender_reset_fault_stats_on_first_sync(uint32_t synced_utc) {
  const bool already_done = g_sender_faults_reset_after_first_sync;
  const bool utc_plausible = synced_utc >= MIN_ACCEPTED_EPOCH_UTC;
  if (!already_done && utc_plausible) {
    sender_reset_fault_stats("first_sync", synced_utc);
    g_sender_faults_reset_after_first_sync = true;
    g_sender_faults_reset_hour_utc = synced_utc / 3600U;
  }
}
// Resets the sender fault statistics once per UTC hour.
//
// Active only after time has been acquired and the first-sync reset has run,
// and only while the current UTC time is plausible. If no reference hour is
// stored yet (UINT32_MAX) the current hour is just recorded; otherwise a reset
// happens when the current hour index is greater than the stored one.
static void sender_reset_fault_stats_on_hour_boundary() {
  const bool armed = g_time_acquired && g_sender_faults_reset_after_first_sync;
  if (!armed) {
    return;
  }
  const uint32_t utc_now = time_get_utc();
  if (utc_now < MIN_ACCEPTED_EPOCH_UTC) {
    return;
  }
  const uint32_t hour_index = utc_now / 3600U;
  if (g_sender_faults_reset_hour_utc == UINT32_MAX) {
    g_sender_faults_reset_hour_utc = hour_index;
  } else if (hour_index > g_sender_faults_reset_hour_utc) {
    sender_reset_fault_stats("hourly", utc_now);
    g_sender_faults_reset_hour_utc = hour_index;
  }
}
#ifdef ARDUINO_ARCH_ESP32
// (Re)initialises the ESP32 task watchdog with a timeout of
// WATCHDOG_TIMEOUT_SEC seconds and panic-on-timeout enabled, then registers a
// task with it.
// NOTE(review): esp_task_wdt_add(nullptr) and idle_core_mask = 0 are taken to
// mean "subscribe the calling task" and "do not watch the idle tasks" per the
// ESP-IDF documentation - confirm against the IDF version in use. Return codes
// of the esp_task_wdt_* calls are not checked.
static void watchdog_init() {
// Drop any existing watchdog configuration before applying ours.
esp_task_wdt_deinit();
esp_task_wdt_config_t config = {};
config.timeout_ms = WATCHDOG_TIMEOUT_SEC * 1000;
config.idle_core_mask = 0;
config.trigger_panic = true;
esp_task_wdt_init(&config);
esp_task_wdt_add(nullptr);
}
// Feeds the task watchdog; called from long-running operations.
static void watchdog_kick() {
esp_task_wdt_reset();
}
#else
// Non-ESP32 builds: the watchdog helpers are no-ops.
static void watchdog_init() {}
static void watchdog_kick() {}
#endif
// Stores `value` at dst[0..1] in little-endian order (low byte first).
static void write_u16_le(uint8_t *dst, uint16_t value) {
  for (unsigned idx = 0; idx < 2; ++idx) {
    dst[idx] = static_cast<uint8_t>((value >> (8 * idx)) & 0xFF);
  }
}
// Reads a little-endian 16-bit value from src[0..1].
static uint16_t read_u16_le(const uint8_t *src) {
  const uint16_t low = src[0];
  const uint16_t high = src[1];
  return static_cast<uint16_t>(low | (high << 8));
}
// Stores `value` at dst[0..1] in big-endian order (high byte first).
static void write_u16_be(uint8_t *dst, uint16_t value) {
  const uint8_t high = static_cast<uint8_t>((value >> 8) & 0xFF);
  const uint8_t low = static_cast<uint8_t>(value & 0xFF);
  dst[0] = high;
  dst[1] = low;
}
// Reads a big-endian 16-bit value from src[0..1].
static uint16_t read_u16_be(const uint8_t *src) {
  const uint16_t high = static_cast<uint16_t>(src[0] << 8);
  const uint16_t low = static_cast<uint16_t>(src[1]);
  return high | low;
}
// Stores `value` at dst[0..3] in big-endian order (most significant byte first).
static void write_u32_be(uint8_t *dst, uint32_t value) {
  for (unsigned idx = 0; idx < 4; ++idx) {
    const unsigned shift = 8 * (3 - idx);
    dst[idx] = static_cast<uint8_t>((value >> shift) & 0xFF);
  }
}
// Reads a big-endian 32-bit value from src[0..3].
static uint32_t read_u32_be(const uint8_t *src) {
  uint32_t value = 0;
  for (unsigned idx = 0; idx < 4; ++idx) {
    value = (value << 8) | static_cast<uint32_t>(src[idx]);
  }
  return value;
}
// Maps a device short id to its 1-based sender id, i.e. its position in
// EXPECTED_SENDER_IDS plus one. Returns 0 when the short id is not listed.
static uint16_t sender_id_from_short_id(uint16_t short_id) {
  uint8_t index = 0;
  while (index < NUM_SENDERS) {
    if (short_id == EXPECTED_SENDER_IDS[index]) {
      return static_cast<uint16_t>(index + 1);
    }
    ++index;
  }
  return 0;
}
// Inverse of sender_id_from_short_id(): returns the short id configured for a
// 1-based sender id, or 0 when the id is outside 1..NUM_SENDERS.
static uint16_t short_id_from_sender_id(uint16_t sender_id) {
  const bool in_range = sender_id != 0 && sender_id <= NUM_SENDERS;
  if (in_range) {
    return EXPECTED_SENDER_IDS[sender_id - 1];
  }
  return 0;
}
// True when `short_id` matches one of the configured EXPECTED_RECEIVER_IDS.
static bool is_expected_receiver_short_id(uint16_t short_id) {
  bool known = false;
  for (uint8_t index = 0; index < NUM_RECEIVERS && !known; ++index) {
    known = (short_id == EXPECTED_RECEIVER_IDS[index]);
  }
  return known;
}
// Converts an energy reading in kWh to whole Wh.
// NaN maps to 0; the result is clamped to [0, UINT32_MAX] in double precision
// before rounding to the nearest integer.
static uint32_t kwh_to_wh_from_float(float value) {
  if (isnan(value)) {
    return 0;
  }
  const double upper = static_cast<double>(UINT32_MAX);
  double wh = static_cast<double>(value) * 1000.0;
  wh = (wh < 0.0) ? 0.0 : wh;
  wh = (wh > upper) ? upper : wh;
  return static_cast<uint32_t>(llround(wh));
}
// Rounds a power value (watts) to the nearest integer and stores it in `out`.
// NaN is stored as 0 and reported as success. Returns false, leaving `out`
// unchanged, when the rounded value does not fit into int16_t.
static bool float_to_i16_w(float value, int16_t &out) {
  const long rounded = isnan(value) ? 0L : lroundf(value);
  const bool fits = rounded >= INT16_MIN && rounded <= INT16_MAX;
  if (fits) {
    out = static_cast<int16_t>(rounded);
  }
  return fits;
}
// Rounds a power value (watts) to the nearest integer, saturating at the
// int16_t limits.
//
// value:   power in watts; NaN yields 0 with `clamped` false.
// clamped: set to true when the result had to be saturated, false otherwise.
//
// The range check is done on the float BEFORE calling lroundf(): the result of
// lroundf() is unspecified when the rounded value does not fit into long, so
// very large or infinite inputs could otherwise saturate to the wrong limit.
static int16_t float_to_i16_w_clamped(float value, bool &clamped) {
  clamped = false;
  if (isnan(value)) {
    return 0;
  }
  // lroundf rounds halves away from zero, so x >= 32767.5 would round to at
  // least 32768 and x <= -32768.5 to at most -32769.
  if (value >= 32767.5f) {
    clamped = true;
    return INT16_MAX;
  }
  if (value <= -32768.5f) {
    clamped = true;
    return INT16_MIN;
  }
  return static_cast<int16_t>(lroundf(value));
}
// Converts a battery voltage in volts to whole millivolts.
//
// NaN, zero and negative voltages yield 0; values at or above 65.535 V
// (including +infinity) saturate at UINT16_MAX.
//
// Saturation is decided on the float BEFORE calling lroundf(): the result of
// lroundf() is unspecified when the rounded value does not fit into long, so a
// huge or infinite reading could otherwise be reported as 0 mV.
static uint16_t battery_mv_from_voltage(float value) {
  if (isnan(value) || value <= 0.0f) {
    return 0;
  }
  const float millivolts = value * 1000.0f;
  if (millivolts >= static_cast<float>(UINT16_MAX)) {
    return UINT16_MAX;
  }
  // Here 0 < millivolts < 65535, so the rounded value fits into uint16_t.
  return static_cast<uint16_t>(lroundf(millivolts));
}
// Estimates how long receiving a complete chunked batch may take.
//
// total_len:   total encoded payload length announced in the chunk header.
// chunk_count: number of chunks announced.
//
// The estimate is chunk_count times the airtime of the largest possible chunk
// packet plus BATCH_RX_MARGIN_MS, never less than 10 s. A zero length or chunk
// count yields the 10 s floor.
// NOTE(review): the "3 +" and "+ 2" terms presumably account for packet
// framing overhead around the payload - confirm against the transport.
static uint32_t compute_batch_rx_timeout_ms(uint16_t total_len, uint8_t chunk_count) {
  const uint32_t floor_ms = 10000;
  if (total_len == 0 || chunk_count == 0) {
    return floor_ms;
  }
  const size_t largest_chunk = total_len > BATCH_CHUNK_PAYLOAD ? BATCH_CHUNK_PAYLOAD : total_len;
  const size_t packet_len = 3 + (BATCH_HEADER_SIZE + largest_chunk) + 2;
  const uint32_t per_chunk_toa_ms = lora_airtime_ms(packet_len);
  const uint32_t estimate_ms = static_cast<uint32_t>(chunk_count) * per_chunk_toa_ms + BATCH_RX_MARGIN_MS;
  if (estimate_ms < floor_ms) {
    return floor_ms;
  }
  return estimate_ms;
}
// Computes the ACK timeout for a batch with an encoded payload of
// `payload_len` bytes.
//
// The payload is assumed to be split into chunks of at most
// BATCH_CHUNK_PAYLOAD bytes; the result is the chunk count times the airtime
// of the largest chunk packet plus BATCH_RX_MARGIN_MS, never less than 10 s.
// An empty payload yields the 10 s floor.
// NOTE(review): the "3 +" and "+ 2" terms presumably account for packet
// framing overhead around the payload - confirm against the transport.
static uint32_t compute_batch_ack_timeout_ms(size_t payload_len) {
  const uint32_t floor_ms = 10000;
  if (payload_len == 0) {
    return floor_ms;
  }
  const uint8_t chunk_count = static_cast<uint8_t>((payload_len + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD);
  const size_t largest_chunk = payload_len > BATCH_CHUNK_PAYLOAD ? BATCH_CHUNK_PAYLOAD : payload_len;
  const size_t packet_len = 3 + BATCH_HEADER_SIZE + largest_chunk + 2;
  const uint32_t per_chunk_toa_ms = lora_airtime_ms(packet_len);
  const uint32_t estimate_ms = static_cast<uint32_t>(chunk_count) * per_chunk_toa_ms + BATCH_RX_MARGIN_MS;
  if (estimate_ms < floor_ms) {
    return floor_ms;
  }
  return estimate_ms;
}
// Transmits an encoded batch payload as a sequence of BatchUp LoRa packets.
//
// data:           encoded payload; must be non-null.
// len:            payload length, 1..BATCH_MAX_COMPRESSED bytes.
// ts_for_display: timestamp forwarded to the display with the TX result.
// batch_id:       batch id written into every chunk header.
//
// Each packet carries a BATCH_HEADER_SIZE-byte header (batch id LE, chunk
// index, chunk count, total length LE) followed by up to BATCH_CHUNK_PAYLOAD
// payload bytes. All chunks are attempted even if one fails; every failed
// chunk is recorded as a LoraTx fault.
//
// Returns true only when every chunk was sent successfully. Returns false
// without sending anything for invalid arguments or when the payload would
// need more chunks than the one-byte index/count header fields can express.
static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_display, uint16_t batch_id) {
  if (!data || len == 0 || len > BATCH_MAX_COMPRESSED) {
    return false;
  }
  // Compute the chunk count at full width first: narrowing straight to
  // uint8_t would silently wrap for more than 255 chunks and send a partial
  // payload under a wrong header.
  const size_t chunks_needed = (len + BATCH_CHUNK_PAYLOAD - 1) / BATCH_CHUNK_PAYLOAD;
  if (chunks_needed == 0 || chunks_needed > UINT8_MAX) {
    return false;
  }
  const uint8_t chunk_count = static_cast<uint8_t>(chunks_needed);
  bool all_ok = true;
  size_t offset = 0;
  for (uint8_t i = 0; i < chunk_count; ++i) {
    size_t chunk_len = len - offset;
    if (chunk_len > BATCH_CHUNK_PAYLOAD) {
      chunk_len = BATCH_CHUNK_PAYLOAD;
    }
    LoraPacket pkt = {};
    pkt.msg_kind = LoraMsgKind::BatchUp;
    pkt.device_id_short = g_short_id;
    pkt.payload_len = chunk_len + BATCH_HEADER_SIZE;
    uint8_t *payload = pkt.payload;
    write_u16_le(&payload[0], batch_id);
    payload[2] = i;
    payload[3] = chunk_count;
    // len <= BATCH_MAX_COMPRESSED was checked above, so it fits into 16 bits.
    write_u16_le(&payload[4], static_cast<uint16_t>(len));
    memcpy(&payload[BATCH_HEADER_SIZE], data + offset, chunk_len);
    // A single transmission can take long; feed the watchdog before each one.
    watchdog_kick();
    uint32_t tx_start = millis();
    bool ok = lora_send(pkt);
    uint32_t tx_ms = millis() - tx_start;
    all_ok = all_ok && ok;
    if (!ok) {
      note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx);
      display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
    }
    // Log only failures and unusually slow (> 2 s) transmissions.
    if (SERIAL_DEBUG_MODE && (!ok || tx_ms > 2000)) {
      serial_debug_printf("tx: chunk %u/%u took %lums ok=%u", static_cast<unsigned>(i + 1),
                          static_cast<unsigned>(chunk_count), static_cast<unsigned long>(tx_ms), ok ? 1 : 0);
    }
    offset += chunk_len;
    delay(10);
  }
  display_set_last_tx(all_ok, ts_for_display);
  return all_ok;
}
// Sends an AckDown packet for `batch_id` and returns the radio to continuous
// receive.
//
// The ACK payload is: time-valid flag (1 byte), batch id (big-endian u16),
// UTC epoch (big-endian u32). The epoch is sent as 0 unless time is synced and
// at least MIN_ACCEPTED_EPOCH_UTC. The packet is transmitted ACK_REPEAT_COUNT
// times (at least once) with ACK_REPEAT_DELAY_MS between repeats.
// `sample_count` is only used for the debug log line.
static void send_batch_ack(uint16_t batch_id, uint8_t sample_count) {
  uint32_t epoch = time_get_utc();
  const uint8_t time_valid = (time_is_synced() && epoch >= MIN_ACCEPTED_EPOCH_UTC) ? 1 : 0;
  if (time_valid == 0) {
    epoch = 0;
  }

  LoraPacket ack = {};
  ack.msg_kind = LoraMsgKind::AckDown;
  ack.device_id_short = g_short_id;
  ack.payload_len = LORA_ACK_DOWN_PAYLOAD_LEN;
  ack.payload[0] = time_valid;
  write_u16_be(&ack.payload[1], batch_id);
  write_u32_be(&ack.payload[3], epoch);

  uint8_t remaining = ACK_REPEAT_COUNT == 0 ? 1 : ACK_REPEAT_COUNT;
  while (remaining > 0) {
    lora_send(ack);
    --remaining;
    // Pause only between repeats, not after the final transmission.
    if (remaining > 0 && ACK_REPEAT_DELAY_MS > 0) {
      delay(ACK_REPEAT_DELAY_MS);
    }
  }

  serial_debug_printf("ack: tx batch_id=%u time_valid=%u epoch=%lu samples=%u",
                      batch_id,
                      static_cast<unsigned>(time_valid),
                      static_cast<unsigned long>(epoch),
                      static_cast<unsigned>(sample_count));
  lora_receive_continuous();
}
static bool prepare_inflight_from_queue() {
if (g_inflight_active) {
return true;
}
BatchBuffer *batch = batch_queue_peek();
if (!batch || batch->count == 0) {
return false;
}
if (!batch->batch_id_valid) {
batch->batch_id = g_batch_id;
batch->batch_id_valid = true;
}
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("batch: id=%u desired=%u attempts=%u valid=%u invalid=%u err_last=%u",
batch->batch_id,
static_cast<unsigned>(METER_BATCH_MAX_SAMPLES),
static_cast<unsigned>(batch->attempt_count),
static_cast<unsigned>(batch->valid_count),
static_cast<unsigned>(batch->invalid_count),
static_cast<unsigned>(batch->last_error));
}
g_inflight_count = batch->count;
g_inflight_batch_id = batch->batch_id;
for (uint8_t i = 0; i < g_inflight_count; ++i) {
g_inflight_samples[i] = batch->samples[i];
}
g_inflight_active = true;
return true;
}
static bool send_inflight_batch(uint32_t ts_for_display) {
g_last_tx_build_error = TxBuildError::None;
if (!g_inflight_active) {
return false;
}
bool cache_match = g_inflight_encoded_valid &&
g_inflight_encoded_batch_id == g_inflight_batch_id &&
g_inflight_encoded_sync_request == g_inflight_sync_request;
if (cache_match) {
g_batch_ack_timeout_ms = compute_batch_ack_timeout_ms(g_inflight_encoded_payload_len);
uint32_t send_start = millis();
bool ok = send_batch_payload(g_inflight_encoded_payload, g_inflight_encoded_payload_len, ts_for_display, g_inflight_batch_id);
uint32_t send_ms = millis() - send_start;
if (SERIAL_DEBUG_MODE && send_ms > 1000) {
serial_debug_printf("tx: resend batch took %lums", static_cast<unsigned long>(send_ms));
}
if (ok) {
g_last_batch_send_ms = millis();
if (g_inflight_sync_request) {
serial_debug_printf("sync: request tx batch_id=%u", g_inflight_batch_id);
} else {
serial_debug_printf("tx: resent batch_id=%u len=%u", g_inflight_batch_id, static_cast<unsigned>(g_inflight_encoded_payload_len));
}
} else if (g_inflight_sync_request) {
serial_debug_printf("sync: request tx failed batch_id=%u", g_inflight_batch_id);
} else {
serial_debug_printf("tx: resend failed batch_id=%u", g_inflight_batch_id);
}
return ok;
}
BatchInput input = {};
input.sender_id = sender_id_from_short_id(g_short_id);
input.batch_id = g_inflight_batch_id;
input.meter_t_last = g_inflight_sync_request ? 0 :
g_inflight_samples[g_inflight_count - 1].meter_seconds;
input.ts_utc_last = g_inflight_sync_request ? time_get_utc() :
g_inflight_samples[g_inflight_count - 1].ts_utc;
input.present_mask = 0;
input.n = 0;
input.battery_mV = g_inflight_sync_request ? battery_mv_from_voltage(g_last_battery_voltage_v) :
battery_mv_from_voltage(g_inflight_samples[g_inflight_count - 1].battery_voltage_v);
input.err_m = g_sender_faults.meter_read_fail > 255 ? 255 : static_cast<uint8_t>(g_sender_faults.meter_read_fail);
input.err_d = g_sender_faults.decode_fail > 255 ? 255 : static_cast<uint8_t>(g_sender_faults.decode_fail);
input.err_tx = g_sender_faults.lora_tx_fail > 255 ? 255 : static_cast<uint8_t>(g_sender_faults.lora_tx_fail);
input.err_last = static_cast<uint8_t>(g_sender_last_error);
input.err_rx_reject = static_cast<uint8_t>(g_sender_rx_reject_reason);
uint8_t energy_regressions = 0;
uint8_t phase_clamps = 0;
uint8_t ts_dropped = 0;
uint8_t ts_collapsed = 0;
if (!g_inflight_sync_request) {
if (!g_inflight_samples[g_inflight_count - 1].meter_seconds_valid ||
input.meter_t_last < static_cast<uint32_t>(METER_BATCH_MAX_SAMPLES - 1)) {
g_last_tx_build_error = TxBuildError::Encode;
return false;
}
const uint32_t window_start = input.meter_t_last - static_cast<uint32_t>(METER_BATCH_MAX_SAMPLES - 1);
MeterData slot_samples[METER_BATCH_MAX_SAMPLES];
bool slot_used[METER_BATCH_MAX_SAMPLES] = {};
for (uint8_t i = 0; i < g_inflight_count; ++i) {
const MeterData &sample = g_inflight_samples[i];
if (!sample.meter_seconds_valid || sample.meter_seconds < window_start || sample.meter_seconds > input.meter_t_last) {
if (ts_dropped < 255) {
ts_dropped++;
}
continue;
}
uint8_t slot = static_cast<uint8_t>(sample.meter_seconds - window_start);
if (slot_used[slot] && ts_collapsed < 255) {
ts_collapsed++;
}
slot_used[slot] = true;
slot_samples[slot] = sample;
}
for (uint8_t slot = 0; slot < METER_BATCH_MAX_SAMPLES; ++slot) {
if (!slot_used[slot]) {
continue;
}
const uint8_t out_idx = input.n;
if (out_idx >= METER_BATCH_MAX_SAMPLES) {
g_last_tx_build_error = TxBuildError::Encode;
return false;
}
input.present_mask |= (1UL << slot);
input.n++;
input.energy_wh[out_idx] = kwh_to_wh_from_float(slot_samples[slot].energy_total_kwh);
bool c1 = false;
bool c2 = false;
bool c3 = false;
input.p1_w[out_idx] = float_to_i16_w_clamped(slot_samples[slot].phase_power_w[0], c1);
input.p2_w[out_idx] = float_to_i16_w_clamped(slot_samples[slot].phase_power_w[1], c2);
input.p3_w[out_idx] = float_to_i16_w_clamped(slot_samples[slot].phase_power_w[2], c3);
if (c1 && phase_clamps < 255) {
phase_clamps++;
}
if (c2 && phase_clamps < 255) {
phase_clamps++;
}
if (c3 && phase_clamps < 255) {
phase_clamps++;
}
}
}
for (uint8_t i = 0; i < input.n; ++i) {
if (i > 0 && input.energy_wh[i] < input.energy_wh[i - 1]) {
input.energy_wh[i] = input.energy_wh[i - 1];
if (energy_regressions < 255) {
energy_regressions++;
}
}
}
if (SERIAL_DEBUG_MODE && (energy_regressions > 0 || phase_clamps > 0 || ts_dropped > 0 || ts_collapsed > 0)) {
serial_debug_printf("tx: sanitize batch_id=%u energy_regress=%u phase_clamps=%u ts_drop=%u ts_dup=%u",
g_inflight_batch_id,
static_cast<unsigned>(energy_regressions),
static_cast<unsigned>(phase_clamps),
static_cast<unsigned>(ts_dropped),
static_cast<unsigned>(ts_collapsed));
}
static uint8_t encoded[BATCH_MAX_COMPRESSED];
size_t encoded_len = 0;
uint32_t encode_start = millis();
if (!encode_batch(input, encoded, sizeof(encoded), &encoded_len)) {
g_last_tx_build_error = TxBuildError::Encode;
invalidate_inflight_encode_cache();
return false;
}
memcpy(g_inflight_encoded_payload, encoded, encoded_len);
g_inflight_encoded_payload_len = encoded_len;
g_inflight_encoded_batch_id = g_inflight_batch_id;
g_inflight_encoded_sync_request = g_inflight_sync_request;
g_inflight_encoded_valid = true;
uint32_t encode_ms = millis() - encode_start;
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("tx: batch_id=%u count=%u mask=%08lX bin_len=%u",
g_inflight_batch_id,
static_cast<unsigned>(input.n),
static_cast<unsigned long>(input.present_mask),
static_cast<unsigned>(encoded_len));
if (encode_ms > 200) {
serial_debug_printf("tx: encode took %lums", static_cast<unsigned long>(encode_ms));
}
}
g_batch_ack_timeout_ms = compute_batch_ack_timeout_ms(encoded_len);
uint32_t send_start = millis();
bool ok = send_batch_payload(encoded, encoded_len, ts_for_display, g_inflight_batch_id);
uint32_t send_ms = millis() - send_start;
if (SERIAL_DEBUG_MODE && send_ms > 1000) {
serial_debug_printf("tx: send batch took %lums", static_cast<unsigned long>(send_ms));
}
if (ok) {
g_last_batch_send_ms = millis();
if (g_inflight_sync_request) {
serial_debug_printf("sync: request tx batch_id=%u", g_inflight_batch_id);
} else {
serial_debug_printf("tx: sent batch_id=%u len=%u", g_inflight_batch_id, static_cast<unsigned>(encoded_len));
}
} else {
if (g_inflight_sync_request) {
serial_debug_printf("sync: request tx failed batch_id=%u", g_inflight_batch_id);
} else {
serial_debug_printf("tx: send failed batch_id=%u", g_inflight_batch_id);
}
}
return ok;
}
// Loads the next queued batch into the in-flight slot and transmits it.
//
// ts_for_display is forwarded unchanged to send_inflight_batch().
// Returns true when the batch was transmitted; in that case the ACK wait is
// armed. Returns false when nothing could be prepared or transmission failed.
static bool send_meter_batch(uint32_t ts_for_display) {
  if (!prepare_inflight_from_queue()) {
    return false;
  }
  g_inflight_sync_request = false;

  if (send_inflight_batch(ts_for_display)) {
    // Transmitted: remember which batch the next ACK must reference.
    g_last_sent_batch_id = g_inflight_batch_id;
    g_batch_ack_pending = true;
    return true;
  }

  const bool encode_failed = (g_last_tx_build_error == TxBuildError::Encode);
  if (encode_failed) {
    // The batch could not be encoded; record the fault and retire the batch
    // through finish_inflight_batch() instead of retrying it.
    serial_debug_printf("tx: encode failed batch_id=%u dropped", g_inflight_batch_id);
    note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::Decode);
    display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
    finish_inflight_batch();
    return false;
  }

  // Any other failure: only the in-flight bookkeeping is reset here.
  g_inflight_active = false;
  g_inflight_count = 0;
  g_inflight_batch_id = 0;
  g_inflight_sync_request = false;
  invalidate_inflight_encode_cache();
  return false;
}
// Transmits a sample-less in-flight batch flagged as a sync request.
//
// Does nothing (returns false) while an ACK is still pending. On a successful
// transmit the ACK wait is armed; on failure the in-flight state is cleared.
static bool send_sync_request() {
  if (g_batch_ack_pending) {
    return false;
  }

  // Refresh the cached battery reading when a new sample is due.
  if (battery_sample_due(millis())) {
    update_battery_cache();
  }

  // Stage an empty in-flight batch under the current batch id.
  g_inflight_active = true;
  g_inflight_sync_request = true;
  g_inflight_count = 0;
  g_inflight_batch_id = g_batch_id;

  const bool log_build_stats = SERIAL_DEBUG_MODE && g_build_attempts > 0;
  if (log_build_stats) {
    serial_debug_printf("batch: id=%u desired=%u attempts=%u valid=%u invalid=%u err_last=%u sync=1",
                        g_inflight_batch_id,
                        static_cast<unsigned>(METER_BATCH_MAX_SAMPLES),
                        static_cast<unsigned>(g_build_attempts),
                        static_cast<unsigned>(g_build_valid),
                        static_cast<unsigned>(g_build_invalid),
                        static_cast<unsigned>(g_sender_last_error));
  }

  if (!send_inflight_batch(time_get_utc())) {
    // Transmit failed: release the in-flight slot again.
    g_inflight_active = false;
    g_inflight_sync_request = false;
    g_inflight_batch_id = 0;
    invalidate_inflight_encode_cache();
    return false;
  }

  g_last_sent_batch_id = g_inflight_batch_id;
  g_batch_ack_pending = true;
  return true;
}
// Retransmits the current in-flight batch.
//
// Only acts when an ACK is pending, an in-flight batch is active, and that
// batch is either a sync request or carries at least one sample. Returns the
// result of send_inflight_batch(), or false when those preconditions fail.
static bool resend_inflight_batch(uint32_t ts_for_display) {
  if (!g_batch_ack_pending) {
    return false;
  }
  if (!g_inflight_active) {
    return false;
  }
  const bool has_content = g_inflight_sync_request || !(g_inflight_count == 0);
  if (!has_content) {
    return false;
  }
  return send_inflight_batch(ts_for_display);
}
// Retires the current in-flight batch and advances to the next batch id.
//
// Clears the ACK-pending flag, the retry counter, all in-flight bookkeeping
// and the cached encoding, then increments g_batch_id.
//
// The oldest queue entry is removed only when the in-flight item was a data
// batch. A sync request is staged by send_sync_request() without consuming a
// queue entry, so retiring it must not discard a queued batch that has not
// been transmitted yet.
static void finish_inflight_batch() {
  // Read the flag before it is cleared below.
  const bool was_sync_request = g_inflight_sync_request;
  if (!was_sync_request && g_batch_count > 0) {
    batch_queue_drop_oldest();
  }
  g_batch_ack_pending = false;
  g_batch_retry_count = 0;
  g_inflight_active = false;
  g_inflight_count = 0;
  g_inflight_batch_id = 0;
  g_inflight_sync_request = false;
  invalidate_inflight_encode_cache();
  g_batch_id++;
}
// One iteration of the sender main loop.
//
// In order, an iteration:
//   1. kicks the watchdog, refreshes display counters and pumps the meter reader;
//   2. when time is acquired: records meter faults, appends one sample per
//      elapsed sample interval, and sends a batch (or a sync request) on the
//      send cadence or immediately while a backlog exists;
//      when time is not acquired: sends a sync request on its own cadence;
//   3. while an ACK is pending: opens up to two receive windows, validates any
//      received packet and, on a matching ACK, optionally adopts its time and
//      retires the in-flight batch;
//   4. retries or gives up on the in-flight batch once the ACK timeout elapsed;
//   5. idles (light sleep or delay) until the next sample/send/sync is due.
//
// now_ms is captured once at entry. Steps that run after blocking radio work
// (timeout check, idle computation) re-read millis() so they compare against
// timestamps taken later in the same iteration.
static void sender_loop() {
  watchdog_kick();
  uint32_t now_ms = millis();
  display_set_sender_queue(g_batch_count, g_build_count > 0);
  display_set_sender_batches(g_last_acked_batch_id, g_batch_id);
  sender_log_diagnostics(now_ms);
  meter_reader_pump(now_ms);

  if (g_time_acquired) {
    sender_reset_fault_stats_on_hour_boundary();
    // Evaluate meter staleness once per loop iteration, not per catch-up tick.
    // This prevents LoRa TX blocking (seconds) from inflating the fault counter
    // by N missed ticks when the same stale-data condition persists throughout.
    uint32_t meter_age_ms = g_last_meter_valid ? (now_ms - g_last_meter_rx_ms) : UINT32_MAX;
    bool has_snapshot = g_last_meter_valid;
    bool meter_ok = has_snapshot && meter_age_ms <= METER_SAMPLE_MAX_AGE_MS;
    bool meter_fault_noted = false;
    // Count one time-jump fault per event, outside the catch-up loop.
    if (g_meter_time_jump_pending) {
      g_meter_time_jump_pending = false;
      note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::MeterRead);
      display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
      meter_fault_noted = true;
    }
    // Count at most one stale-meter fault per loop iteration (none if a
    // time-jump fault was just recorded), rather than one per catch-up tick.
    if (!meter_ok && !meter_fault_noted) {
      note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::MeterRead);
      display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
    }
#ifdef DEBUG_METER_DIAG
    {
      uint32_t pending_ticks = (now_ms - g_last_sample_ms) / METER_SAMPLE_INTERVAL_MS;
      if (pending_ticks > 1) {
        serial_debug_printf("meter_diag: catchup ticks=%lu age_ms=%lu ok=%u snap=%u",
                            static_cast<unsigned long>(pending_ticks),
                            static_cast<unsigned long>(meter_age_ms),
                            meter_ok ? 1U : 0U,
                            has_snapshot ? 1U : 0U);
      }
    }
#endif
    // Emit one sample per elapsed sample interval (catch-up after blocking work).
    while (now_ms - g_last_sample_ms >= METER_SAMPLE_INTERVAL_MS) {
      g_last_sample_ms += METER_SAMPLE_INTERVAL_MS;
      MeterData data = {};
      data.short_id = g_short_id;
      strncpy(data.device_id, g_device_id, sizeof(data.device_id));
      // strncpy does not terminate when the source fills the buffer.
      data.device_id[sizeof(data.device_id) - 1] = '\0';
      data.energy_total_kwh = NAN;
      data.total_power_w = NAN;
      data.phase_power_w[0] = NAN;
      data.phase_power_w[1] = NAN;
      data.phase_power_w[2] = NAN;
      g_build_attempts++;
      if (has_snapshot) {
        // Copy the most recent meter snapshot into this sample.
        data.meter_seconds = g_last_meter_data.meter_seconds;
        data.meter_seconds_valid = g_last_meter_data.meter_seconds_valid;
        data.energy_total_kwh = g_last_meter_data.energy_total_kwh;
        data.total_power_w = g_last_meter_data.total_power_w;
        data.phase_power_w[0] = g_last_meter_data.phase_power_w[0];
        data.phase_power_w[1] = g_last_meter_data.phase_power_w[1];
        data.phase_power_w[2] = g_last_meter_data.phase_power_w[2];
        g_meter_stale_seconds = meter_age_ms >= 1000 ? (meter_age_ms / 1000) : 0;
      } else {
        g_meter_stale_seconds = g_last_meter_valid ? (meter_age_ms / 1000) : (g_meter_stale_seconds + 1);
      }
      if (g_build_count == 0 && battery_sample_due(now_ms)) {
        update_battery_cache();
      }
      data.battery_voltage_v = g_last_battery_voltage_v;
      data.battery_percent = g_last_battery_percent;
      data.rx_reject_reason = static_cast<uint8_t>(g_sender_rx_reject_reason);
      // Prefer the snapshot's own UTC stamp; otherwise use the local clock
      // shifted back by how far this tick lags behind now_ms.
      uint32_t sample_ts_utc = 0;
      if (has_snapshot && g_last_meter_data.meter_seconds_valid && g_last_meter_data.ts_utc >= MIN_ACCEPTED_EPOCH_UTC) {
        sample_ts_utc = g_last_meter_data.ts_utc;
      } else {
        sample_ts_utc = time_get_utc();
        if (sample_ts_utc > 0 && now_ms > g_last_sample_ms) {
          uint32_t lag_s = (now_ms - g_last_sample_ms) / 1000;
          if (sample_ts_utc > lag_s) {
            sample_ts_utc -= lag_s;
          }
        }
      }
      data.ts_utc = sample_ts_utc;
      data.valid = has_snapshot;
      if (!data.meter_seconds_valid) {
        data.valid = false;
      }
      bool appended = append_meter_sample(data, meter_ok, has_snapshot && data.meter_seconds_valid);
      (void)appended;
      display_set_last_meter(data);
      display_set_last_read(meter_ok, data.ts_utc);
    }
    // Regular send cadence: queue whatever was built, then send a batch, or a
    // sync request when sampling was attempted but nothing is queued.
    if (!g_batch_ack_pending && now_ms - g_last_send_ms >= METER_SEND_INTERVAL_MS) {
      g_last_send_ms = now_ms;
      if (g_build_count > 0) {
        batch_queue_enqueue(g_build_samples, g_build_count);
        g_build_count = 0;
        reset_build_counters();
      }
      if (g_batch_count > 0) {
        send_meter_batch(last_sample_ts());
      } else if (g_build_attempts > 0) {
        if (send_sync_request()) {
          reset_build_counters();
        }
      }
    }
    // Catch-up mode: when backlog exists, send next queued batch without waiting
    // for the regular 30s cadence.
    if (!g_batch_ack_pending && g_batch_count > 1) {
      send_meter_batch(last_sample_ts());
    }
  } else {
    // No time yet: keep asking for it on the sync-request cadence.
    if (!g_batch_ack_pending && now_ms - g_last_sync_request_ms >= SYNC_REQUEST_INTERVAL_MS) {
      g_last_sync_request_ms = now_ms;
      send_sync_request();
    }
  }

  if (g_batch_ack_pending) {
    LoraPacket ack_pkt = {};
    constexpr size_t ack_len = lora_frame_size(LORA_ACK_DOWN_PAYLOAD_LEN);
    uint32_t ack_air_ms = lora_airtime_ms(ack_len);
    // First window: the larger of (airtime + 200) and (smoothed RTT + 150),
    // widened by the miss streak and clamped to [600, 2500] ms.
    uint32_t ack_window_first_ms = ack_air_ms + 200;
    if (g_sender_ack_rtt_ewma_ms > 0) {
      uint32_t rtt_based_ms = g_sender_ack_rtt_ewma_ms + 150;
      if (rtt_based_ms > ack_window_first_ms) {
        ack_window_first_ms = rtt_based_ms;
      }
    }
    // 150 ms per consecutive miss, capped at 1200 ms. Clamp before multiplying
    // so an arbitrarily long streak cannot overflow the product.
    uint32_t miss_boost_ms = g_sender_ack_miss_streak >= 8 ? 1200 : g_sender_ack_miss_streak * 150;
    ack_window_first_ms += miss_boost_ms;
    if (ack_window_first_ms < 600) {
      ack_window_first_ms = 600;
    }
    if (ack_window_first_ms > 2500) {
      ack_window_first_ms = 2500;
    }
    // Second window: 1.5x the first, at least airtime + 400, at most 5000 ms.
    uint32_t ack_window_second_ms = ack_window_first_ms + (ack_window_first_ms / 2);
    uint32_t min_second_ms = ack_air_ms + 400;
    if (ack_window_second_ms < min_second_ms) {
      ack_window_second_ms = min_second_ms;
    }
    if (ack_window_second_ms > 5000) {
      ack_window_second_ms = 5000;
    }
    if (SERIAL_DEBUG_MODE && (g_sender_ack_miss_streak > 0 || now_ms - g_last_ack_window_log_ms >= 10000)) {
      g_last_ack_window_log_ms = now_ms;
      serial_debug_printf("ack: rx windows=%lu/%lu airtime=%lu miss_streak=%lu",
                          static_cast<unsigned long>(ack_window_first_ms),
                          static_cast<unsigned long>(ack_window_second_ms),
                          static_cast<unsigned long>(ack_air_ms),
                          static_cast<unsigned long>(g_sender_ack_miss_streak));
    }
    uint32_t rx_start = millis();
    bool got_ack = lora_receive_window(ack_pkt, ack_window_first_ms);
    if (!got_ack) {
      got_ack = lora_receive_window(ack_pkt, ack_window_second_ms);
    }
    uint32_t rx_elapsed = millis() - rx_start;
    if (SERIAL_DEBUG_MODE) {
      g_sender_rx_window_ms += rx_elapsed;
    }
    bool ack_accepted = false;
    if (!got_ack) {
      RxRejectReason reason = lora_get_last_rx_reject_reason();
      sender_note_rx_reject(reason, "ack");
      if (SERIAL_DEBUG_MODE) {
        int16_t rssi_dbm = 0;
        float snr_db = 0.0f;
        bool has_signal = lora_get_last_rx_signal(rssi_dbm, snr_db);
        const char *reason_text = reason == RxRejectReason::None ? "timeout" : rx_reject_reason_text(reason);
        if (has_signal) {
          serial_debug_printf("ack: rx miss reason=%s rssi=%d snr=%.1f",
                              reason_text,
                              static_cast<int>(rssi_dbm),
                              static_cast<double>(snr_db));
        } else {
          serial_debug_printf("ack: rx miss reason=%s", reason_text);
        }
      }
    } else if (ack_pkt.msg_kind != LoraMsgKind::AckDown) {
      sender_note_rx_reject(RxRejectReason::InvalidMsgKind, "ack");
      if (SERIAL_DEBUG_MODE) {
        uint16_t ack_id = ack_pkt.payload_len >= 3 ? read_u16_be(&ack_pkt.payload[1]) : 0;
        serial_debug_printf("ack: reject msg_kind=%u payload_len=%u ack_id=%u",
                            static_cast<unsigned>(ack_pkt.msg_kind),
                            static_cast<unsigned>(ack_pkt.payload_len),
                            ack_id);
      }
    } else if (ack_pkt.payload_len < LORA_ACK_DOWN_PAYLOAD_LEN) {
      sender_note_rx_reject(RxRejectReason::LengthMismatch, "ack");
      if (SERIAL_DEBUG_MODE) {
        uint16_t ack_id = ack_pkt.payload_len >= 3 ? read_u16_be(&ack_pkt.payload[1]) : 0;
        serial_debug_printf("ack: reject msg_kind=%u payload_len=%u ack_id=%u",
                            static_cast<unsigned>(ack_pkt.msg_kind),
                            static_cast<unsigned>(ack_pkt.payload_len),
                            ack_id);
      }
    } else if (sender_id_from_short_id(ack_pkt.device_id_short) == 0 &&
               ack_pkt.device_id_short != g_short_id &&
               !is_expected_receiver_short_id(ack_pkt.device_id_short)) {
      // Reject ACKs from unknown device IDs to prevent spoofing, but allow
      // ACKs from configured receiver short-IDs.
      sender_note_rx_reject(RxRejectReason::DeviceIdMismatch, "ack");
      if (SERIAL_DEBUG_MODE) {
        serial_debug_printf("ack: reject device_id=%04X (unknown)",
                            ack_pkt.device_id_short);
      }
    } else {
      // Payload fields as read here: byte 0 bit 0 = time-valid flag,
      // bytes 1..2 = acknowledged batch id, bytes 3..6 = epoch (big-endian).
      uint8_t time_valid = ack_pkt.payload[0] & 0x01;
      uint16_t ack_id = read_u16_be(&ack_pkt.payload[1]);
      uint32_t ack_epoch = read_u32_be(&ack_pkt.payload[3]);
      bool set_time = false;
      if (g_batch_ack_pending && ack_id == g_last_sent_batch_id) {
        // Rate-limit: reject if another ACK was accepted less than ACK_MIN_INTERVAL_MS ago
        if (g_ack_accept_last_ms != 0 && (millis() - g_ack_accept_last_ms < ACK_MIN_INTERVAL_MS)) {
          if (SERIAL_DEBUG_MODE) {
            serial_debug_printf("ack: rate-limited (last accepted %lums ago)",
                                static_cast<unsigned long>(millis() - g_ack_accept_last_ms));
          }
        } else {
          ack_accepted = true;
          g_ack_accept_last_ms = millis();
          g_sender_ack_rtt_last_ms = rx_elapsed;
          // Smoothed RTT: seed with the first measurement, then 3/4 old + 1/4 new
          // (rounded).
          if (g_sender_ack_rtt_ewma_ms == 0) {
            g_sender_ack_rtt_ewma_ms = rx_elapsed;
          } else {
            g_sender_ack_rtt_ewma_ms = (g_sender_ack_rtt_ewma_ms * 3U + rx_elapsed + 1U) / 4U;
          }
          if (time_valid == 1 && ack_epoch >= MIN_ACCEPTED_EPOCH_UTC) {
            time_set_utc(ack_epoch);
            g_time_acquired = true;
            sender_reset_fault_stats_on_first_sync(ack_epoch);
            set_time = true;
          }
          g_last_acked_batch_id = ack_id;
          serial_debug_printf("ack: rx ok batch_id=%u time_valid=%u epoch=%lu set=%u",
                              ack_id,
                              static_cast<unsigned>(time_valid),
                              static_cast<unsigned long>(ack_epoch),
                              set_time ? 1 : 0);
          finish_inflight_batch();
        }
      } else {
        if (ack_id != g_last_sent_batch_id) {
          sender_note_rx_reject(RxRejectReason::BatchIdMismatch, "ack");
          if (SERIAL_DEBUG_MODE) {
            serial_debug_printf("ack: reject msg_kind=%u payload_len=%u ack_id=%u",
                                static_cast<unsigned>(ack_pkt.msg_kind),
                                static_cast<unsigned>(ack_pkt.payload_len),
                                ack_id);
          }
        }
      }
    }
    if (ack_accepted) {
      g_sender_ack_miss_streak = 0;
    } else if (g_sender_ack_miss_streak < UINT32_MAX) {
      g_sender_ack_miss_streak++;
    }
  }

  if (!g_batch_ack_pending) {
    lora_sleep();
  }

  // Use a fresh timestamp for the timeout check. g_last_batch_send_ms is
  // stamped with millis() after now_ms was captured when the batch was sent in
  // this same iteration; subtracting it from the stale now_ms would wrap to a
  // huge unsigned value and report a timeout immediately.
  const uint32_t timeout_now_ms = millis();
  if (g_batch_ack_pending && (timeout_now_ms - g_last_batch_send_ms >= g_batch_ack_timeout_ms)) {
    g_sender_ack_timeout_total++;
    if (g_batch_retry_count < BATCH_MAX_RETRIES) {
      g_batch_retry_count++;
      g_sender_ack_retry_total++;
      serial_debug_printf("ack: timeout batch_id=%u retry=%u", g_inflight_batch_id, g_batch_retry_count);
      resend_inflight_batch(last_sample_ts());
    } else {
      serial_debug_printf("ack: failed batch_id=%u policy=%s", g_inflight_batch_id,
                          BATCH_RETRY_POLICY == BatchRetryPolicy::Drop ? "drop" : "keep");
      if (BATCH_RETRY_POLICY == BatchRetryPolicy::Drop) {
        finish_inflight_batch();
      } else {
        // Keep policy: release the in-flight slot without calling
        // finish_inflight_batch(), so the queue and batch id are left as is.
        g_batch_ack_pending = false;
        g_batch_retry_count = 0;
        g_inflight_active = false;
        g_inflight_count = 0;
        g_inflight_batch_id = 0;
        g_inflight_sync_request = false;
        invalidate_inflight_encode_cache();
      }
      note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx);
      display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
    }
  }

  display_tick();

  // Earliest upcoming deadline: next sample or next send when time is
  // acquired, otherwise the next sync request.
  uint32_t next_due = g_time_acquired ? (g_last_sample_ms + METER_SAMPLE_INTERVAL_MS) :
                                        (g_last_sync_request_ms + SYNC_REQUEST_INTERVAL_MS);
  if (g_time_acquired) {
    uint32_t next_send_due = g_last_send_ms + METER_SEND_INTERVAL_MS;
    if (next_send_due < next_due) {
      next_due = next_send_due;
    }
  }
  // Re-read the clock so time already spent in this iteration (radio TX, ACK
  // windows) is not slept a second time.
  const uint32_t idle_now_ms = millis();
  if (!g_batch_ack_pending && next_due > idle_now_ms) {
    watchdog_kick();
    uint32_t idle_ms = next_due - idle_now_ms;
    if (SERIAL_DEBUG_MODE) {
      g_sender_sleep_ms += idle_ms;
      if (now_ms - g_sender_power_log_ms >= SENDER_DIAG_LOG_INTERVAL_MS) {
        g_sender_power_log_ms = now_ms;
        serial_debug_printf("power: rx_ms=%lu sleep_ms=%lu", static_cast<unsigned long>(g_sender_rx_window_ms),
                            static_cast<unsigned long>(g_sender_sleep_ms));
      }
    }
    lora_sleep();
    if (LIGHT_SLEEP_IDLE) {
      // Chunked light-sleep: wake every LIGHT_SLEEP_CHUNK_MS so the
      // meter_reader_task (Core 0, prio 2) can drain the 128-byte UART HW FIFO
      // before it overflows (~133 ms at 9600 baud). Saves ~25 mA vs delay().
      light_sleep_chunked_ms(idle_ms, LIGHT_SLEEP_CHUNK_MS);
    } else if (g_time_acquired) {
      // Fallback: keep meter reader task alive with an active wait.
      delay(idle_ms);
    } else {
      light_sleep_ms(idle_ms);
    }
  }
}
// Maps a sender phase to the upper-case label used in debug logs.
// Any value outside the known phases yields "UNKNOWN".
static const char *sender_phase_text(SenderPhase phase) {
  if (phase == SenderPhase::Syncing) {
    return "SYNCING";
  }
  if (phase == SenderPhase::Normal) {
    return "NORMAL";
  }
  if (phase == SenderPhase::Catchup) {
    return "CATCHUP";
  }
  if (phase == SenderPhase::WaitAck) {
    return "WAIT_ACK";
  }
  return "UNKNOWN";
}
// Switches g_sender_phase to `next`; a no-op when the phase is unchanged.
// When both debug switches are enabled the transition is logged together
// with `reason` ("none" is printed for a null reason).
static void sender_transition(SenderPhase next, const char *reason) {
  const SenderPhase previous = g_sender_phase;
  if (previous == next) {
    return;
  }
  const bool log_enabled = DD3_DEBUG_ENABLED && SERIAL_DEBUG_MODE;
  if (log_enabled) {
    const char *why = reason ? reason : "none";
    serial_debug_printf("state: %s -> %s reason=%s", sender_phase_text(previous), sender_phase_text(next), why);
  }
  g_sender_phase = next;
}
static void sender_update_phase() {
if (g_batch_ack_pending) {
sender_transition(SenderPhase::WaitAck, "ack_pending");
} else if (!g_time_acquired) {
sender_transition(SenderPhase::Syncing, "time_unsynced");
} else if (g_batch_count > 1) {
sender_transition(SenderPhase::Catchup, "backlog");
} else {
sender_transition(SenderPhase::Normal, "steady");
}
}
// Sanity-checks sender bookkeeping after a loop iteration.
// Out-of-range queue and retry counters are logged and clamped to their
// maxima; an ACK wait without an active in-flight batch is only logged.
static void sender_validate_invariants() {
  const bool queue_overflow = g_batch_count > BATCH_QUEUE_DEPTH;
  if (queue_overflow) {
    serial_debug_printf("inv: queue overflow count=%u max=%u", static_cast<unsigned>(g_batch_count), static_cast<unsigned>(BATCH_QUEUE_DEPTH));
    g_batch_count = BATCH_QUEUE_DEPTH;
  }

  const bool retry_overflow = g_batch_retry_count > BATCH_MAX_RETRIES;
  if (retry_overflow) {
    serial_debug_printf("inv: retry overflow retry=%u max=%u", static_cast<unsigned>(g_batch_retry_count), static_cast<unsigned>(BATCH_MAX_RETRIES));
    g_batch_retry_count = BATCH_MAX_RETRIES;
  }

  const bool orphan_ack_wait = g_batch_ack_pending && !g_inflight_active;
  if (orphan_ack_wait && SERIAL_DEBUG_MODE) {
    serial_debug_printf("inv: ack pending without inflight");
  }
}
} // namespace
// Initialises the sender: stores identity from `config`, brings up power and
// meter handling, primes the loop timers and enters the Syncing phase.
// Always returns true.
bool SenderStateMachine::begin(const SenderStateMachineConfig &config) {
  g_short_id = config.short_id;
  if (!config.device_id) {
    g_device_id[0] = '\0';
  } else {
    // Bounded copy with an explicit terminator in the last byte.
    strncpy(g_device_id, config.device_id, sizeof(g_device_id));
    g_device_id[sizeof(g_device_id) - 1] = '\0';
  }

  power_sender_init();
  power_configure_unused_pins_sender();
  meter_init();
#ifdef ARDUINO_ARCH_ESP32
  if (!meter_reader_start()) {
    serial_debug_printf("meter: using inline polling fallback");
  }
#endif

  // Back-date the sample and sync-request timers by one interval so both are
  // due on the first loop pass; the send timer starts counting from now.
  g_last_sample_ms = millis() - METER_SAMPLE_INTERVAL_MS;
  g_last_send_ms = millis();
  g_last_sync_request_ms = millis() - SYNC_REQUEST_INTERVAL_MS;

  g_time_acquired = false;
  g_sender_faults_reset_after_first_sync = false;
  g_sender_faults_reset_hour_utc = UINT32_MAX;
#ifdef ENABLE_TEST_MODE
  g_test_meter_last_emit_ms = 0;
  g_test_meter_tick = 0;
#endif

  update_battery_cache();
  sender_transition(SenderPhase::Syncing, "begin");
  return true;
}
// Public loop entry point. Runs, in this order: the phase update, one
// iteration of the sender loop, and the invariant checks.
void SenderStateMachine::loop() {
  sender_update_phase();      // phase reflects the state left by the previous pass
  sender_loop();              // sampling, transmit, ACK handling, idle
  sender_validate_invariants();
}
// Returns a by-value snapshot of the sender's counters and flags, copied
// from the file-level state at the time of the call.
SenderStats SenderStateMachine::stats() const {
  SenderStats snapshot = {};

  // Queue and build state.
  snapshot.queue_depth = g_batch_count;
  snapshot.build_count = g_build_count;

  // Batch identifiers.
  snapshot.inflight_batch_id = g_inflight_batch_id;
  snapshot.last_sent_batch_id = g_last_sent_batch_id;
  snapshot.last_acked_batch_id = g_last_acked_batch_id;

  // ACK and retry bookkeeping.
  snapshot.ack_pending = g_batch_ack_pending;
  snapshot.retry_count = g_batch_retry_count;
  snapshot.ack_timeout_total = g_sender_ack_timeout_total;
  snapshot.ack_retry_total = g_sender_ack_retry_total;
  snapshot.ack_miss_streak = g_sender_ack_miss_streak;

  // Accumulated receive-window and sleep time.
  snapshot.rx_window_ms = g_sender_rx_window_ms;
  snapshot.sleep_ms = g_sender_sleep_ms;

  return snapshot;
}
// Currently a stub: the argument is intentionally unused and nothing is done.
void SenderStateMachine::handleMeterRead(uint32_t now_ms) {
  static_cast<void>(now_ms);
}
// Currently a stub: the argument is intentionally unused and nothing is done.
void SenderStateMachine::maybeSendBatch(uint32_t now_ms) {
  static_cast<void>(now_ms);
}
// Currently a stub: the argument is intentionally unused and nothing is done.
void SenderStateMachine::handleAckWindow(uint32_t now_ms) {
  static_cast<void>(now_ms);
}
// Adopts the epoch carried by an ACK as the local UTC time.
//
// The epoch is applied only when `time_valid` equals 1 and `ack_epoch` is at
// least MIN_ACCEPTED_EPOCH_UTC. Returns true when the time was applied,
// false when the ACK's time was ignored.
bool SenderStateMachine::applyTimeFromAck(uint8_t time_valid, uint32_t ack_epoch) {
  const bool usable = (time_valid == 1) && (ack_epoch >= MIN_ACCEPTED_EPOCH_UTC);
  if (!usable) {
    return false;
  }
  time_set_utc(ack_epoch);
  g_time_acquired = true;
  sender_reset_fault_stats_on_first_sync(ack_epoch);
  return true;
}
// Public wrapper around the file-local invariant checks.
void SenderStateMachine::validateInvariants() {
  sender_validate_invariants();  // logs and clamps out-of-range counters
}