Switch LoRa batch payload to present-mask schema v3

BREAKING CHANGE: schema v2 is no longer supported.

Replaces fixed dt_s timing with a 30-bit present_mask while keeping MQTT JSON unchanged.
This commit is contained in:
2026-02-13 23:18:56 +01:00
parent e83bc11dea
commit 2a6ef0eb5c
3 changed files with 143 additions and 43 deletions

View File

@@ -142,6 +142,15 @@ static void serial_debug_printf(const char *fmt, ...) {
Serial.println(buf);
}
static uint8_t bit_count32(uint32_t value) {
uint8_t count = 0;
while (value != 0) {
value &= (value - 1);
count++;
}
return count;
}
static void set_last_meter_sample(const MeterData &parsed, uint32_t rx_ms) {
g_last_meter_data = parsed;
g_last_meter_valid = true;
@@ -702,9 +711,8 @@ static bool send_inflight_batch(uint32_t ts_for_display) {
input.batch_id = g_inflight_batch_id;
input.t_last = g_inflight_sync_request ? time_get_utc() :
g_inflight_samples[g_inflight_count - 1].ts_utc;
uint32_t dt_s = METER_SAMPLE_INTERVAL_MS / 1000;
input.dt_s = dt_s > 0 ? static_cast<uint8_t>(dt_s) : 1;
input.n = g_inflight_sync_request ? 0 : g_inflight_count;
input.present_mask = 0;
input.n = 0;
input.battery_mV = g_inflight_sync_request ? battery_mv_from_voltage(g_last_battery_voltage_v) :
battery_mv_from_voltage(g_inflight_samples[g_inflight_count - 1].battery_voltage_v);
input.err_m = g_sender_faults.meter_read_fail > 255 ? 255 : static_cast<uint8_t>(g_sender_faults.meter_read_fail);
@@ -714,20 +722,50 @@ static bool send_inflight_batch(uint32_t ts_for_display) {
input.err_rx_reject = static_cast<uint8_t>(g_sender_rx_reject_reason);
uint8_t energy_regressions = 0;
uint8_t phase_clamps = 0;
for (uint8_t i = 0; i < input.n; ++i) {
input.energy_wh[i] = kwh_to_wh_from_float(g_inflight_samples[i].energy_total_kwh);
if (i > 0 && input.energy_wh[i] < input.energy_wh[i - 1]) {
input.energy_wh[i] = input.energy_wh[i - 1];
if (energy_regressions < 255) {
energy_regressions++;
uint8_t ts_dropped = 0;
uint8_t ts_collapsed = 0;
if (!g_inflight_sync_request) {
if (input.t_last < static_cast<uint32_t>(METER_BATCH_MAX_SAMPLES - 1)) {
g_last_tx_build_error = TxBuildError::Encode;
return false;
}
const uint32_t window_start = input.t_last - static_cast<uint32_t>(METER_BATCH_MAX_SAMPLES - 1);
MeterData slot_samples[METER_BATCH_MAX_SAMPLES];
bool slot_used[METER_BATCH_MAX_SAMPLES] = {};
for (uint8_t i = 0; i < g_inflight_count; ++i) {
const MeterData &sample = g_inflight_samples[i];
if (sample.ts_utc < window_start || sample.ts_utc > input.t_last) {
if (ts_dropped < 255) {
ts_dropped++;
}
continue;
}
uint8_t slot = static_cast<uint8_t>(sample.ts_utc - window_start);
if (slot_used[slot] && ts_collapsed < 255) {
ts_collapsed++;
}
slot_used[slot] = true;
slot_samples[slot] = sample;
}
for (uint8_t slot = 0; slot < METER_BATCH_MAX_SAMPLES; ++slot) {
if (!slot_used[slot]) {
continue;
}
const uint8_t out_idx = input.n;
if (out_idx >= METER_BATCH_MAX_SAMPLES) {
g_last_tx_build_error = TxBuildError::Encode;
return false;
}
input.present_mask |= (1UL << slot);
input.n++;
input.energy_wh[out_idx] = kwh_to_wh_from_float(slot_samples[slot].energy_total_kwh);
bool c1 = false;
bool c2 = false;
bool c3 = false;
input.p1_w[i] = float_to_i16_w_clamped(g_inflight_samples[i].phase_power_w[0], c1);
input.p2_w[i] = float_to_i16_w_clamped(g_inflight_samples[i].phase_power_w[1], c2);
input.p3_w[i] = float_to_i16_w_clamped(g_inflight_samples[i].phase_power_w[2], c3);
input.p1_w[out_idx] = float_to_i16_w_clamped(slot_samples[slot].phase_power_w[0], c1);
input.p2_w[out_idx] = float_to_i16_w_clamped(slot_samples[slot].phase_power_w[1], c2);
input.p3_w[out_idx] = float_to_i16_w_clamped(slot_samples[slot].phase_power_w[2], c3);
if (c1 && phase_clamps < 255) {
phase_clamps++;
}
@@ -738,11 +776,23 @@ static bool send_inflight_batch(uint32_t ts_for_display) {
phase_clamps++;
}
}
if (SERIAL_DEBUG_MODE && (energy_regressions > 0 || phase_clamps > 0)) {
serial_debug_printf("tx: sanitize batch_id=%u energy_regress=%u phase_clamps=%u",
}
for (uint8_t i = 0; i < input.n; ++i) {
if (i > 0 && input.energy_wh[i] < input.energy_wh[i - 1]) {
input.energy_wh[i] = input.energy_wh[i - 1];
if (energy_regressions < 255) {
energy_regressions++;
}
}
}
if (SERIAL_DEBUG_MODE && (energy_regressions > 0 || phase_clamps > 0 || ts_dropped > 0 || ts_collapsed > 0)) {
serial_debug_printf("tx: sanitize batch_id=%u energy_regress=%u phase_clamps=%u ts_drop=%u ts_dup=%u",
g_inflight_batch_id,
static_cast<unsigned>(energy_regressions),
static_cast<unsigned>(phase_clamps));
static_cast<unsigned>(phase_clamps),
static_cast<unsigned>(ts_dropped),
static_cast<unsigned>(ts_collapsed));
}
static uint8_t encoded[BATCH_MAX_COMPRESSED];
@@ -754,7 +804,10 @@ static bool send_inflight_batch(uint32_t ts_for_display) {
}
uint32_t encode_ms = millis() - encode_start;
if (SERIAL_DEBUG_MODE) {
serial_debug_printf("tx: batch_id=%u count=%u bin_len=%u", g_inflight_batch_id, input.n,
serial_debug_printf("tx: batch_id=%u count=%u mask=%08lX bin_len=%u",
g_inflight_batch_id,
static_cast<unsigned>(input.n),
static_cast<unsigned long>(input.present_mask),
static_cast<unsigned>(encoded_len));
if (encode_ms > 200) {
serial_debug_printf("tx: encode took %lums", static_cast<unsigned long>(encode_ms));
@@ -1306,28 +1359,36 @@ static void receiver_loop() {
display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms);
goto receiver_loop_done;
}
if (bit_count32(batch.present_mask) != batch.n) {
note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);
display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms);
goto receiver_loop_done;
}
size_t count = batch.n;
uint16_t short_id = pkt.device_id_short;
if (short_id == 0) {
short_id = short_id_from_sender_id(batch.sender_id);
}
uint64_t span = static_cast<uint64_t>(batch.dt_s) * static_cast<uint64_t>(count - 1);
if (batch.t_last < span || batch.t_last < MIN_ACCEPTED_EPOCH_UTC) {
note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);
display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms);
goto receiver_loop_done;
}
uint32_t t_first = batch.t_last - static_cast<uint32_t>(span);
if (t_first < MIN_ACCEPTED_EPOCH_UTC) {
if (batch.t_last < static_cast<uint32_t>(METER_BATCH_MAX_SAMPLES - 1) || batch.t_last < MIN_ACCEPTED_EPOCH_UTC) {
note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);
display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms);
goto receiver_loop_done;
}
const uint32_t window_start = batch.t_last - static_cast<uint32_t>(METER_BATCH_MAX_SAMPLES - 1);
MeterData samples[METER_BATCH_MAX_SAMPLES];
float bat_v = batch.battery_mV > 0 ? static_cast<float>(batch.battery_mV) / 1000.0f : NAN;
for (size_t s = 0; s < count; ++s) {
size_t s = 0;
for (uint8_t slot = 0; slot < METER_BATCH_MAX_SAMPLES; ++slot) {
if ((batch.present_mask & (1UL << slot)) == 0) {
continue;
}
if (s >= count) {
note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);
display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms);
goto receiver_loop_done;
}
MeterData &data = samples[s];
data = {};
data.short_id = short_id;
@@ -1336,7 +1397,12 @@ static void receiver_loop() {
} else {
snprintf(data.device_id, sizeof(data.device_id), "dd3-0000");
}
data.ts_utc = t_first + static_cast<uint32_t>(s) * batch.dt_s;
data.ts_utc = window_start + static_cast<uint32_t>(slot);
if (data.ts_utc < MIN_ACCEPTED_EPOCH_UTC) {
note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);
display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms);
goto receiver_loop_done;
}
data.energy_total_kwh = static_cast<float>(batch.energy_wh[s]) / 1000.0f;
data.phase_power_w[0] = static_cast<float>(batch.p1_w[s]);
data.phase_power_w[1] = static_cast<float>(batch.p2_w[s]);
@@ -1354,6 +1420,12 @@ static void receiver_loop() {
data.last_error = static_cast<FaultType>(batch.err_last);
data.rx_reject_reason = batch.err_rx_reject;
sd_logger_log_sample(data, (s + 1 == count) && data.last_error != FaultType::None);
s++;
}
if (s != count) {
note_fault(g_receiver_faults, g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms, FaultType::Decode);
display_set_last_error(g_receiver_last_error, g_receiver_last_error_utc, g_receiver_last_error_ms);
goto receiver_loop_done;
}
if (sender_idx >= 0) {

View File

@@ -2,9 +2,11 @@
#include <limits.h>
static constexpr uint16_t kMagic = 0xDDB3;
static constexpr uint8_t kSchema = 2;
// Breaking change: schema v3 replaces fixed dt_s spacing with a 30-bit present_mask.
static constexpr uint8_t kSchema = 3;
static constexpr uint8_t kFlags = 0x01;
static constexpr size_t kMaxSamples = 30;
static constexpr uint32_t kPresentMaskValidBits = 0x3FFFFFFFUL;
static void write_u16_le(uint8_t *dst, uint16_t value) {
dst[0] = static_cast<uint8_t>(value & 0xFF);
@@ -97,6 +99,15 @@ static bool ensure_capacity(size_t needed, size_t cap, size_t pos) {
return pos + needed <= cap;
}
static uint8_t bit_count32(uint32_t value) {
uint8_t count = 0;
while (value != 0) {
value &= (value - 1);
count++;
}
return count;
}
bool encode_batch(const BatchInput &in, uint8_t *out, size_t out_cap, size_t *out_len) {
if (!out || !out_len) {
return false;
@@ -104,11 +115,17 @@ bool encode_batch(const BatchInput &in, uint8_t *out, size_t out_cap, size_t *ou
if (in.n > kMaxSamples) {
return false;
}
if (in.dt_s == 0) {
if ((in.present_mask & ~kPresentMaskValidBits) != 0) {
return false;
}
if (bit_count32(in.present_mask) != in.n) {
return false;
}
if (in.n == 0 && in.present_mask != 0) {
return false;
}
size_t pos = 0;
if (!ensure_capacity(21, out_cap, pos)) {
if (!ensure_capacity(24, out_cap, pos)) {
return false;
}
write_u16_le(&out[pos], kMagic);
@@ -121,7 +138,8 @@ bool encode_batch(const BatchInput &in, uint8_t *out, size_t out_cap, size_t *ou
pos += 2;
write_u32_le(&out[pos], in.t_last);
pos += 4;
out[pos++] = in.dt_s;
write_u32_le(&out[pos], in.present_mask);
pos += 4;
out[pos++] = in.n;
write_u16_le(&out[pos], in.battery_mV);
pos += 2;
@@ -189,7 +207,7 @@ bool decode_batch(const uint8_t *buf, size_t len, BatchInput *out) {
return false;
}
size_t pos = 0;
if (len < 21) {
if (len < 24) {
return false;
}
uint16_t magic = read_u16_le(&buf[pos]);
@@ -205,7 +223,8 @@ bool decode_batch(const uint8_t *buf, size_t len, BatchInput *out) {
pos += 2;
out->t_last = read_u32_le(&buf[pos]);
pos += 4;
out->dt_s = buf[pos++];
out->present_mask = read_u32_le(&buf[pos]);
pos += 4;
out->n = buf[pos++];
out->battery_mV = read_u16_le(&buf[pos]);
pos += 2;
@@ -215,7 +234,16 @@ bool decode_batch(const uint8_t *buf, size_t len, BatchInput *out) {
out->err_last = buf[pos++];
out->err_rx_reject = buf[pos++];
if (out->n > kMaxSamples || out->dt_s == 0) {
if (out->n > kMaxSamples) {
return false;
}
if ((out->present_mask & ~kPresentMaskValidBits) != 0) {
return false;
}
if (bit_count32(out->present_mask) != out->n) {
return false;
}
if (out->n == 0 && out->present_mask != 0) {
return false;
}
if (out->n == 0) {
@@ -292,7 +320,7 @@ bool payload_codec_self_test() {
in.sender_id = 1;
in.batch_id = 42;
in.t_last = 1700000000;
in.dt_s = 1;
in.present_mask = (1UL << 0) | (1UL << 2) | (1UL << 3) | (1UL << 10) | (1UL << 29);
in.n = 5;
in.battery_mV = 3750;
in.err_m = 2;
@@ -335,7 +363,7 @@ bool payload_codec_self_test() {
}
if (out.sender_id != in.sender_id || out.batch_id != in.batch_id || out.t_last != in.t_last ||
out.dt_s != in.dt_s || out.n != in.n || out.battery_mV != in.battery_mV ||
out.present_mask != in.present_mask || out.n != in.n || out.battery_mV != in.battery_mV ||
out.err_m != in.err_m || out.err_d != in.err_d || out.err_tx != in.err_tx || out.err_last != in.err_last ||
out.err_rx_reject != in.err_rx_reject) {
Serial.println("payload_codec_self_test: header mismatch");

View File

@@ -6,7 +6,7 @@ struct BatchInput {
uint16_t sender_id;
uint16_t batch_id;
uint32_t t_last;
uint8_t dt_s;
uint32_t present_mask;
uint8_t n;
uint16_t battery_mV;
uint8_t err_m;