# DD3 MQTT Data Republisher

Standalone Python script to recover and republish lost meter data from SD card CSV files to MQTT.

## Features

- **Rate-limited publishing**: Sends 5 messages/second by default (configurable) to prevent MQTT broker overload
- **Two modes of operation**:
  - **Auto-detect**: Connect to InfluxDB to find gaps in recorded data
  - **Manual selection**: User specifies start/end time range
- **Cross-platform**: Works on Windows, macOS, and Linux
- **CSV parsing**: Reads SD card CSV export format and converts to MQTT JSON
- **Interactive mode**: Walks user through configuration step-by-step
- **Command-line mode**: Scripting and automation friendly

## Installation

### Prerequisites
- Python 3.7 or later

### Setup

```bash
# Install dependencies
pip install -r requirements_republish.txt
```

### Optional: InfluxDB support
Automatic gap detection via InfluxDB requires `influxdb-client`, which is listed in `requirements_republish.txt` and is installed by the command above. If you only plan to use the manual fallback mode, you can skip this package.

## Usage

### Interactive Mode (Recommended for first use)

```bash
python republish_mqtt.py -i
```

The script will prompt you for:
1. CSV file location (with auto-discovery)
2. Device ID
3. MQTT broker settings
4. Time range (manual or auto-detect from InfluxDB)

### Command Line Mode

#### Republish a specific time range:
```bash
python republish_mqtt.py \
  -f path/to/data.csv \
  -d dd3-F19C \
  --mqtt-broker 192.168.1.100 \
  --mqtt-user admin \
  --mqtt-pass password \
  --from-time "2026-03-01" \
  --to-time "2026-03-05"
```

#### Auto-detect missing data with InfluxDB:
```bash
python republish_mqtt.py \
  -f path/to/data.csv \
  -d dd3-F19C \
  --mqtt-broker 192.168.1.100 \
  --influxdb-url http://localhost:8086 \
  --influxdb-token mytoken123 \
  --influxdb-org myorg \
  --influxdb-bucket smartmeter
```

#### Different publish rate (slower for stability):
```bash
python republish_mqtt.py \
  -f data.csv \
  -d dd3-F19C \
  --mqtt-broker localhost \
  --rate 2  # 2 messages per second instead of 5
```

## CSV Format

The script expects CSV files exported from the SD card with this header:
```
ts_utc,ts_hms_local,p_w,p1_w,p2_w,p3_w,e_kwh,bat_v,bat_pct,rssi,snr,err_m,err_d,err_tx,err_last
```

Note: `ts_hms_local` is the local time (HH:MM:SS) in your configured timezone, not UTC. The `ts_utc` field contains the Unix timestamp in UTC.

Each row is one meter sample. The script converts these to MQTT JSON format:
```json
{
  "id": "F19C",
  "ts": 1710076800,
  "e_kwh": "1234.56",
  "p_w": 5432,
  "p1_w": 1800,
  "p2_w": 1816,
  "p3_w": 1816,
  "bat_v": "4.15",
  "bat_pct": 95,
  "rssi": -95,
  "snr": 9.25
}
```

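The row-to-JSON conversion described above can be sketched roughly as follows. This is an illustration under assumptions, not the script's actual code: the helper name `row_to_payload` is hypothetical, the sample row is made up to match the example payload, and keeping `e_kwh` and `bat_v` as strings simply mirrors that example.

```python
import csv
import io
import json

# Header as documented in this README.
CSV_HEADER = (
    "ts_utc,ts_hms_local,p_w,p1_w,p2_w,p3_w,e_kwh,"
    "bat_v,bat_pct,rssi,snr,err_m,err_d,err_tx,err_last"
)


def row_to_payload(row: dict, short_id: str) -> str:
    """Build an MQTT JSON payload from one CSV row (hypothetical helper)."""
    payload = {
        "id": short_id,
        "ts": int(row["ts_utc"]),
        "e_kwh": row["e_kwh"],  # left as a string, as in the example payload
        "p_w": int(row["p_w"]),
        "p1_w": int(row["p1_w"]),
        "p2_w": int(row["p2_w"]),
        "p3_w": int(row["p3_w"]),
        "bat_v": row["bat_v"],  # left as a string, as in the example payload
        "bat_pct": int(row["bat_pct"]),
        "rssi": int(row["rssi"]),
        "snr": float(row["snr"]),
    }
    return json.dumps(payload)


# Made-up sample row for illustration only.
sample = CSV_HEADER + "\n1710076800,14:20:00,5432,1800,1816,1816,1234.56,4.15,95,-95,9.25,0,0,0,0\n"
rows = list(csv.DictReader(io.StringIO(sample)))
print(row_to_payload(rows[0], "F19C"))
```

The error columns (`err_m`, `err_d`, `err_tx`, `err_last`) are omitted here only because they do not appear in the example payload.
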
## How It Works

### Manual Mode (Fallback)
1. User specifies a time range (start and end timestamps)
2. Script reads CSV file
3. Filters samples within the time range
4. Publishes to MQTT topic: `smartmeter/{device_id}/state`
5. Respects rate limiting (5 msg/sec by default)

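Steps 2 and 3 of manual mode amount to a timestamp filter over the CSV rows. A minimal sketch, assuming an inclusive time range and silently skipped malformed rows (neither is confirmed by this README); `filter_samples` is a hypothetical name and the header is abbreviated for brevity:

```python
import csv
import io
from datetime import datetime, timezone


def filter_samples(csv_text: str, start: datetime, end: datetime) -> list:
    """Return rows whose ts_utc lies in [start, end] (bounds assumed inclusive)."""
    lo, hi = start.timestamp(), end.timestamp()
    selected = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        try:
            ts = int(row["ts_utc"])
        except (KeyError, TypeError, ValueError):
            continue  # skip rows without a usable Unix timestamp
        if lo <= ts <= hi:
            selected.append(row)
    return selected


def unix(year: int, month: int, day: int) -> int:
    """Unix timestamp for midnight UTC on the given date."""
    return int(datetime(year, month, day, tzinfo=timezone.utc).timestamp())


# Abbreviated, made-up CSV with three samples on different days.
csv_text = "ts_utc,p_w\n{},100\n{},200\n{},300\n".format(
    unix(2026, 3, 1), unix(2026, 3, 2), unix(2026, 3, 6)
)
start = datetime(2026, 3, 1, tzinfo=timezone.utc)
end = datetime(2026, 3, 2, tzinfo=timezone.utc)
print(len(filter_samples(csv_text, start, end)))
```
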
### Auto-Detect Mode (with InfluxDB)
1. Script connects to InfluxDB
2. Queries for existing data in the specified bucket
3. Identifies gaps (time ranges with no data)
4. Shows gaps to user
5. Republishes the first (oldest) gap from CSV file
6. User can re-run to fill subsequent gaps

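Step 3 (gap identification) can be illustrated without any InfluxDB calls, given the timestamps that a query has already returned. In this sketch the function name `find_gaps` and the 300-second threshold are assumptions; the README does not state what interval the script treats as a gap.

```python
def find_gaps(timestamps, max_interval_s=300):
    """Return (gap_start, gap_end) pairs, oldest first, where consecutive
    samples are more than max_interval_s apart (threshold is an assumption)."""
    ordered = sorted(timestamps)
    gaps = []
    for prev, cur in zip(ordered, ordered[1:]):
        if cur - prev > max_interval_s:
            gaps.append((prev, cur))
    return gaps


# Made-up Unix timestamps standing in for data already present in InfluxDB.
existing = [0, 30, 60, 1000, 1030, 5000]
gaps = find_gaps(existing)
oldest_gap = gaps[0] if gaps else None
print(gaps, oldest_gap)
```

Taking `gaps[0]` corresponds to step 5: only the oldest gap is republished per run.
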
## Rate Limiting

By default, the script publishes 5 messages per second. This is:
- **Safe for most MQTT brokers** (no risk of overload)
- **Fast enough** (a typical day of data, ~2800 samples, republishes in under 10 minutes)
- **Adjustable** with the `--rate` parameter

Examples:
- `--rate 1`: 1 msg/sec (very conservative)
- `--rate 5`: 5 msg/sec (default, recommended)
- `--rate 10`: 10 msg/sec (only if broker can handle it)

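One common way to implement this kind of rate limit is to space calls on a fixed schedule. The sketch below is an assumption about the approach, not the script's code: `publish_rate_limited` is a hypothetical name, and `publish` stands in for whatever function actually sends a message to the broker.

```python
import time


def publish_rate_limited(messages, publish, rate=5.0,
                         sleep=time.sleep, clock=time.monotonic):
    """Call publish(msg) for each message, at most `rate` times per second."""
    if rate <= 0:
        raise ValueError("rate must be positive")
    interval = 1.0 / rate
    next_slot = clock()
    sent = 0
    for msg in messages:
        delay = next_slot - clock()
        if delay > 0:
            sleep(delay)  # wait until this message's slot
        publish(msg)
        sent += 1
        # Never schedule in the past, so a slow publish causes no catch-up burst.
        next_slot = max(next_slot + interval, clock())
    return sent


# Stand-in publisher: print instead of sending to a broker.
publish_rate_limited(["a", "b", "c"], print, rate=50)
```

Passing `sleep` and `clock` as parameters is a design choice that makes the pacing testable without real delays.
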
## Device ID

The device ID is used to determine the MQTT topic. It appears on the device display and in the CSV directory structure:
- Example: `dd3-F19C`
- Short ID (last 4 characters): `F19C`

You can use either form; the script extracts the short ID for the MQTT topic.

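Since the short ID is defined as the last 4 characters, both accepted forms reduce to the same value. A minimal sketch, with `short_id` as a hypothetical helper name:

```python
def short_id(device_id: str) -> str:
    """Return the last 4 characters of a device ID (hypothetical helper)."""
    return device_id.strip()[-4:]


print(short_id("dd3-F19C"), short_id("F19C"))
```
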
## Time Format

Dates can be specified in multiple formats:
- `2026-03-01` (YYYY-MM-DD)
- `2026-03-01 14:30:00` (YYYY-MM-DD HH:MM:SS)
- `14:30:00` (HH:MM:SS - uses today's date)
- `14:30` (HH:MM - uses today's date)

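Parsing the four formats listed above can be sketched with `datetime.strptime`. This is an illustration only: `parse_time` is a hypothetical name, and the result is a naive `datetime`, because this README does not say whether the script interprets input as local time or UTC.

```python
from datetime import date, datetime
from typing import Optional

FULL_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d")
TIME_ONLY_FORMATS = ("%H:%M:%S", "%H:%M")


def parse_time(text: str, today: Optional[date] = None) -> datetime:
    """Parse one of the documented formats into a naive datetime."""
    text = text.strip()
    for fmt in FULL_FORMATS:
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            pass
    for fmt in TIME_ONLY_FORMATS:
        try:
            clock_time = datetime.strptime(text, fmt).time()
        except ValueError:
            continue
        # Time-only input: combine with today's date.
        return datetime.combine(today or date.today(), clock_time)
    raise ValueError("unrecognized time format: {!r}".format(text))


print(parse_time("2026-03-01"))
print(parse_time("14:30", today=date(2026, 3, 9)))
```
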
## Examples

### Scenario 1: Recover data from yesterday
```bash
python republish_mqtt.py -i
# Select CSV file → dd3-F19C_2026-03-09.csv
# Device ID → dd3-F19C
# MQTT broker → 192.168.1.100
# Choose manual time selection
# From → 2026-03-09 00:00:00
# To → 2026-03-10 00:00:00
```

### Scenario 2: Find and fill gaps automatically
```bash
python republish_mqtt.py \
  -f path/to/csv/dd3-F19C/*.csv \
  -d dd3-F19C \
  --mqtt-broker mosquitto.example.com \
  --mqtt-user admin --mqtt-pass changeme \
  --influxdb-url http://influxdb:8086 \
  --influxdb-token mytoken \
  --influxdb-org myorg
```

### Scenario 3: Slow publishing for unreliable connection
```bash
python republish_mqtt.py -i --rate 1
```

## Troubleshooting

### "Cannot connect to MQTT broker"
- Check broker address and port
- Verify firewall rules
- Check username/password if required
- Test connectivity: `ping broker_address`

### "No data in CSV file"
- Verify CSV file path exists
- Check that CSV has data rows (not just header)
- Ensure device ID matches CSV directory name

### "InfluxDB query error"
- Verify InfluxDB is running and accessible
- Check API token validity
- Verify organization name
- Check bucket contains data

### "Published 0 samples"
- CSV file may be empty
- Time range may not match any data in CSV
- Try a wider date range
- Check that CSV timestamps are in Unix format

## Performance

Typical performance on a standard PC:
- **CSV parsing**: ~10,000 rows/second
- **MQTT publishing** (at 5 msg/sec): 1 day's worth of data (~2800 samples) takes ~9 minutes

For large files (multiple weeks of data), the script may take longer. This is expected and safe.

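The publishing time follows directly from the sample count and the rate, so it can be estimated before a run. A small worked example, with `estimated_minutes` as a hypothetical helper and CSV parsing time ignored:

```python
def estimated_minutes(samples: int, rate: float = 5.0) -> float:
    """Estimated publishing time in minutes at `rate` messages per second."""
    return samples / rate / 60


# One day of data at the default rate: 2800 / 5 = 560 s
print(round(estimated_minutes(2800), 1))          # 9.3
# The same data at --rate 1
print(round(estimated_minutes(2800, rate=1), 1))  # 46.7
```
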
## Advanced: Scripting

For automation, use command-line mode from a shell script, with the parameters held in variables:

```bash
#!/bin/bash
# Recover last 3 days of data
# Note: `date -d` is GNU date syntax and may not work on macOS/BSD.

DEVICE_ID="dd3-F19C"
CSV_DIR="/mnt/sd/dd3/$DEVICE_ID"
FROM=$(date -d '3 days ago' '+%Y-%m-%d')
TO=$(date '+%Y-%m-%d')

# -f uses the most recently modified CSV file in the device directory
python republish_mqtt.py \
  -f "$(ls -t "$CSV_DIR"/*.csv | head -1)" \
  -d "$DEVICE_ID" \
  --mqtt-broker mqtt.example.com \
  --mqtt-user admin \
  --mqtt-pass changeme \
  --from-time "$FROM" \
  --to-time "$TO" \
  --rate 5
```

## License

Same as DD3 project

## Support

For issues or feature requests, check the project repository.