Compare commits
6 Commits
master
...
o4mini-llm
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0e18c1a7d | ||
|
|
fbdf1ea24b | ||
| 06ce74da9f | |||
| 0acb2a2538 | |||
| bd300f163e | |||
| 25da1ac04b |
51
client/MQTT.md
Normal file
51
client/MQTT.md
Normal file
@@ -0,0 +1,51 @@
|
||||
# MQTT Configuration
|
||||
|
||||
This project can publish weather and public transport data to an MQTT broker.
|
||||
To enable MQTT, follow these steps:
|
||||
|
||||
## 1. Install dependencies
|
||||
Ensure you have Rust and Cargo installed. The MQTT support uses the Paho MQTT client crate.
|
||||
Run:
|
||||
```bash
|
||||
cargo update
|
||||
```
|
||||
|
||||
## 2. Set the MQTT_BROKER environment variable
|
||||
Before running the client, define `MQTT_BROKER` to your broker address.
|
||||
- Without URI scheme (defaults to TCP):
|
||||
```bash
|
||||
export MQTT_BROKER=localhost:1883
|
||||
```
|
||||
- With URI scheme:
|
||||
```bash
|
||||
export MQTT_BROKER=tcp://broker.example.com:1883
|
||||
```
|
||||
|
||||
## 3. Run the LED board client
|
||||
Pass the LED board IP address as the only argument:
|
||||
```bash
|
||||
export MQTT_BROKER=localhost:1883
|
||||
cargo run --bin ledboard_client -- 192.168.1.50
|
||||
```
|
||||
|
||||
## Topics and Payloads
|
||||
The client publishes two topics:
|
||||
|
||||
### weather
|
||||
JSON payload with fields:
|
||||
- `dt`: timestamp (Unix seconds)
|
||||
- `temp`: temperature in °C
|
||||
- `weather`: object with `main`, `description`, `icon`
|
||||
- `rain`: rain volume in last 3h (optional)
|
||||
- `pop`: probability of precipitation
|
||||
- `wind`: object with `speed`, `deg`, `gust`
|
||||
|
||||
### straba
|
||||
JSON payload with fields:
|
||||
- `outbound_station`: name of outbound station
|
||||
- `outbound_diff`: seconds until outbound departure
|
||||
- `inbound_station`: name of inbound station
|
||||
- `inbound_diff`: seconds until inbound departure
|
||||
|
||||
## Customization
|
||||
You can adjust MQTT topics, QoS, and message formats in `client/bin/src/main.rs` under the `publish_to_mqtt` function.
|
||||
@@ -21,4 +21,5 @@ serde = "1.0"
|
||||
serde_derive = "1.0"
|
||||
serde_json = "1.0"
|
||||
# end of web stuff
|
||||
paho-mqtt = "0.13.2"
|
||||
ping = "0.4.1"
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::process::ExitCode;
|
||||
|
||||
use openweathermap::forecast::Forecast;
|
||||
use straba::NextDeparture;
|
||||
use paho_mqtt::{Client, CreateOptionsBuilder, ConnectOptionsBuilder, Message};
|
||||
// This declaration will look for a file named `straba.rs` and will
|
||||
// insert its contents inside a module named `straba` under this scope
|
||||
mod straba;
|
||||
@@ -276,8 +277,8 @@ fn render_strab_partial(display: &mut UdpDisplay, station: &String, diff: i64, h
|
||||
}
|
||||
|
||||
fn render_strab(display: &mut UdpDisplay, straba_res: &NextDeparture) {
|
||||
render_strab_partial(display, &straba_res.outbound_station, straba_res.outbound_diff, 17);
|
||||
render_strab_partial(display, &straba_res.inbound_station, straba_res.inbound_diff, 27);
|
||||
render_strab_partial(display, &straba_res.outbound_station, straba_res.outbound_diff, 15);
|
||||
render_strab_partial(display, &straba_res.inbound_station, straba_res.inbound_diff, 25);
|
||||
}
|
||||
|
||||
fn send_package(ipaddress: String,
|
||||
@@ -302,12 +303,11 @@ fn send_package(ipaddress: String,
|
||||
|
||||
render_clock(&mut display);
|
||||
|
||||
|
||||
package[1..PACKAGE_LENGTH].copy_from_slice(&display.image);
|
||||
// client need to bind to client port (1 before 4242)
|
||||
let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address");
|
||||
let target = format!("{}:4242", ipaddress);
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address");
|
||||
socket
|
||||
.send_to(&package, ipaddress + ":4242")
|
||||
.send_to(&package, &target)
|
||||
.expect("couldn't send data");
|
||||
}
|
||||
|
||||
@@ -322,30 +322,56 @@ LEDboardClient <ip address>"
|
||||
|
||||
fn check_connection(ipaddress: String) -> bool {
|
||||
let device_online;
|
||||
// generate a faulty package length
|
||||
let mut package: [u8; PACKAGE_LENGTH/2] = [0; PACKAGE_LENGTH/2];
|
||||
// client need to bind to client port (1 before 4242)
|
||||
let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address");
|
||||
socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); /* 10 seconds timeout */
|
||||
socket
|
||||
.send_to(&package, ipaddress + ":4242")
|
||||
.expect("couldn't send data");
|
||||
|
||||
// self.recv_buff is a [u8; 8092]
|
||||
let answer = socket.recv_from(&mut package);
|
||||
match answer {
|
||||
Ok((_n, _addr)) => {
|
||||
//println!("{} bytes response from {:?} {:?}", n, addr, &package[..n]);
|
||||
device_online = true;
|
||||
// Use a random local port instead of hardcoding
|
||||
let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address");
|
||||
socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap();
|
||||
|
||||
let target = format!("{}:4242", ipaddress);
|
||||
match socket.send_to(&package, &target) {
|
||||
Ok(_) => {
|
||||
// Continue with receive
|
||||
match socket.recv_from(&mut package) {
|
||||
Ok((_n, _addr)) => device_online = true,
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => device_online = false,
|
||||
Err(_) => device_online = false,
|
||||
}
|
||||
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
|
||||
device_online = false;
|
||||
},
|
||||
Err(_) => device_online = false,
|
||||
}
|
||||
Err(_e) => {
|
||||
device_online = false;
|
||||
|
||||
device_online
|
||||
}
|
||||
/// Publishes weather and transit data to MQTT broker
|
||||
fn publish_to_mqtt(client: &Client, data: &Option<Result<Forecast, String>>, straba_res: &NextDeparture) {
|
||||
let payload = if let Some(Ok(forecast)) = data {
|
||||
if let Some(f) = forecast.list.first() {
|
||||
let temp = f.main.temp;
|
||||
let weather = f.weather.get(0).map(|w| w.main.clone()).unwrap_or_default();
|
||||
format!("temp:{:.1}C,weather:{}",
|
||||
temp,
|
||||
weather)
|
||||
} else {
|
||||
"no_forecast".to_string()
|
||||
}
|
||||
} else {
|
||||
"no_data".to_string()
|
||||
};
|
||||
let msg = Message::new("ledboard/forecast", payload, 1);
|
||||
if let Err(e) = client.publish(msg) {
|
||||
eprintln!("Error publishing MQTT message: {}", e);
|
||||
}
|
||||
|
||||
let payloadPT = {
|
||||
format!("out:{}min,in:{}min",
|
||||
straba_res.outbound_diff / 60,
|
||||
straba_res.inbound_diff / 60)
|
||||
};
|
||||
let ptmsg = Message::new("ledboard/public_transportation", payloadPT, 1);
|
||||
if let Err(e) = client.publish(ptmsg) {
|
||||
eprintln!("Error publishing MQTT message: {}", e);
|
||||
}
|
||||
return device_online;
|
||||
}
|
||||
|
||||
fn main() -> ExitCode {
|
||||
@@ -361,13 +387,44 @@ fn main() -> ExitCode {
|
||||
2 => {
|
||||
let ip = &args[1];
|
||||
|
||||
// Read broker URL from environment
|
||||
let broker = match std::env::var("MQTT_BROKER") {
|
||||
Ok(val) if !val.is_empty() => val,
|
||||
_ => {
|
||||
eprintln!("Environment variable MQTT_BROKER not set or empty, MQTT disabled");
|
||||
String::new()
|
||||
}
|
||||
};
|
||||
|
||||
let mut device_online = check_connection(ip.to_string());
|
||||
return main_function(ip, Some(broker))
|
||||
|
||||
}
|
||||
// two argument passed
|
||||
3 => {
|
||||
let ip = &args[1];
|
||||
let mqtt = &args[2];
|
||||
return main_function( ip,
|
||||
Some(mqtt.to_string())
|
||||
);
|
||||
}
|
||||
// all the other cases
|
||||
_ => {
|
||||
// show a help message
|
||||
help();
|
||||
return ExitCode::SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main_function(ip: &String, mqttBroker: Option<String>) -> ExitCode {
|
||||
let mut device_online: bool = check_connection(ip.to_string());
|
||||
if !device_online {
|
||||
println!("{} not online", ip);
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
|
||||
|
||||
|
||||
let receiver = openweathermap::init_forecast("Mannheim",
|
||||
"metric",
|
||||
"de",
|
||||
@@ -381,6 +438,39 @@ fn main() -> ExitCode {
|
||||
println!("{:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff);
|
||||
println!("{:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff);
|
||||
|
||||
// Initialize MQTT client from MQTT_BROKER env var (else disabled)
|
||||
let mqtt_client: Option<Client> = {
|
||||
if mqttBroker.is_none() {
|
||||
None
|
||||
} else if mqttBroker.is_some() {
|
||||
let create_opts = CreateOptionsBuilder::new()
|
||||
.server_uri(mqttBroker.clone().unwrap())
|
||||
.client_id("ledboard_client")
|
||||
.finalize();
|
||||
match Client::new(create_opts) {
|
||||
Ok(cli) => {
|
||||
let conn_opts = ConnectOptionsBuilder::new()
|
||||
.keep_alive_interval(Duration::from_secs(20))
|
||||
.clean_session(true)
|
||||
.finalize();
|
||||
match cli.connect(conn_opts) {
|
||||
Ok(_) => Some(cli),
|
||||
Err(e) => {
|
||||
eprintln!("Failed to connect to MQTT broker '{}': {}", mqttBroker.clone().unwrap(), e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to create MQTT client for '{}': {}", mqttBroker.clone().unwrap(), e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
// Render start
|
||||
send_package(ip.to_string(), &last_data, &straba_res);
|
||||
loop {
|
||||
@@ -414,14 +504,11 @@ fn main() -> ExitCode {
|
||||
if device_online == true {
|
||||
// Render new image
|
||||
send_package(ip.to_string(), &last_data, &straba_res);
|
||||
}
|
||||
}
|
||||
}
|
||||
// all the other cases
|
||||
_ => {
|
||||
// show a help message
|
||||
help();
|
||||
return ExitCode::SUCCESS;
|
||||
}
|
||||
// Publish data to MQTT
|
||||
}
|
||||
if let Some(ref client) = mqtt_client {
|
||||
publish_to_mqtt(client, &last_data, &straba_res);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use chrono::DateTime;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use chrono::Local;
|
||||
|
||||
use serde::Deserialize;
|
||||
|
||||
const STATION_URL:&str = "https://www.rnv-online.de/rest/departure/2494";
|
||||
@@ -79,11 +79,10 @@ pub struct NextDeparture {
|
||||
}
|
||||
|
||||
pub fn fetch_data(debug_print : Option<bool>) -> NextDeparture {
|
||||
let date = Local::now();
|
||||
|
||||
let st_now = SystemTime::now();
|
||||
let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs();
|
||||
let timeString = date.format("%Y-%m-%d %H:%M:%S");
|
||||
let url = &format!("{}?datetime={}", STATION_URL, timeString);
|
||||
let url = &format!("{}?datetime={}", STATION_URL, seconds);
|
||||
let result = reqwest::blocking::get(url);
|
||||
|
||||
let mut return_value = NextDeparture {
|
||||
@@ -119,24 +118,10 @@ pub fn fetch_data(debug_print : Option<bool>) -> NextDeparture {
|
||||
return return_value;
|
||||
}
|
||||
|
||||
if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("----------- Seconds {:} {:} requesting ... -----------", seconds, timeString);
|
||||
}
|
||||
// parse JSON result.. search of both directions
|
||||
let json = body.unwrap();
|
||||
|
||||
if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("Requesting {:}", json.graph_ql.response.name);
|
||||
println!("Elements {:}", json.graph_ql.response.journeys.elements.len() );
|
||||
println!("------------------------- %< ----------------------------");
|
||||
println!("{}", &raw_text);
|
||||
println!("------------------------- %< ----------------------------");
|
||||
}
|
||||
|
||||
for el in json.graph_ql.response.journeys.elements {
|
||||
|
||||
if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("Requesting {:}", json.graph_ql.response.name);
|
||||
println!("Line {:}", el.line.line_group.label);
|
||||
}
|
||||
for stop in el.stops {
|
||||
@@ -155,8 +140,6 @@ pub fn fetch_data(debug_print : Option<bool>) -> NextDeparture {
|
||||
if diff < return_value.outbound_diff {
|
||||
return_value.outbound_station = stop.destination_label;
|
||||
return_value.outbound_diff = diff;
|
||||
} else if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("Unkown diff Stop {:} {:} (in {:} seconds)", stop.destination_label, txt_departure, diff );
|
||||
}
|
||||
} else if stop.destination_label.contains("Hochschule") ||
|
||||
stop.destination_label.contains("Hauptbahnhof") ||
|
||||
@@ -164,19 +147,13 @@ pub fn fetch_data(debug_print : Option<bool>) -> NextDeparture {
|
||||
if diff < return_value.inbound_diff {
|
||||
return_value.inbound_station = stop.destination_label;
|
||||
return_value.inbound_diff = diff;
|
||||
} else if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("Unkown diff Stop {:} {:} (in {:} seconds)", stop.destination_label, txt_departure, diff );
|
||||
}
|
||||
} else if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("Unkown Stop {:} {:} (in {:} seconds)", stop.destination_label, txt_departure, diff );
|
||||
}
|
||||
} else {
|
||||
println!("Planned {:} {:?}", stop.destination_label, stop.planned_departure.iso_string)
|
||||
}
|
||||
}
|
||||
}
|
||||
if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("----------- end of straba.rs -----------");
|
||||
}
|
||||
|
||||
return_value
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user