diff --git a/include/config.h b/include/config.h index 0a25faa..3df1b16 100644 --- a/include/config.h +++ b/include/config.h @@ -15,6 +15,11 @@ enum class PayloadType : uint8_t { Ack = 4 }; +enum class BatchRetryPolicy : uint8_t { + Keep = 0, + Drop = 1 +}; + constexpr uint8_t PROTOCOL_VERSION = 1; // Pin definitions @@ -48,6 +53,7 @@ constexpr uint8_t LORA_SPREADING_FACTOR = 12; constexpr long LORA_BANDWIDTH = 125E3; constexpr uint8_t LORA_CODING_RATE = 5; constexpr uint8_t LORA_SYNC_WORD = 0x34; +constexpr uint8_t LORA_PREAMBLE_LEN = 8; // Timing constexpr uint32_t SENDER_WAKE_INTERVAL_SEC = 30; @@ -63,6 +69,8 @@ constexpr uint32_t METER_SEND_INTERVAL_MS = 30000; constexpr uint32_t BATCH_ACK_TIMEOUT_MS = 3000; constexpr uint8_t BATCH_MAX_RETRIES = 2; constexpr uint8_t METER_BATCH_MAX_SAMPLES = 30; +constexpr uint8_t BATCH_QUEUE_DEPTH = 10; +constexpr BatchRetryPolicy BATCH_RETRY_POLICY = BatchRetryPolicy::Keep; constexpr uint32_t WATCHDOG_TIMEOUT_SEC = 120; constexpr bool ENABLE_HA_DISCOVERY = true; diff --git a/include/lora_transport.h b/include/lora_transport.h index ec0fa00..bb445fb 100644 --- a/include/lora_transport.h +++ b/include/lora_transport.h @@ -3,7 +3,7 @@ #include #include "config.h" -constexpr size_t LORA_MAX_PAYLOAD = 200; +constexpr size_t LORA_MAX_PAYLOAD = 230; struct LoraPacket { uint8_t protocol_version; @@ -20,3 +20,4 @@ void lora_init(); bool lora_send(const LoraPacket &pkt); bool lora_receive(LoraPacket &pkt, uint32_t timeout_ms); void lora_sleep(); +uint32_t lora_airtime_ms(size_t packet_len); diff --git a/platformio.ini b/platformio.ini index 52f1e2d..277abd6 100644 --- a/platformio.ini +++ b/platformio.ini @@ -9,7 +9,7 @@ ; https://docs.platformio.org/page/projectconf.html [env:lilygo-t3-v1-6-1] -platform = espressif32 +platform = https://github.com/pioarduino/platform-espressif32/releases/download/51.03.07/platform-espressif32.zip board = ttgo-lora32-v1 framework = arduino lib_deps = @@ -20,7 +20,7 @@ lib_deps = knolleary/PubSubClient@^2.8 [env:lilygo-t3-v1-6-1-test] -platform = espressif32 +platform = https://github.com/pioarduino/platform-espressif32/releases/download/51.03.07/platform-espressif32.zip board = ttgo-lora32-v1 framework = arduino lib_deps = @@ -33,7 +33,7 @@ build_flags = -DENABLE_TEST_MODE [env:lilygo-t3-v1-6-1-868] -platform = espressif32 +platform = https://github.com/pioarduino/platform-espressif32/releases/download/51.03.07/platform-espressif32.zip board = ttgo-lora32-v1 framework = arduino lib_deps = @@ -46,7 +46,7 @@ build_flags = -DLORA_FREQUENCY_HZ=868E6 [env:lilygo-t3-v1-6-1-868-test] -platform = espressif32 +platform = https://github.com/pioarduino/platform-espressif32/releases/download/51.03.07/platform-espressif32.zip board = ttgo-lora32-v1 framework = arduino lib_deps = diff --git a/src/lora_transport.cpp b/src/lora_transport.cpp index 2bd0170..b69d190 100644 --- a/src/lora_transport.cpp +++ b/src/lora_transport.cpp @@ -1,6 +1,7 @@ #include "lora_transport.h" #include #include +#include static uint16_t crc16_ccitt(const uint8_t *data, size_t len) { uint16_t crc = 0xFFFF; @@ -106,3 +107,28 @@ bool lora_receive(LoraPacket &pkt, uint32_t timeout_ms) { void lora_sleep() { LoRa.sleep(); } + +uint32_t lora_airtime_ms(size_t packet_len) { + if (packet_len == 0) { + return 0; + } + + const double bw = static_cast(LORA_BANDWIDTH); + const double sf = static_cast(LORA_SPREADING_FACTOR); + const double cr = static_cast(LORA_CODING_RATE - 4); // coding rate denominator: 4/(4+cr) + const double tsym = (1 << LORA_SPREADING_FACTOR) / bw; + const double t_preamble = (static_cast(LORA_PREAMBLE_LEN) + 4.25) * tsym; + + const bool low_data_rate_opt = (LORA_SPREADING_FACTOR >= 11) && (LORA_BANDWIDTH <= 125000); + const double de = low_data_rate_opt ? 1.0 : 0.0; + const double ih = 0.0; + const double crc = 1.0; + + const double payload_symb_nb = 8.0 + max( + ceil((8.0 * packet_len - 4.0 * sf + 28.0 + 16.0 * crc - 20.0 * ih) / (4.0 * (sf - 2.0 * de))) * (cr + 4.0), + 0.0); + const double t_payload = payload_symb_nb * tsym; + const double t_packet = t_preamble + t_payload; + + return static_cast(ceil(t_packet * 1000.0)); +} diff --git a/src/main.cpp b/src/main.cpp index 3fd3d39..00bcbba 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -50,21 +50,38 @@ static constexpr size_t BATCH_HEADER_SIZE = 6; static constexpr size_t BATCH_CHUNK_PAYLOAD = LORA_MAX_PAYLOAD - BATCH_HEADER_SIZE; static constexpr size_t BATCH_MAX_COMPRESSED = 4096; static constexpr size_t BATCH_MAX_DECOMPRESSED = 8192; -static constexpr uint32_t BATCH_RX_TIMEOUT_MS = 2000; +static constexpr uint32_t BATCH_RX_MARGIN_MS = 800; + +struct BatchBuffer { + uint16_t batch_id; + bool batch_id_valid; + uint8_t count; + MeterData samples[METER_BATCH_MAX_SAMPLES]; +}; + +static BatchBuffer g_batch_queue[BATCH_QUEUE_DEPTH]; +static uint8_t g_batch_head = 0; +static uint8_t g_batch_tail = 0; +static uint8_t g_batch_count = 0; + +static MeterData g_build_samples[METER_BATCH_MAX_SAMPLES]; +static uint8_t g_build_count = 0; -static MeterData g_meter_samples[METER_BATCH_MAX_SAMPLES]; -static uint8_t g_meter_sample_count = 0; -static uint8_t g_meter_sample_head = 0; static uint32_t g_last_sample_ms = 0; +static uint32_t g_last_sample_ts_utc = 0; static uint32_t g_last_send_ms = 0; static uint32_t g_last_batch_send_ms = 0; +static float g_last_battery_voltage_v = NAN; +static uint8_t g_last_battery_percent = 0; +static uint32_t g_last_battery_ms = 0; static uint16_t g_batch_id = 1; static uint16_t g_last_sent_batch_id = 0; static uint8_t g_batch_retry_count = 0; static bool g_batch_ack_pending = false; static MeterData g_inflight_samples[METER_BATCH_MAX_SAMPLES]; -static size_t g_inflight_count = 0; +static uint8_t g_inflight_count = 0; static uint16_t g_inflight_batch_id = 0; +static bool g_inflight_active = false; static uint16_t g_last_batch_id_rx[NUM_SENDERS] = {}; @@ -76,6 +93,7 @@ struct BatchRxState { uint16_t total_len; uint16_t received_len; uint32_t last_rx_ms; + uint32_t timeout_ms; uint8_t buffer[BATCH_MAX_COMPRESSED]; }; @@ -98,33 +116,65 @@ static void init_sender_statuses() { } } -static void push_meter_sample(const MeterData &data) { - g_meter_samples[g_meter_sample_head] = data; - g_meter_sample_head = (g_meter_sample_head + 1) % METER_BATCH_MAX_SAMPLES; - if (g_meter_sample_count < METER_BATCH_MAX_SAMPLES) { - g_meter_sample_count++; - } +static void update_battery_cache() { + MeterData tmp = {}; + read_battery(tmp); + g_last_battery_voltage_v = tmp.battery_voltage_v; + g_last_battery_percent = tmp.battery_percent; + g_last_battery_ms = millis(); } -static size_t copy_meter_samples(MeterData *out, size_t max_count) { - if (!out || max_count == 0 || g_meter_sample_count == 0) { - return 0; +static bool batch_queue_drop_oldest() { + if (g_batch_count == 0) { + return false; } - size_t count = g_meter_sample_count < max_count ? g_meter_sample_count : max_count; - size_t start = (g_meter_sample_head + METER_BATCH_MAX_SAMPLES - count) % METER_BATCH_MAX_SAMPLES; - for (size_t i = 0; i < count; ++i) { - out[i] = g_meter_samples[(start + i) % METER_BATCH_MAX_SAMPLES]; + bool dropped_inflight = g_inflight_active && g_batch_queue[g_batch_tail].batch_id_valid && + g_inflight_batch_id == g_batch_queue[g_batch_tail].batch_id; + if (dropped_inflight) { + g_batch_ack_pending = false; + g_batch_retry_count = 0; + g_inflight_active = false; + g_inflight_count = 0; + g_inflight_batch_id = 0; } - return count; + g_batch_tail = (g_batch_tail + 1) % BATCH_QUEUE_DEPTH; + g_batch_count--; + return dropped_inflight; +} + +static BatchBuffer *batch_queue_peek() { + if (g_batch_count == 0) { + return nullptr; + } + return &g_batch_queue[g_batch_tail]; +} + +static void batch_queue_enqueue(const MeterData *samples, uint8_t count) { + if (!samples || count == 0) { + return; + } + if (g_batch_count >= BATCH_QUEUE_DEPTH) { + if (batch_queue_drop_oldest()) { + g_batch_id++; + } + } + BatchBuffer &slot = g_batch_queue[g_batch_head]; + slot.batch_id = 0; + slot.batch_id_valid = false; + slot.count = count; + for (uint8_t i = 0; i < count; ++i) { + slot.samples[i] = samples[i]; + } + g_batch_head = (g_batch_head + 1) % BATCH_QUEUE_DEPTH; + g_batch_count++; } static uint32_t last_sample_ts() { - if (g_meter_sample_count == 0) { + if (g_last_sample_ts_utc == 0) { uint32_t now_utc = time_get_utc(); return now_utc > 0 ? now_utc : millis() / 1000; } - size_t idx = (g_meter_sample_head + METER_BATCH_MAX_SAMPLES - 1) % METER_BATCH_MAX_SAMPLES; - return g_meter_samples[idx].ts_utc; + return g_last_sample_ts_utc; } static void note_fault(FaultCounters &counters, FaultType &last_type, uint32_t &last_ts_utc, uint32_t &last_ts_ms, FaultType type) { @@ -194,6 +244,18 @@ static uint16_t read_u16_le(const uint8_t *src) { return static_cast(src[0]) | (static_cast(src[1]) << 8); } +static uint32_t compute_batch_rx_timeout_ms(uint16_t total_len, uint8_t chunk_count) { + if (total_len == 0 || chunk_count == 0) { + return 2000; + } + size_t max_chunk_payload = total_len > BATCH_CHUNK_PAYLOAD ? BATCH_CHUNK_PAYLOAD : total_len; + size_t payload_len = BATCH_HEADER_SIZE + max_chunk_payload; + size_t packet_len = 5 + payload_len + 2; + uint32_t airtime_ms = lora_airtime_ms(packet_len); + uint32_t timeout_ms = airtime_ms + BATCH_RX_MARGIN_MS; + return timeout_ms < 500 ? 500 : timeout_ms; +} + static bool inject_batch_meta(String &json, int16_t rssi_dbm, float snr_db, uint32_t rx_ts_utc) { DynamicJsonDocument doc(8192); DeserializationError err = deserializeJson(doc, json); @@ -254,58 +316,42 @@ static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_ return all_ok; } -static void send_batch_ack(uint16_t batch_id) { +static void send_batch_ack(uint16_t batch_id, uint16_t sender_id) { LoraPacket ack = {}; ack.protocol_version = PROTOCOL_VERSION; ack.role = DeviceRole::Receiver; ack.device_id_short = g_short_id; ack.payload_type = PayloadType::Ack; - ack.payload_len = 2; - write_u16_le(ack.payload, batch_id); + ack.payload_len = 6; + write_u16_le(&ack.payload[0], batch_id); + write_u16_le(&ack.payload[2], sender_id); + write_u16_le(&ack.payload[4], g_short_id); lora_send(ack); } -static bool send_meter_batch(uint32_t ts_for_display) { - MeterData ordered[METER_BATCH_MAX_SAMPLES]; - size_t count = copy_meter_samples(ordered, METER_BATCH_MAX_SAMPLES); - if (count == 0) { +static bool prepare_inflight_from_queue() { + if (g_inflight_active) { + return true; + } + BatchBuffer *batch = batch_queue_peek(); + if (!batch || batch->count == 0) { return false; } - - for (size_t i = 0; i < count; ++i) { - g_inflight_samples[i] = ordered[i]; + if (!batch->batch_id_valid) { + batch->batch_id = g_batch_id; + batch->batch_id_valid = true; } - g_inflight_count = count; - g_inflight_batch_id = g_batch_id; - - String json; - if (!meterBatchToJson(g_inflight_samples, g_inflight_count, g_inflight_batch_id, json, &g_sender_faults, g_sender_last_error)) { - g_inflight_count = 0; - g_inflight_batch_id = 0; - return false; + g_inflight_count = batch->count; + g_inflight_batch_id = batch->batch_id; + for (uint8_t i = 0; i < g_inflight_count; ++i) { + g_inflight_samples[i] = batch->samples[i]; } - - static uint8_t compressed[BATCH_MAX_COMPRESSED]; - size_t compressed_len = 0; - if (!compressBuffer(reinterpret_cast(json.c_str()), json.length(), compressed, sizeof(compressed), compressed_len)) { - return false; - } - - bool ok = send_batch_payload(compressed, compressed_len, ts_for_display, g_inflight_batch_id); - g_last_batch_send_ms = millis(); - if (ok) { - g_last_sent_batch_id = g_inflight_batch_id; - g_batch_ack_pending = true; - g_batch_id++; - } else { - g_inflight_count = 0; - g_inflight_batch_id = 0; - } - return ok; + g_inflight_active = true; + return true; } -static bool resend_inflight_batch(uint32_t ts_for_display) { - if (!g_batch_ack_pending || g_inflight_count == 0) { +static bool send_inflight_batch(uint32_t ts_for_display) { + if (!g_inflight_active || g_inflight_count == 0) { return false; } String json; @@ -316,8 +362,6 @@ static bool resend_inflight_batch(uint32_t ts_for_display) { static uint8_t compressed[BATCH_MAX_COMPRESSED]; size_t compressed_len = 0; if (!compressBuffer(reinterpret_cast(json.c_str()), json.length(), compressed, sizeof(compressed), compressed_len)) { - g_inflight_count = 0; - g_inflight_batch_id = 0; return false; } @@ -328,6 +372,41 @@ static bool resend_inflight_batch(uint32_t ts_for_display) { return ok; } +static bool send_meter_batch(uint32_t ts_for_display) { + if (!prepare_inflight_from_queue()) { + return false; + } + bool ok = send_inflight_batch(ts_for_display); + if (ok) { + g_last_sent_batch_id = g_inflight_batch_id; + g_batch_ack_pending = true; + } else { + g_inflight_active = false; + g_inflight_count = 0; + g_inflight_batch_id = 0; + } + return ok; +} + +static bool resend_inflight_batch(uint32_t ts_for_display) { + if (!g_batch_ack_pending || !g_inflight_active || g_inflight_count == 0) { + return false; + } + return send_inflight_batch(ts_for_display); +} + +static void finish_inflight_batch() { + if (g_batch_count > 0) { + batch_queue_drop_oldest(); + } + g_batch_ack_pending = false; + g_batch_retry_count = 0; + g_inflight_active = false; + g_inflight_count = 0; + g_inflight_batch_id = 0; + g_batch_id++; +} + static void reset_batch_rx() { g_batch_rx.active = false; g_batch_rx.batch_id = 0; @@ -336,6 +415,7 @@ static void reset_batch_rx() { g_batch_rx.total_len = 0; g_batch_rx.received_len = 0; g_batch_rx.last_rx_ms = 0; + g_batch_rx.timeout_ms = 0; } static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool &decode_error, uint16_t &out_batch_id) { @@ -351,7 +431,7 @@ static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool & size_t chunk_len = pkt.payload_len - BATCH_HEADER_SIZE; uint32_t now_ms = millis(); - if (!g_batch_rx.active || batch_id != g_batch_rx.batch_id || (now_ms - g_batch_rx.last_rx_ms > BATCH_RX_TIMEOUT_MS)) { + if (!g_batch_rx.active || batch_id != g_batch_rx.batch_id || (now_ms - g_batch_rx.last_rx_ms > g_batch_rx.timeout_ms)) { if (chunk_index != 0) { reset_batch_rx(); return false; @@ -366,6 +446,7 @@ static bool process_batch_packet(const LoraPacket &pkt, String &out_json, bool & g_batch_rx.total_len = total_len; g_batch_rx.received_len = 0; g_batch_rx.next_index = 0; + g_batch_rx.timeout_ms = compute_batch_rx_timeout_ms(total_len, chunk_count); } if (!g_batch_rx.active || chunk_index != g_batch_rx.next_index || chunk_count != g_batch_rx.expected_chunks) { @@ -427,6 +508,7 @@ void setup() { meter_init(); g_last_sample_ms = millis() - METER_SAMPLE_INTERVAL_MS; g_last_send_ms = millis(); + update_battery_cache(); } else { power_receiver_init(); wifi_manager_init(); @@ -472,13 +554,22 @@ static void sender_loop() { note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::MeterRead); display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms); } - read_battery(data); + if (g_build_count == 0) { + update_battery_cache(); + } + data.battery_voltage_v = g_last_battery_voltage_v; + data.battery_percent = g_last_battery_percent; uint32_t now_utc = time_get_utc(); data.ts_utc = now_utc > 0 ? now_utc : millis() / 1000; data.valid = meter_ok; - push_meter_sample(data); + g_last_sample_ts_utc = data.ts_utc; + g_build_samples[g_build_count++] = data; + if (g_build_count >= METER_BATCH_MAX_SAMPLES) { + batch_queue_enqueue(g_build_samples, g_build_count); + g_build_count = 0; + } display_set_last_meter(data); display_set_last_read(meter_ok, data.ts_utc); } @@ -492,18 +583,13 @@ static void sender_loop() { if (lora_receive(rx, 0) && rx.protocol_version == PROTOCOL_VERSION) { if (rx.payload_type == PayloadType::TimeSync) { time_handle_timesync_payload(rx.payload, rx.payload_len); - } else if (rx.payload_type == PayloadType::Ack && rx.payload_len >= 2) { + } else if (rx.payload_type == PayloadType::Ack && rx.payload_len >= 6 && rx.role == DeviceRole::Receiver) { uint16_t ack_id = read_u16_le(rx.payload); - if (g_batch_ack_pending && ack_id == g_last_sent_batch_id) { - g_batch_ack_pending = false; - g_batch_retry_count = 0; - if (g_inflight_count >= g_meter_sample_count) { - g_meter_sample_count = 0; - } else { - g_meter_sample_count -= static_cast(g_inflight_count); - } - g_inflight_count = 0; - g_inflight_batch_id = 0; + uint16_t ack_sender = read_u16_le(&rx.payload[2]); + uint16_t ack_receiver = read_u16_le(&rx.payload[4]); + if (ack_sender == g_short_id && ack_receiver == rx.device_id_short && + g_batch_ack_pending && ack_id == g_last_sent_batch_id) { + finish_inflight_batch(); } } } @@ -513,15 +599,15 @@ static void sender_loop() { g_batch_retry_count++; resend_inflight_batch(last_sample_ts()); } else { - g_batch_ack_pending = false; - g_batch_retry_count = 0; - if (g_inflight_count >= g_meter_sample_count) { - g_meter_sample_count = 0; + if (BATCH_RETRY_POLICY == BatchRetryPolicy::Drop) { + finish_inflight_batch(); } else { - g_meter_sample_count -= static_cast(g_inflight_count); + g_batch_ack_pending = false; + g_batch_retry_count = 0; + g_inflight_active = false; + g_inflight_count = 0; + g_inflight_batch_id = 0; } - g_inflight_count = 0; - g_inflight_batch_id = 0; note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx); display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms); } @@ -609,9 +695,9 @@ static void receiver_loop() { } } bool duplicate = sender_idx >= 0 && g_last_batch_id_rx[sender_idx] == batch_id; - if (duplicate) { - send_batch_ack(batch_id); - } else if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) { + if (duplicate) { + send_batch_ack(batch_id, pkt.device_id_short); + } else if (jsonToMeterBatch(json, samples, METER_BATCH_MAX_SAMPLES, count)) { if (sender_idx >= 0) { for (size_t s = 0; s < count; ++s) { samples[s].link_valid = true; @@ -637,7 +723,7 @@ static void receiver_loop() { g_sender_last_error_remote_utc[sender_idx], g_sender_last_error_remote_ms[sender_idx]); } g_last_batch_id_rx[sender_idx] = batch_id; - send_batch_ack(batch_id); + send_batch_ack(batch_id, pkt.device_id_short); } } else { note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);