Keep in-flight batch until ACK
This commit is contained in:
72
src/main.cpp
72
src/main.cpp
@@ -62,6 +62,9 @@ static uint16_t g_batch_id = 1;
|
||||
static uint16_t g_last_sent_batch_id = 0;
|
||||
static uint8_t g_batch_retry_count = 0;
|
||||
static bool g_batch_ack_pending = false;
|
||||
static MeterData g_inflight_samples[METER_BATCH_MAX_SAMPLES];
|
||||
static size_t g_inflight_count = 0;
|
||||
static uint16_t g_inflight_batch_id = 0;
|
||||
|
||||
static uint16_t g_last_batch_id_rx[NUM_SENDERS] = {};
|
||||
|
||||
@@ -207,7 +210,7 @@ static bool inject_batch_meta(String &json, int16_t rssi_dbm, float snr_db, uint
|
||||
return serializeJson(doc, json) > 0;
|
||||
}
|
||||
|
||||
static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_display) {
|
||||
static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_display, uint16_t batch_id) {
|
||||
if (!data || len == 0 || len > BATCH_MAX_COMPRESSED) {
|
||||
return false;
|
||||
}
|
||||
@@ -231,7 +234,7 @@ static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_
|
||||
pkt.payload_len = chunk_len + BATCH_HEADER_SIZE;
|
||||
|
||||
uint8_t *payload = pkt.payload;
|
||||
write_u16_le(&payload[0], g_batch_id);
|
||||
write_u16_le(&payload[0], batch_id);
|
||||
payload[2] = i;
|
||||
payload[3] = chunk_count;
|
||||
write_u16_le(&payload[4], static_cast<uint16_t>(len));
|
||||
@@ -247,9 +250,6 @@ static bool send_batch_payload(const uint8_t *data, size_t len, uint32_t ts_for_
|
||||
delay(10);
|
||||
}
|
||||
|
||||
if (all_ok) {
|
||||
g_batch_id++;
|
||||
}
|
||||
display_set_last_tx(all_ok, ts_for_display);
|
||||
return all_ok;
|
||||
}
|
||||
@@ -272,8 +272,16 @@ static bool send_meter_batch(uint32_t ts_for_display) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < count; ++i) {
|
||||
g_inflight_samples[i] = ordered[i];
|
||||
}
|
||||
g_inflight_count = count;
|
||||
g_inflight_batch_id = g_batch_id;
|
||||
|
||||
String json;
|
||||
if (!meterBatchToJson(ordered, count, g_batch_id, json, &g_sender_faults, g_sender_last_error)) {
|
||||
if (!meterBatchToJson(g_inflight_samples, g_inflight_count, g_inflight_batch_id, json, &g_sender_faults, g_sender_last_error)) {
|
||||
g_inflight_count = 0;
|
||||
g_inflight_batch_id = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -283,11 +291,39 @@ static bool send_meter_batch(uint32_t ts_for_display) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ok = send_batch_payload(compressed, compressed_len, ts_for_display);
|
||||
bool ok = send_batch_payload(compressed, compressed_len, ts_for_display, g_inflight_batch_id);
|
||||
g_last_batch_send_ms = millis();
|
||||
if (ok) {
|
||||
g_last_sent_batch_id = static_cast<uint16_t>(g_batch_id - 1);
|
||||
g_last_sent_batch_id = g_inflight_batch_id;
|
||||
g_batch_ack_pending = true;
|
||||
g_batch_id++;
|
||||
} else {
|
||||
g_inflight_count = 0;
|
||||
g_inflight_batch_id = 0;
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
static bool resend_inflight_batch(uint32_t ts_for_display) {
|
||||
if (!g_batch_ack_pending || g_inflight_count == 0) {
|
||||
return false;
|
||||
}
|
||||
String json;
|
||||
if (!meterBatchToJson(g_inflight_samples, g_inflight_count, g_inflight_batch_id, json, &g_sender_faults, g_sender_last_error)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
static uint8_t compressed[BATCH_MAX_COMPRESSED];
|
||||
size_t compressed_len = 0;
|
||||
if (!compressBuffer(reinterpret_cast<const uint8_t *>(json.c_str()), json.length(), compressed, sizeof(compressed), compressed_len)) {
|
||||
g_inflight_count = 0;
|
||||
g_inflight_batch_id = 0;
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ok = send_batch_payload(compressed, compressed_len, ts_for_display, g_inflight_batch_id);
|
||||
if (ok) {
|
||||
g_last_batch_send_ms = millis();
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
@@ -461,8 +497,13 @@ static void sender_loop() {
|
||||
if (g_batch_ack_pending && ack_id == g_last_sent_batch_id) {
|
||||
g_batch_ack_pending = false;
|
||||
g_batch_retry_count = 0;
|
||||
g_meter_sample_count = 0;
|
||||
g_meter_sample_head = 0;
|
||||
if (g_inflight_count >= g_meter_sample_count) {
|
||||
g_meter_sample_count = 0;
|
||||
} else {
|
||||
g_meter_sample_count -= static_cast<uint8_t>(g_inflight_count);
|
||||
}
|
||||
g_inflight_count = 0;
|
||||
g_inflight_batch_id = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -470,12 +511,17 @@ static void sender_loop() {
|
||||
if (g_batch_ack_pending && (now_ms - g_last_batch_send_ms >= BATCH_ACK_TIMEOUT_MS)) {
|
||||
if (g_batch_retry_count < BATCH_MAX_RETRIES) {
|
||||
g_batch_retry_count++;
|
||||
send_meter_batch(last_sample_ts());
|
||||
resend_inflight_batch(last_sample_ts());
|
||||
} else {
|
||||
g_batch_ack_pending = false;
|
||||
g_batch_retry_count = 0;
|
||||
g_meter_sample_count = 0;
|
||||
g_meter_sample_head = 0;
|
||||
if (g_inflight_count >= g_meter_sample_count) {
|
||||
g_meter_sample_count = 0;
|
||||
} else {
|
||||
g_meter_sample_count -= static_cast<uint8_t>(g_inflight_count);
|
||||
}
|
||||
g_inflight_count = 0;
|
||||
g_inflight_batch_id = 0;
|
||||
note_fault(g_sender_faults, g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms, FaultType::LoraTx);
|
||||
display_set_last_error(g_sender_last_error, g_sender_last_error_utc, g_sender_last_error_ms);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user