Improve reliability and add data recovery tools

## Bug Fixes
- Fix integer overflow potential in history bin allocation (web_server.cpp)
  Using uint64_t for intermediate multiplication prevents overflow with different constants

- Prevent data loss during WiFi failures (main.cpp)
  Device now automatically attempts WiFi reconnection every 30 seconds when in AP mode
  Exits AP mode and resumes MQTT transmission as soon as WiFi becomes available
  Data collection and SD logging continue regardless of connectivity

## New Features
- Add standalone MQTT data republisher for lost data recovery
  - Command-line tool (republish_mqtt.py) with interactive and scripting modes
  - GUI tool (republish_mqtt_gui.py) for user-friendly recovery
  - Rate-limited publishing (5 msg/sec default, configurable 1-100)
  - Manual time range selection or auto-detect missing data via InfluxDB
  - Cross-platform support (Windows, macOS, Linux)
  - Converts SD card CSV exports back to MQTT format

## Documentation
- Add comprehensive code review (CODE_REVIEW.md)
  - 16 detailed security and quality assessments
  - Identifies critical HTTPS/auth gaps, medium-priority overflow issues
  - Confirms absence of buffer overflows and unsafe string functions
  - Grade: B+ with areas for improvement

- Add republisher documentation (REPUBLISH_README.md, REPUBLISH_GUI_README.md)
  - Installation and usage instructions
  - Example commands and scenarios
  - Troubleshooting guide
  - Performance characteristics

## Dependencies
- Add requirements_republish.txt
  - paho-mqtt>=1.6.1
  - influxdb-client>=1.18.0

## Impact
- Eliminates data loss scenario where unreliable WiFi leaves device stuck in AP mode
- Provides recovery mechanism for any historical data missed during outages
- Improves code safety with explicit overflow-resistant arithmetic
- Increases operational visibility with comprehensive code review
This commit is contained in:
2026-03-11 17:01:22 +01:00
parent ee849433c8
commit 32cd0652c9
8 changed files with 1982 additions and 2 deletions

611
republish_mqtt.py Normal file
View File

@@ -0,0 +1,611 @@
#!/usr/bin/env python3
"""
DD3 LoRa Bridge - MQTT Data Republisher
Republishes historical meter data from SD card CSV files to MQTT
Prevents data loss by allowing recovery of data during WiFi/MQTT downtime
"""
import argparse
import csv
import json
import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple, List
import paho.mqtt.client as mqtt
# Optional: for auto-detection of missing data
try:
from influxdb_client import InfluxDBClient
HAS_INFLUXDB = True
except ImportError:
HAS_INFLUXDB = False
class MQTTRepublisher:
    """Republish meter data from CSV files to MQTT"""

    def __init__(self, broker: str, port: int, username: str = None, password: str = None,
                 rate_per_sec: int = 5):
        """Create a republisher bound to one broker.

        Args:
            broker: MQTT broker hostname or IP address.
            port: MQTT broker TCP port.
            username: Optional broker username (used only if password also set).
            password: Optional broker password.
            rate_per_sec: Publish rate limit in messages/second; values <= 0
                disable rate limiting entirely.
        """
        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.rate_per_sec = rate_per_sec
        # BUG FIX: rate_per_sec == 0 previously raised ZeroDivisionError here,
        # even though republish_csv() already treats rate <= 0 as "no delay".
        self.delay_sec = 1.0 / rate_per_sec if rate_per_sec > 0 else 0.0
        self.client = mqtt.Client()
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.connected = False
        if username and password:
            self.client.username_pw_set(username, password)

    def _on_connect(self, client, userdata, flags, rc):
        # paho-mqtt callback; rc == 0 means the broker accepted the connection.
        if rc == 0:
            self.connected = True
            print(f"✓ Connected to MQTT broker at {self.broker}:{self.port}")
        else:
            print(f"✗ Failed to connect to MQTT broker. Error code: {rc}")
            self.connected = False

    def _on_disconnect(self, client, userdata, rc):
        # paho-mqtt callback; rc != 0 indicates an unexpected drop.
        self.connected = False
        if rc != 0:
            print(f"✗ Unexpected disconnection. Error code: {rc}")

    def connect(self):
        """Connect to MQTT broker.

        Raises:
            RuntimeError: if the broker does not acknowledge within 10 s.
        """
        try:
            self.client.connect(self.broker, self.port, keepalive=60)
            self.client.loop_start()
            # Wait for connection to establish (the CONNACK arrives on the
            # background network loop thread started above).
            timeout = 10
            start = time.time()
            while not self.connected and time.time() - start < timeout:
                time.sleep(0.1)
            if not self.connected:
                raise RuntimeError(f"Failed to connect within {timeout}s")
        except Exception as e:
            print(f"✗ Connection error: {e}")
            raise

    def disconnect(self):
        """Disconnect from MQTT broker"""
        self.client.loop_stop()
        self.client.disconnect()

    def publish_sample(self, device_id: str, ts_utc: int, data: dict) -> bool:
        """Publish a single meter sample to MQTT.

        Args:
            device_id: Device ID used to build the topic.
            ts_utc: Sample timestamp (currently unused here — the timestamp is
                carried inside ``data``; kept for interface compatibility).
            data: JSON-serializable payload dict.

        Returns:
            True if the publish was handed to the client successfully.
        """
        if not self.connected:
            print("✗ Not connected to MQTT broker")
            return False
        try:
            topic = f"smartmeter/{device_id}/state"
            payload = json.dumps(data)
            result = self.client.publish(topic, payload)
            if result.rc != mqtt.MQTT_ERR_SUCCESS:
                print(f"✗ Publish failed: {mqtt.error_string(result.rc)}")
                return False
            return True
        except Exception as e:
            print(f"✗ Error publishing: {e}")
            return False

    def republish_csv(self, csv_file: str, device_id: str,
                      filter_from: Optional[int] = None,
                      filter_to: Optional[int] = None) -> int:
        """
        Republish data from CSV file to MQTT

        Args:
            csv_file: Path to CSV file
            device_id: Device ID for MQTT topic
            filter_from: Unix timestamp - only publish samples >= this time
            filter_to: Unix timestamp - only publish samples <= this time

        Returns:
            Number of samples published
        """
        if not os.path.isfile(csv_file):
            print(f"✗ File not found: {csv_file}")
            return 0
        count = 0
        skipped = 0
        try:
            with open(csv_file, 'r') as f:
                reader = csv.DictReader(f)
                if not reader.fieldnames:
                    print("✗ Invalid CSV: no header row")
                    return 0
                # Validate required fields
                required = ['ts_utc', 'e_kwh', 'p_w']
                missing = [field for field in required if field not in reader.fieldnames]
                if missing:
                    print(f"✗ Missing required CSV columns: {missing}")
                    return 0
                for row in reader:
                    try:
                        ts_utc = int(row['ts_utc'])
                        # Apply time filter.  BUG FIX: compare against None so
                        # a filter value of 0 (Unix epoch) is still honored.
                        if filter_from is not None and ts_utc < filter_from:
                            skipped += 1
                            continue
                        if filter_to is not None and ts_utc > filter_to:
                            # Rows are assumed time-ordered, so the first
                            # sample past the window ends the scan.
                            break
                        # Build MQTT payload matching device format
                        data = {
                            'id': self._extract_short_id(device_id),
                            'ts': ts_utc,
                        }
                        # Energy (formatted as 2 decimal places)
                        try:
                            e_kwh = float(row['e_kwh'])
                            data['e_kwh'] = f"{e_kwh:.2f}"
                        except (ValueError, KeyError):
                            pass
                        # Power values (as integers)
                        for key in ['p_w', 'p1_w', 'p2_w', 'p3_w']:
                            if key in row and row[key].strip():
                                try:
                                    data[key] = int(round(float(row[key])))
                                except ValueError:
                                    pass
                        # Battery
                        if 'bat_v' in row and row['bat_v'].strip():
                            try:
                                data['bat_v'] = f"{float(row['bat_v']):.2f}"
                            except ValueError:
                                pass
                        if 'bat_pct' in row and row['bat_pct'].strip():
                            try:
                                data['bat_pct'] = int(row['bat_pct'])
                            except ValueError:
                                pass
                        # Link quality (-127 is the device's "no signal" sentinel)
                        if 'rssi' in row and row['rssi'].strip() and row['rssi'] != '-127':
                            try:
                                data['rssi'] = int(row['rssi'])
                            except ValueError:
                                pass
                        if 'snr' in row and row['snr'].strip():
                            try:
                                data['snr'] = float(row['snr'])
                            except ValueError:
                                pass
                        # Publish with rate limiting
                        if self.publish_sample(device_id, ts_utc, data):
                            count += 1
                            print(f" [{count:4d}] {ts_utc} {data.get('p_w', '?')}W {data.get('e_kwh', '?')}kWh", end='\r')
                            # Rate limiting: delay between messages
                            if self.rate_per_sec > 0:
                                time.sleep(self.delay_sec)
                    except (ValueError, KeyError):
                        # Malformed row: skip it and keep going.
                        skipped += 1
                        continue
        except Exception as e:
            print(f"✗ Error reading CSV: {e}")
            return count
        print(f"✓ Published {count} samples, skipped {skipped}")
        return count

    @staticmethod
    def _extract_short_id(device_id: str) -> str:
        """Extract last 4 chars of device_id (e.g., 'dd3-F19C' -> 'F19C')"""
        if len(device_id) >= 4:
            return device_id[-4:].upper()
        return device_id.upper()
class InfluxDBHelper:
    """Helper to detect missing data ranges in InfluxDB"""

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """Open a client against one InfluxDB org/bucket.

        Raises:
            ImportError: if the optional influxdb-client package is missing.
        """
        if not HAS_INFLUXDB:
            raise ImportError("influxdb-client not installed. Install with: pip install influxdb-client")
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.query_api = self.client.query_api()

    def find_missing_ranges(self, device_id: str, from_time: int, to_time: int,
                            expected_interval: int = 30) -> List[Tuple[int, int]]:
        """
        Find time ranges missing from InfluxDB

        Args:
            device_id: Device ID
            from_time: Start timestamp (Unix)
            to_time: End timestamp (Unix)
            expected_interval: Expected seconds between samples (default 30s)

        Returns:
            List of (start, end) tuples for missing ranges
        """
        # Query InfluxDB for existing sample timestamps in the window.
        query = f'''
        from(bucket: "{self.bucket}")
            |> range(start: {from_time}s, stop: {to_time}s)
            |> filter(fn: (r) => r._measurement == "smartmeter" and r.device_id == "{device_id}")
            |> keep(columns: ["_time"])
            |> sort(columns: ["_time"])
        '''
        try:
            tables = self.query_api.query(query)
            existing_times = []
            for table in tables:
                for record in table.records:
                    ts = int(record.values["_time"].timestamp())
                    existing_times.append(ts)
            if not existing_times:
                # No data in InfluxDB, entire range is missing
                return [(from_time, to_time)]
            missing_ranges = []
            prev_ts = from_time
            for ts in sorted(existing_times):
                gap = ts - prev_ts
                # If gap is larger than expected interval (with 50% slack for
                # jitter), we're missing data
                if gap > expected_interval * 1.5:
                    missing_ranges.append((prev_ts, ts))
                prev_ts = ts
            # Check if missing data at the end.  BUG FIX: apply the same
            # jitter tolerance as above — previously ANY prev_ts < to_time
            # produced a spurious trailing "missing" range even when data
            # was current to within one sample interval.
            if to_time - prev_ts > expected_interval * 1.5:
                missing_ranges.append((prev_ts, to_time))
            return missing_ranges
        except Exception as e:
            print(f"✗ InfluxDB query error: {e}")
            return []

    def close(self):
        """Close InfluxDB connection"""
        self.client.close()
def parse_time_input(time_str: str, reference_date: datetime = None) -> int:
    """Parse a date/time string into a Unix timestamp.

    Accepts full dates ('YYYY-MM-DD', optionally with 'HH:MM' or 'HH:MM:SS')
    as well as time-only inputs ('HH:MM' / 'HH:MM:SS'), which are anchored to
    the calendar day of *reference_date* (default: now).

    Raises:
        ValueError: if the string matches none of the accepted formats.
    """
    anchor = datetime.now() if reference_date is None else reference_date
    candidate_formats = (
        '%Y-%m-%d',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M',
        '%H:%M:%S',
        '%H:%M',
    )
    for candidate in candidate_formats:
        try:
            parsed = datetime.strptime(time_str, candidate)
        except ValueError:
            continue
        if '%Y' not in candidate:
            # Time-only input: transplant it onto the anchor's calendar day.
            parsed = parsed.replace(year=anchor.year,
                                    month=anchor.month,
                                    day=anchor.day)
        return int(parsed.timestamp())
    raise ValueError(f"Cannot parse time: {time_str}")
def interactive_time_selection() -> Tuple[int, int]:
    """Prompt the user for a start/end pair and return Unix timestamps.

    Re-prompts until the input parses, start precedes end, and the user
    confirms the range with 'y'.
    """
    print("\n=== Time Range Selection ===")
    print("Enter dates in format: YYYY-MM-DD or YYYY-MM-DD HH:MM:SS")
    while True:
        try:
            start_ts = parse_time_input(input("\nStart time (YYYY-MM-DD): ").strip())
            end_ts = parse_time_input(input("End time (YYYY-MM-DD): ").strip())
            if start_ts >= end_ts:
                print("✗ Start time must be before end time")
                continue
            # Echo the resolved range back so the user can sanity-check it.
            print(f"\n→ Will publish data from {datetime.fromtimestamp(start_ts)} to {datetime.fromtimestamp(end_ts)}")
            if input("Confirm? (y/n): ").strip().lower() == 'y':
                return start_ts, end_ts
        except ValueError as e:
            print(f"{e}")
def interactive_csv_file_selection() -> Optional[str]:
    """Interactively pick a CSV file from an SD-card export directory.

    Prompts for a directory (or 'auto' to probe common mount points), lists
    the CSV files found, and lets the user pick one by number or enter an
    explicit path.

    Returns:
        The selected file path, or None if nothing valid was chosen.
    """
    print("\n=== CSV File Selection ===")
    csv_dir = input("Enter path to CSV directory (or 'auto' to scan): ").strip()
    if csv_dir.lower() == 'auto':
        # Scan common locations
        possible_paths = [
            ".",
            "./sd_data",
            "./data",
            "D:\\",  # SD card on Windows
            "/mnt/sd",  # SD card on Linux
        ]
        for path in possible_paths:
            if os.path.isdir(path):
                csv_dir = path
                break
    # Find all CSV files
    if not os.path.isdir(csv_dir):
        print(f"✗ Directory not found: {csv_dir}")
        return None
    # BUG FIX: sort once up front.  Previously the menu displayed
    # sorted(csv_files) but the numeric selection indexed the UNSORTED list,
    # so picking e.g. entry 3 could return a different file than shown.
    csv_files = sorted(Path(csv_dir).rglob("*.csv"))
    if not csv_files:
        print(f"✗ No CSV files found in {csv_dir}")
        return None
    print(f"\nFound {len(csv_files)} CSV files:")
    for i, f in enumerate(csv_files[:20], 1):
        print(f" {i}. {f.relative_to(csv_dir) if csv_dir != '.' else f}")
    if len(csv_files) > 20:
        print(f" ... and {len(csv_files) - 20} more")
    selected = input("\nEnter CSV file number or path: ").strip()
    try:
        idx = int(selected) - 1
        if 0 <= idx < len(csv_files):
            return str(csv_files[idx])
    except ValueError:
        pass
    # User entered a path
    if os.path.isfile(selected):
        return selected
    print(f"✗ Invalid selection: {selected}")
    return None
def main():
    """Command-line entry point.

    Parses arguments, gathers any settings not supplied on the command line
    interactively, resolves the time range (manual --from/--to-time or
    InfluxDB gap auto-detection), then streams matching CSV samples to MQTT.
    Exits non-zero on configuration or connection errors.
    """
    parser = argparse.ArgumentParser(
        description="Republish DD3 meter data from CSV to MQTT",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Interactive mode (will prompt for all settings)
python republish_mqtt.py -i
# Republish specific CSV file with automatic time detection (InfluxDB)
python republish_mqtt.py -f data.csv -d dd3-F19C \\
--mqtt-broker 192.168.1.100 \\
--influxdb-url http://localhost:8086 \\
--influxdb-token mytoken --influxdb-org myorg
# Manual time range
python republish_mqtt.py -f data.csv -d dd3-F19C \\
--mqtt-broker 192.168.1.100 \\
--from-time "2026-03-01" --to-time "2026-03-05"
"""
    )
    parser.add_argument('-i', '--interactive', action='store_true',
                        help='Interactive mode (prompt for all settings)')
    parser.add_argument('-f', '--file', type=str,
                        help='CSV file path')
    parser.add_argument('-d', '--device-id', type=str,
                        help='Device ID (e.g., dd3-F19C)')
    parser.add_argument('--mqtt-broker', type=str, default='localhost',
                        help='MQTT broker address (default: localhost)')
    parser.add_argument('--mqtt-port', type=int, default=1883,
                        help='MQTT broker port (default: 1883)')
    parser.add_argument('--mqtt-user', type=str,
                        help='MQTT username')
    parser.add_argument('--mqtt-pass', type=str,
                        help='MQTT password')
    parser.add_argument('--rate', type=int, default=5,
                        help='Publish rate (messages per second, default: 5)')
    parser.add_argument('--from-time', type=str,
                        help='Start time (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--to-time', type=str,
                        help='End time (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--influxdb-url', type=str,
                        help='InfluxDB URL (for auto-detection)')
    parser.add_argument('--influxdb-token', type=str,
                        help='InfluxDB API token')
    parser.add_argument('--influxdb-org', type=str,
                        help='InfluxDB organization')
    parser.add_argument('--influxdb-bucket', type=str, default='smartmeter',
                        help='InfluxDB bucket (default: smartmeter)')
    args = parser.parse_args()
    # Interactive mode (also entered implicitly when no CSV file was given)
    if args.interactive or not args.file:
        print("╔════════════════════════════════════════════════════╗")
        print("║ DD3 LoRa Bridge - MQTT Data Republisher ║")
        print("║ Recover lost meter data from SD card CSV files ║")
        print("╚════════════════════════════════════════════════════╝")
        # Get CSV file (CLI value wins; otherwise prompt)
        csv_file = args.file or interactive_csv_file_selection()
        if not csv_file:
            sys.exit(1)
        # Get device ID
        device_id = args.device_id
        if not device_id:
            device_id = input("\nDevice ID (e.g., dd3-F19C): ").strip()
            if not device_id:
                print("✗ Device ID required")
                sys.exit(1)
        # Get MQTT settings (empty input keeps the CLI/default value)
        mqtt_broker = input(f"\nMQTT Broker [{args.mqtt_broker}]: ").strip() or args.mqtt_broker
        mqtt_port = args.mqtt_port
        mqtt_user = input("MQTT Username (leave empty if none): ").strip() or None
        mqtt_pass = None
        if mqtt_user:
            # getpass keeps the password off the terminal echo
            import getpass
            mqtt_pass = getpass.getpass("MQTT Password: ")
        # Get time range
        print("\n=== Select Time Range ===")
        use_influx = HAS_INFLUXDB and input("Auto-detect missing ranges from InfluxDB? (y/n): ").strip().lower() == 'y'
        from_time = None
        to_time = None
        if use_influx:
            influx_url = input("InfluxDB URL: ").strip()
            influx_token = input("API Token: ").strip()
            influx_org = input("Organization: ").strip()
            try:
                helper = InfluxDBHelper(influx_url, influx_token, influx_org,
                                        args.influxdb_bucket)
                # Get user's date range first, then search for gaps inside it
                from_time, to_time = interactive_time_selection()
                print("\nSearching for missing data in InfluxDB...")
                missing_ranges = helper.find_missing_ranges(device_id, from_time, to_time)
                helper.close()
                if missing_ranges:
                    print(f"\nFound {len(missing_ranges)} missing data range(s):")
                    for i, (start, end) in enumerate(missing_ranges, 1):
                        start_dt = datetime.fromtimestamp(start)
                        end_dt = datetime.fromtimestamp(end)
                        duration = (end - start) / 3600
                        print(f" {i}. {start_dt} to {end_dt} ({duration:.1f} hours)")
                    # Use first range by default
                    from_time, to_time = missing_ranges[0]
                    print(f"\nWill republish first range: {datetime.fromtimestamp(from_time)} to {datetime.fromtimestamp(to_time)}")
                else:
                    # NOTE(review): if no gaps are found we fall through with
                    # the user-selected range, which may republish data that
                    # already exists in InfluxDB — confirm this is intended.
                    print("No missing data found in InfluxDB")
            except Exception as e:
                print(f"✗ InfluxDB error: {e}")
                sys.exit(1)
        else:
            from_time, to_time = interactive_time_selection()
    else:
        # Command-line mode
        csv_file = args.file
        device_id = args.device_id
        if not device_id:
            print("✗ Device ID required (use -d or --device-id)")
            sys.exit(1)
        mqtt_broker = args.mqtt_broker
        mqtt_port = args.mqtt_port
        mqtt_user = args.mqtt_user
        mqtt_pass = args.mqtt_pass
        # Parse time range
        if args.from_time and args.to_time:
            try:
                from_time = parse_time_input(args.from_time)
                to_time = parse_time_input(args.to_time)
            except ValueError as e:
                print(f"{e}")
                sys.exit(1)
        else:
            # Auto-detect if InfluxDB is available
            if args.influxdb_url and args.influxdb_token and args.influxdb_org and HAS_INFLUXDB:
                print("Auto-detecting missing data ranges...")
                try:
                    helper = InfluxDBHelper(args.influxdb_url, args.influxdb_token,
                                            args.influxdb_org, args.influxdb_bucket)
                    # Default to last 7 days
                    now = int(time.time())
                    from_time = now - (7 * 24 * 3600)
                    to_time = now
                    missing_ranges = helper.find_missing_ranges(device_id, from_time, to_time)
                    helper.close()
                    if missing_ranges:
                        # Only the first detected gap is republished per run
                        from_time, to_time = missing_ranges[0]
                        print(f"Found missing data: {datetime.fromtimestamp(from_time)} to {datetime.fromtimestamp(to_time)}")
                    else:
                        print("No missing data found")
                        sys.exit(0)
                except Exception as e:
                    print(f"{e}")
                    sys.exit(1)
            else:
                print("✗ Time range required (use --from-time and --to-time, or InfluxDB settings)")
                sys.exit(1)
    # Republish data
    print(f"\n=== Publishing to MQTT ===")
    print(f"Broker: {mqtt_broker}:{mqtt_port}")
    print(f"Device: {device_id}")
    print(f"Rate: {args.rate} msg/sec")
    print(f"Range: {datetime.fromtimestamp(from_time)} to {datetime.fromtimestamp(to_time)}")
    print()
    try:
        republisher = MQTTRepublisher(mqtt_broker, mqtt_port, mqtt_user, mqtt_pass,
                                      rate_per_sec=args.rate)
        republisher.connect()
        count = republisher.republish_csv(csv_file, device_id,
                                          filter_from=from_time,
                                          filter_to=to_time)
        republisher.disconnect()
        print(f"\n✓ Successfully published {count} samples")
    except KeyboardInterrupt:
        # Ctrl-C: disconnect cleanly if the republisher was already created
        print("\n\n⚠ Interrupted by user")
        if 'republisher' in locals():
            republisher.disconnect()
        sys.exit(0)
    except Exception as e:
        print(f"\n✗ Error: {e}")
        sys.exit(1)
if __name__ == '__main__':
main()