#!/usr/bin/env python3
"""
DD3 LoRa Bridge - MQTT Data Republisher
Republishes historical meter data from SD card CSV files to MQTT
Prevents data loss by allowing recovery of data during WiFi/MQTT downtime
"""

import argparse
import csv
import json
import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Tuple, List

# paho-mqtt is guarded the same way as the optional InfluxDB client below so
# that --help and the pure helper functions work without it; the hard failure
# (with an install hint) happens when an MQTT client is actually needed.
try:
    import paho.mqtt.client as mqtt
    HAS_MQTT = True
except ImportError:
    mqtt = None
    HAS_MQTT = False

# Optional: for auto-detection of missing data
try:
    from influxdb_client import InfluxDBClient
    HAS_INFLUXDB = True
except ImportError:
    HAS_INFLUXDB = False


class MQTTRepublisher:
    """Republish meter data from CSV files to MQTT"""

    def __init__(self, broker: str, port: int, username: Optional[str] = None,
                 password: Optional[str] = None, rate_per_sec: int = 5):
        """
        Create the MQTT client (does not connect yet).

        Args:
            broker: MQTT broker host name or address
            port: MQTT broker TCP port
            username: Optional MQTT username
            password: Optional MQTT password (only used together with username)
            rate_per_sec: Maximum messages per second; 0 or less disables
                rate limiting

        Raises:
            ImportError: If paho-mqtt is not installed
        """
        if not HAS_MQTT:
            raise ImportError("paho-mqtt not installed. Install with: pip install paho-mqtt")

        self.broker = broker
        self.port = port
        self.username = username
        self.password = password
        self.rate_per_sec = rate_per_sec
        # A rate of 0 (or negative) means "no limit"; avoid dividing by zero.
        self.delay_sec = 1.0 / rate_per_sec if rate_per_sec > 0 else 0.0

        # paho-mqtt 2.x takes a callback API version as first argument. The
        # callbacks below use the v1 signatures, so request v1 explicitly when
        # the enum exists and fall back to the 1.x constructor otherwise.
        if hasattr(mqtt, "CallbackAPIVersion"):
            self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        else:
            self.client = mqtt.Client()
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.connected = False

        # A username without a password is valid; an empty password is
        # treated as "no password".
        if username:
            self.client.username_pw_set(username, password or None)

    def _on_connect(self, client, userdata, flags, rc):
        """paho callback: record whether the broker accepted the connection."""
        if rc == 0:
            self.connected = True
            print(f"✓ Connected to MQTT broker at {self.broker}:{self.port}")
        else:
            print(f"✗ Failed to connect to MQTT broker. Error code: {rc}")
            self.connected = False

    def _on_disconnect(self, client, userdata, rc):
        """paho callback: mark the client as disconnected."""
        self.connected = False
        if rc != 0:
            print(f"✗ Unexpected disconnection. Error code: {rc}")

    def connect(self):
        """
        Connect to MQTT broker and wait (up to 10 s) for the CONNACK.

        Raises:
            RuntimeError: If the connection is not established in time
            Exception: Any error raised by the underlying client is re-raised
        """
        try:
            self.client.connect(self.broker, self.port, keepalive=60)
            self.client.loop_start()

            # Wait for connection to establish
            timeout = 10
            start = time.time()
            while not self.connected and time.time() - start < timeout:
                time.sleep(0.1)

            if not self.connected:
                raise RuntimeError(f"Failed to connect within {timeout}s")
        except Exception as e:
            print(f"✗ Connection error: {e}")
            raise

    def disconnect(self):
        """Disconnect from MQTT broker"""
        self.client.loop_stop()
        self.client.disconnect()

    def publish_sample(self, device_id: str, ts_utc: int, data: dict) -> bool:
        """
        Publish a single meter sample to MQTT

        Args:
            device_id: Device ID, used to build the topic
                ``smartmeter/<device_id>/state``
            ts_utc: Sample timestamp (kept for interface compatibility; the
                timestamp that is sent is the one inside ``data``)
            data: JSON-serializable payload

        Returns:
            True if the client accepted the message, False otherwise
        """
        if not self.connected:
            print("✗ Not connected to MQTT broker")
            return False

        try:
            topic = f"smartmeter/{device_id}/state"
            payload = json.dumps(data)
            result = self.client.publish(topic, payload)

            if result.rc != mqtt.MQTT_ERR_SUCCESS:
                print(f"✗ Publish failed: {mqtt.error_string(result.rc)}")
                return False

            return True
        except Exception as e:
            print(f"✗ Error publishing: {e}")
            return False

    @staticmethod
    def _cell(row: dict, key: str) -> str:
        """Return the stripped CSV cell for key, or '' if absent or not a string.

        csv.DictReader fills the missing cells of a short row with None, so
        the value cannot be assumed to be a string.
        """
        value = row.get(key)
        return value.strip() if isinstance(value, str) else ''

    def _build_payload(self, device_id: str, ts_utc: int, row: dict) -> dict:
        """Build the MQTT payload dict for one CSV row; unparsable cells are omitted."""
        data = {
            'id': self._extract_short_id(device_id),
            'ts': ts_utc,
        }

        # Energy (formatted as 2 decimal places)
        try:
            data['e_kwh'] = f"{float(self._cell(row, 'e_kwh')):.2f}"
        except ValueError:
            pass

        # Power values (as integers)
        for key in ['p_w', 'p1_w', 'p2_w', 'p3_w']:
            text = self._cell(row, key)
            if text:
                try:
                    data[key] = int(round(float(text)))
                except ValueError:
                    pass

        # Battery
        bat_v = self._cell(row, 'bat_v')
        if bat_v:
            try:
                data['bat_v'] = f"{float(bat_v):.2f}"
            except ValueError:
                pass

        bat_pct = self._cell(row, 'bat_pct')
        if bat_pct:
            try:
                data['bat_pct'] = int(bat_pct)
            except ValueError:
                pass

        # Link quality. A value of -127 is never published (presumably a
        # "no reading" sentinel written by the device - TODO confirm).
        rssi = self._cell(row, 'rssi')
        if rssi and rssi != '-127':
            try:
                data['rssi'] = int(rssi)
            except ValueError:
                pass

        snr = self._cell(row, 'snr')
        if snr:
            try:
                data['snr'] = float(snr)
            except ValueError:
                pass

        return data

    def republish_csv(self, csv_file: str, device_id: str,
                      filter_from: Optional[int] = None,
                      filter_to: Optional[int] = None) -> int:
        """
        Republish data from CSV file to MQTT

        Rows whose timestamp cannot be parsed (including truncated rows) are
        counted as skipped instead of aborting the whole run.

        Args:
            csv_file: Path to CSV file
            device_id: Device ID for MQTT topic
            filter_from: Unix timestamp - only publish samples >= this time
            filter_to: Unix timestamp - only publish samples <= this time

        Returns:
            Number of samples published
        """
        if not os.path.isfile(csv_file):
            print(f"✗ File not found: {csv_file}")
            return 0

        count = 0
        skipped = 0

        try:
            # newline='' is what the csv module requires; utf-8-sig tolerates
            # a BOM, which would otherwise corrupt the first header name.
            with open(csv_file, 'r', newline='', encoding='utf-8-sig') as f:
                reader = csv.DictReader(f)
                if not reader.fieldnames:
                    print("✗ Invalid CSV: no header row")
                    return 0

                # Validate required fields
                required = ['ts_utc', 'e_kwh', 'p_w']
                missing = [field for field in required if field not in reader.fieldnames]
                if missing:
                    print(f"✗ Missing required CSV columns: {missing}")
                    return 0

                for row in reader:
                    try:
                        ts_utc = int(self._cell(row, 'ts_utc'))
                    except ValueError:
                        skipped += 1
                        continue

                    # Apply time filter ("is not None" so that a bound of 0
                    # is honoured as well)
                    if filter_from is not None and ts_utc < filter_from:
                        skipped += 1
                        continue
                    if filter_to is not None and ts_utc > filter_to:
                        # Stops at the first row past the window; this relies
                        # on rows being in ascending ts_utc order.
                        break

                    # Build MQTT payload matching device format
                    data = self._build_payload(device_id, ts_utc, row)

                    if self.publish_sample(device_id, ts_utc, data):
                        count += 1
                        print(f" [{count:4d}] {ts_utc} {data.get('p_w', '?')}W {data.get('e_kwh', '?')}kWh", end='\r')

                    # Rate limiting: delay between messages (also after a
                    # failed attempt, so a dropped link is not hammered)
                    if self.rate_per_sec > 0:
                        time.sleep(self.delay_sec)

        except Exception as e:
            print(f"✗ Error reading CSV: {e}")
            return count

        print(f"✓ Published {count} samples, skipped {skipped}")
        return count

    @staticmethod
    def _extract_short_id(device_id: str) -> str:
        """Extract last 4 chars of device_id (e.g., 'dd3-F19C' -> 'F19C')"""
        if len(device_id) >= 4:
            return device_id[-4:].upper()
        return device_id.upper()


class InfluxDBHelper:
    """Helper to detect missing data ranges in InfluxDB"""

    def __init__(self, url: str, token: str, org: str, bucket: str):
        """
        Open an InfluxDB client.

        Raises:
            ImportError: If influxdb-client is not installed
        """
        if not HAS_INFLUXDB:
            raise ImportError("influxdb-client not installed. Install with: pip install influxdb-client")

        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.bucket = bucket
        self.query_api = self.client.query_api()

    def find_missing_ranges(self, device_id: str, from_time: int, to_time: int,
                            expected_interval: int = 30) -> List[Tuple[int, int]]:
        """
        Find time ranges missing from InfluxDB

        A gap counts as missing only when it exceeds 1.5x the expected
        interval; this applies to the gap before the first sample, between
        samples, and after the last sample.

        Args:
            device_id: Device ID
            from_time: Start timestamp (Unix)
            to_time: End timestamp (Unix)
            expected_interval: Expected seconds between samples (default 30s)

        Returns:
            List of (start, end) tuples for missing ranges; an empty list if
            nothing is missing or the query failed (the error is printed)
        """
        # Query InfluxDB for existing data. Flux range() reads a bare integer
        # as a Unix timestamp in seconds; with an "s" suffix it would be a
        # duration relative to now() and the window would be wrong.
        query = f'''
            from(bucket: "{self.bucket}")
              |> range(start: {int(from_time)}, stop: {int(to_time)})
              |> filter(fn: (r) => r._measurement == "smartmeter" and r.device_id == "{device_id}")
              |> keep(columns: ["_time"])
              |> sort(columns: ["_time"])
        '''

        try:
            tables = self.query_api.query(query)
            existing_times = [
                int(record.values["_time"].timestamp())
                for table in tables
                for record in table.records
            ]

            if not existing_times:
                # No data in InfluxDB, entire range is missing
                return [(from_time, to_time)]

            threshold = expected_interval * 1.5
            missing_ranges = []
            prev_ts = from_time

            for ts in sorted(existing_times):
                # If gap is larger than expected interval, we're missing data
                if ts - prev_ts > threshold:
                    missing_ranges.append((prev_ts, ts))
                prev_ts = ts

            # Check if missing data at the end (same threshold as above,
            # otherwise every query would report a trailing "gap")
            if to_time - prev_ts > threshold:
                missing_ranges.append((prev_ts, to_time))

            return missing_ranges

        except Exception as e:
            print(f"✗ InfluxDB query error: {e}")
            return []

    def close(self):
        """Close InfluxDB connection"""
        self.client.close()


def parse_time_input(time_str: str, reference_date: Optional[datetime] = None) -> int:
    """
    Parse time input and return Unix timestamp

    Accepted formats: ``YYYY-MM-DD``, ``YYYY-MM-DD HH:MM[:SS]`` and the
    time-only forms ``HH:MM[:SS]``. Values are interpreted as local time.

    Args:
        time_str: Text to parse
        reference_date: Date used for time-only input (default: today)

    Returns:
        Unix timestamp in seconds

    Raises:
        ValueError: If the text matches none of the accepted formats
    """
    if reference_date is None:
        reference_date = datetime.now()

    # Try various formats
    formats = [
        '%Y-%m-%d',
        '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%d %H:%M',
        '%H:%M:%S',
        '%H:%M',
    ]

    for fmt in formats:
        try:
            dt = datetime.strptime(time_str, fmt)
            # If time-only format, use reference date
            if '%Y' not in fmt:
                dt = dt.replace(year=reference_date.year,
                                month=reference_date.month,
                                day=reference_date.day)
            return int(dt.timestamp())
        except ValueError:
            continue

    raise ValueError(f"Cannot parse time: {time_str}")


def interactive_time_selection() -> Tuple[int, int]:
    """
    Interactively get time range from user

    Keeps prompting until the user enters a valid range and confirms it.

    Returns:
        (from_time, to_time) as Unix timestamps
    """
    print("\n=== Time Range Selection ===")
    print("Enter dates in format: YYYY-MM-DD or YYYY-MM-DD HH:MM:SS")

    while True:
        try:
            from_str = input("\nStart time (YYYY-MM-DD): ").strip()
            from_time = parse_time_input(from_str)

            to_str = input("End time (YYYY-MM-DD): ").strip()
            to_time = parse_time_input(to_str)

            if from_time >= to_time:
                print("✗ Start time must be before end time")
                continue

            # Echo the resolved range so the user can confirm it
            from_dt = datetime.fromtimestamp(from_time)
            to_dt = datetime.fromtimestamp(to_time)
            print(f"\n→ Will publish data from {from_dt} to {to_dt}")

            confirm = input("Confirm? (y/n): ").strip().lower()
            if confirm == 'y':
                return from_time, to_time
        except ValueError as e:
            print(f"✗ {e}")


def interactive_csv_file_selection() -> Optional[str]:
    """
    Help user select CSV files from SD card

    Returns:
        Path of the selected CSV file, or None if nothing valid was selected
    """
    print("\n=== CSV File Selection ===")
    csv_dir = input("Enter path to CSV directory (or 'auto' to scan): ").strip()

    if csv_dir.lower() == 'auto':
        # Scan common locations
        possible_paths = [
            ".",
            "./sd_data",
            "./data",
            "D:\\",      # SD card on Windows
            "/mnt/sd",   # SD card on Linux
        ]
        for path in possible_paths:
            if os.path.isdir(path):
                csv_dir = path
                break

    # Find all CSV files
    if not os.path.isdir(csv_dir):
        print(f"✗ Directory not found: {csv_dir}")
        return None

    # Sort once so that the numbers shown below index the same list the
    # selection is taken from.
    csv_files = sorted(Path(csv_dir).rglob("*.csv"))
    if not csv_files:
        print(f"✗ No CSV files found in {csv_dir}")
        return None

    print(f"\nFound {len(csv_files)} CSV files:")
    for i, f in enumerate(csv_files[:20], 1):
        print(f" {i}. {f.relative_to(csv_dir) if csv_dir != '.' else f}")

    if len(csv_files) > 20:
        print(f" ... and {len(csv_files) - 20} more")

    selected = input("\nEnter CSV file number or path: ").strip()

    try:
        idx = int(selected) - 1
        if 0 <= idx < len(csv_files):
            return str(csv_files[idx])
    except ValueError:
        pass

    # User entered a path
    if os.path.isfile(selected):
        return selected

    print(f"✗ Invalid selection: {selected}")
    return None


def main():
    """
    Command-line entry point.

    Collects settings from arguments or interactive prompts, resolves the
    time range (manually or from the first gap found in InfluxDB), then
    republishes the matching CSV rows to MQTT.
    """
    parser = argparse.ArgumentParser(
        description="Republish DD3 meter data from CSV to MQTT",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Interactive mode (will prompt for all settings)
  python republish_mqtt.py -i

  # Republish specific CSV file with automatic time detection (InfluxDB)
  python republish_mqtt.py -f data.csv -d dd3-F19C \\
      --mqtt-broker 192.168.1.100 \\
      --influxdb-url http://localhost:8086 \\
      --influxdb-token mytoken --influxdb-org myorg

  # Manual time range
  python republish_mqtt.py -f data.csv -d dd3-F19C \\
      --mqtt-broker 192.168.1.100 \\
      --from-time "2026-03-01" --to-time "2026-03-05"
"""
    )

    parser.add_argument('-i', '--interactive', action='store_true',
                        help='Interactive mode (prompt for all settings)')
    parser.add_argument('-f', '--file', type=str,
                        help='CSV file path')
    parser.add_argument('-d', '--device-id', type=str,
                        help='Device ID (e.g., dd3-F19C)')
    parser.add_argument('--mqtt-broker', type=str, default='localhost',
                        help='MQTT broker address (default: localhost)')
    parser.add_argument('--mqtt-port', type=int, default=1883,
                        help='MQTT broker port (default: 1883)')
    parser.add_argument('--mqtt-user', type=str,
                        help='MQTT username')
    parser.add_argument('--mqtt-pass', type=str,
                        help='MQTT password')
    parser.add_argument('--rate', type=int, default=5,
                        help='Publish rate (messages per second, default: 5)')
    parser.add_argument('--from-time', type=str,
                        help='Start time (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--to-time', type=str,
                        help='End time (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)')
    parser.add_argument('--influxdb-url', type=str,
                        help='InfluxDB URL (for auto-detection)')
    parser.add_argument('--influxdb-token', type=str,
                        help='InfluxDB API token')
    parser.add_argument('--influxdb-org', type=str,
                        help='InfluxDB organization')
    parser.add_argument('--influxdb-bucket', type=str, default='smartmeter',
                        help='InfluxDB bucket (default: smartmeter)')

    args = parser.parse_args()

    # Fail early (but after --help has had its chance) when the MQTT client
    # library is missing, instead of after all interactive prompts.
    if not HAS_MQTT:
        print("✗ paho-mqtt not installed. Install with: pip install paho-mqtt")
        sys.exit(1)

    # Interactive mode
    if args.interactive or not args.file:
        print("╔════════════════════════════════════════════════════╗")
        print("║ DD3 LoRa Bridge - MQTT Data Republisher ║")
        print("║ Recover lost meter data from SD card CSV files ║")
        print("╚════════════════════════════════════════════════════╝")

        # Get CSV file
        csv_file = args.file or interactive_csv_file_selection()
        if not csv_file:
            sys.exit(1)

        # Get device ID
        device_id = args.device_id
        if not device_id:
            device_id = input("\nDevice ID (e.g., dd3-F19C): ").strip()
        if not device_id:
            print("✗ Device ID required")
            sys.exit(1)

        # Get MQTT settings
        mqtt_broker = input(f"\nMQTT Broker [{args.mqtt_broker}]: ").strip() or args.mqtt_broker
        mqtt_port = args.mqtt_port
        mqtt_user = input("MQTT Username (leave empty if none): ").strip() or None
        mqtt_pass = None
        if mqtt_user:
            import getpass
            mqtt_pass = getpass.getpass("MQTT Password: ")

        # Get time range
        print("\n=== Select Time Range ===")
        use_influx = HAS_INFLUXDB and input("Auto-detect missing ranges from InfluxDB? (y/n): ").strip().lower() == 'y'

        from_time = None
        to_time = None

        if use_influx:
            influx_url = input("InfluxDB URL: ").strip()
            influx_token = input("API Token: ").strip()
            influx_org = input("Organization: ").strip()

            try:
                helper = InfluxDBHelper(influx_url, influx_token, influx_org,
                                        args.influxdb_bucket)
                try:
                    # Get user's date range first
                    from_time, to_time = interactive_time_selection()

                    print("\nSearching for missing data in InfluxDB...")
                    missing_ranges = helper.find_missing_ranges(device_id, from_time, to_time)
                finally:
                    # Release the InfluxDB connection on every path
                    helper.close()

                if missing_ranges:
                    print(f"\nFound {len(missing_ranges)} missing data range(s):")
                    for i, (start, end) in enumerate(missing_ranges, 1):
                        start_dt = datetime.fromtimestamp(start)
                        end_dt = datetime.fromtimestamp(end)
                        duration = (end - start) / 3600
                        print(f" {i}. {start_dt} to {end_dt} ({duration:.1f} hours)")

                    # Use first range by default
                    from_time, to_time = missing_ranges[0]
                    print(f"\nWill republish first range: {datetime.fromtimestamp(from_time)} to {datetime.fromtimestamp(to_time)}")
                else:
                    # Nothing to recover: stop here (same as command-line
                    # mode) rather than republishing the whole range.
                    print("No missing data found in InfluxDB")
                    sys.exit(0)
            except Exception as e:
                print(f"✗ InfluxDB error: {e}")
                sys.exit(1)
        else:
            from_time, to_time = interactive_time_selection()

    else:
        # Command-line mode
        csv_file = args.file
        device_id = args.device_id

        if not device_id:
            print("✗ Device ID required (use -d or --device-id)")
            sys.exit(1)

        mqtt_broker = args.mqtt_broker
        mqtt_port = args.mqtt_port
        mqtt_user = args.mqtt_user
        mqtt_pass = args.mqtt_pass

        # Parse time range
        if args.from_time and args.to_time:
            try:
                from_time = parse_time_input(args.from_time)
                to_time = parse_time_input(args.to_time)
            except ValueError as e:
                print(f"✗ {e}")
                sys.exit(1)
        else:
            # Auto-detect if InfluxDB is available
            if args.influxdb_url and args.influxdb_token and args.influxdb_org and HAS_INFLUXDB:
                print("Auto-detecting missing data ranges...")
                try:
                    helper = InfluxDBHelper(args.influxdb_url, args.influxdb_token,
                                            args.influxdb_org, args.influxdb_bucket)
                    try:
                        # Default to last 7 days
                        now = int(time.time())
                        from_time = now - (7 * 24 * 3600)
                        to_time = now

                        missing_ranges = helper.find_missing_ranges(device_id, from_time, to_time)
                    finally:
                        helper.close()

                    if missing_ranges:
                        from_time, to_time = missing_ranges[0]
                        print(f"Found missing data: {datetime.fromtimestamp(from_time)} to {datetime.fromtimestamp(to_time)}")
                    else:
                        print("No missing data found")
                        sys.exit(0)
                except Exception as e:
                    print(f"✗ {e}")
                    sys.exit(1)
            else:
                print("✗ Time range required (use --from-time and --to-time, or InfluxDB settings)")
                sys.exit(1)

    # Republish data
    print("\n=== Publishing to MQTT ===")
    print(f"Broker: {mqtt_broker}:{mqtt_port}")
    print(f"Device: {device_id}")
    print(f"Rate: {args.rate} msg/sec")
    print(f"Range: {datetime.fromtimestamp(from_time)} to {datetime.fromtimestamp(to_time)}")
    print()

    republisher = None
    try:
        republisher = MQTTRepublisher(mqtt_broker, mqtt_port, mqtt_user, mqtt_pass,
                                      rate_per_sec=args.rate)
        republisher.connect()

        count = republisher.republish_csv(csv_file, device_id,
                                          filter_from=from_time,
                                          filter_to=to_time)

        print(f"\n✓ Successfully published {count} samples")

    except KeyboardInterrupt:
        print("\n\n⚠ Interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"\n✗ Error: {e}")
        sys.exit(1)
    finally:
        # Stop the network loop and close the connection on every exit path
        if republisher is not None:
            republisher.disconnect()


if __name__ == '__main__':
    main()