Compare commits

..

No commits in common. "o4mini-llm" and "master" have entirely different histories.

3 changed files with 77 additions and 216 deletions

View File

@ -1,51 +0,0 @@
# MQTT Configuration
This project can publish weather and public transport data to an MQTT broker.
To enable MQTT, follow these steps:
## 1. Install dependencies
Ensure you have Rust and Cargo installed. The MQTT support uses the Paho MQTT client crate.
Run:
```bash
cargo update
```
## 2. Set the MQTT_BROKER environment variable
Before running the client, define `MQTT_BROKER` to your broker address.
- Without URI scheme (defaults to TCP):
```bash
export MQTT_BROKER=localhost:1883
```
- With URI scheme:
```bash
export MQTT_BROKER=tcp://broker.example.com:1883
```
## 3. Run the LED board client
Pass the LED board IP address as the only argument:
```bash
export MQTT_BROKER=localhost:1883
cargo run --bin ledboard_client -- 192.168.1.50
```
## Topics and Payloads
The client publishes two topics:
### weather
JSON payload with fields:
- `dt`: timestamp (Unix seconds)
- `temp`: temperature in °C
- `weather`: object with `main`, `description`, `icon`
- `rain`: rain volume in last 3h (optional)
- `pop`: probability of precipitation
- `wind`: object with `speed`, `deg`, `gust`
### straba
JSON payload with fields:
- `outbound_station`: name of outbound station
- `outbound_diff`: seconds until outbound departure
- `inbound_station`: name of inbound station
- `inbound_diff`: seconds until inbound departure
## Customization
You can adjust MQTT topics, QoS, and message formats in `client/bin/src/main.rs` under the `publish_to_mqtt` function.

View File

@ -21,5 +21,4 @@ serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
# end of web stuff # end of web stuff
paho-mqtt = "0.13.2"
ping = "0.4.1" ping = "0.4.1"

View File

@ -23,7 +23,6 @@ use std::process::ExitCode;
use openweathermap::forecast::Forecast; use openweathermap::forecast::Forecast;
use straba::NextDeparture; use straba::NextDeparture;
use paho_mqtt::{Client, CreateOptionsBuilder, ConnectOptionsBuilder, Message};
// This declaration will look for a file named `straba.rs` and will // This declaration will look for a file named `straba.rs` and will
// insert its contents inside a module named `straba` under this scope // insert its contents inside a module named `straba` under this scope
mod straba; mod straba;
@ -303,11 +302,12 @@ fn send_package(ipaddress: String,
render_clock(&mut display); render_clock(&mut display);
package[1..PACKAGE_LENGTH].copy_from_slice(&display.image); package[1..PACKAGE_LENGTH].copy_from_slice(&display.image);
let target = format!("{}:4242", ipaddress); // client need to bind to client port (1 before 4242)
let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address"); let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address");
socket socket
.send_to(&package, &target) .send_to(&package, ipaddress + ":4242")
.expect("couldn't send data"); .expect("couldn't send data");
} }
@ -322,56 +322,30 @@ LEDboardClient <ip address>"
fn check_connection(ipaddress: String) -> bool { fn check_connection(ipaddress: String) -> bool {
let device_online; let device_online;
// generate a faulty package length
let mut package: [u8; PACKAGE_LENGTH/2] = [0; PACKAGE_LENGTH/2]; let mut package: [u8; PACKAGE_LENGTH/2] = [0; PACKAGE_LENGTH/2];
// client need to bind to client port (1 before 4242)
// Use a random local port instead of hardcoding let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address");
let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address"); socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); /* 10 seconds timeout */
socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); socket
.send_to(&package, ipaddress + ":4242")
let target = format!("{}:4242", ipaddress); .expect("couldn't send data");
match socket.send_to(&package, &target) {
Ok(_) => {
// Continue with receive
match socket.recv_from(&mut package) {
Ok((_n, _addr)) => device_online = true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => device_online = false,
Err(_) => device_online = false,
}
},
Err(_) => device_online = false,
}
device_online
}
/// Publishes weather and transit data to MQTT broker
fn publish_to_mqtt(client: &Client, data: &Option<Result<Forecast, String>>, straba_res: &NextDeparture) {
let payload = if let Some(Ok(forecast)) = data {
if let Some(f) = forecast.list.first() {
let temp = f.main.temp;
let weather = f.weather.get(0).map(|w| w.main.clone()).unwrap_or_default();
format!("temp:{:.1}C,weather:{}",
temp,
weather)
} else {
"no_forecast".to_string()
}
} else {
"no_data".to_string()
};
let msg = Message::new("ledboard/forecast", payload, 1);
if let Err(e) = client.publish(msg) {
eprintln!("Error publishing MQTT message: {}", e);
}
let payloadPT = { // self.recv_buff is a [u8; 8092]
format!("out:{}min,in:{}min", let answer = socket.recv_from(&mut package);
straba_res.outbound_diff / 60, match answer {
straba_res.inbound_diff / 60) Ok((_n, _addr)) => {
}; //println!("{} bytes response from {:?} {:?}", n, addr, &package[..n]);
let ptmsg = Message::new("ledboard/public_transportation", payloadPT, 1); device_online = true;
if let Err(e) = client.publish(ptmsg) { }
eprintln!("Error publishing MQTT message: {}", e); Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
device_online = false;
}
Err(_e) => {
device_online = false;
}
} }
return device_online;
} }
fn main() -> ExitCode { fn main() -> ExitCode {
@ -387,25 +361,61 @@ fn main() -> ExitCode {
2 => { 2 => {
let ip = &args[1]; let ip = &args[1];
// Read broker URL from environment
let broker = match std::env::var("MQTT_BROKER") { let mut device_online = check_connection(ip.to_string());
Ok(val) if !val.is_empty() => val, if !device_online {
_ => { println!("{} not online", ip);
eprintln!("Environment variable MQTT_BROKER not set or empty, MQTT disabled"); return ExitCode::FAILURE;
String::new() }
let receiver = openweathermap::init_forecast("Mannheim",
"metric",
"de",
"978882ab9dd05e7122ff2b0aef2d3e55",
60,1);
let mut last_data = Option::None;
// Test Webcrawler for public transportataion
let mut straba_res = straba::fetch_data(Some(true));
println!("{:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff);
println!("{:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff);
// Render start
send_package(ip.to_string(), &last_data, &straba_res);
loop {
let st_now = SystemTime::now();
let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs();
let delay = time::Duration::from_millis(500);
thread::sleep(delay);
// Only request, if the device is present
if device_online == true {
let answer = openweathermap::update_forecast(&receiver);
match answer {
Some(_) => {
last_data = answer;
}
None => {
}
}
} }
};
return main_function(ip, Some(broker)) if (straba_res.request_time + 50) < seconds as i64 {
device_online = check_connection(ip.to_string());
// request once a minute new data
if device_online == true {
straba_res = straba::fetch_data(None);
println!("Update {:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff);
println!("Update {:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff);
}
}
} if device_online == true {
// two argument passed // Render new image
3 => { send_package(ip.to_string(), &last_data, &straba_res);
let ip = &args[1]; }
let mqtt = &args[2]; }
return main_function( ip,
Some(mqtt.to_string())
);
} }
// all the other cases // all the other cases
_ => { _ => {
@ -415,100 +425,3 @@ fn main() -> ExitCode {
} }
} }
} }
fn main_function(ip: &String, mqttBroker: Option<String>) -> ExitCode {
let mut device_online: bool = check_connection(ip.to_string());
if !device_online {
println!("{} not online", ip);
return ExitCode::FAILURE;
}
let receiver = openweathermap::init_forecast("Mannheim",
"metric",
"de",
"978882ab9dd05e7122ff2b0aef2d3e55",
60,1);
let mut last_data = Option::None;
// Test Webcrawler for public transportataion
let mut straba_res = straba::fetch_data(Some(true));
println!("{:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff);
println!("{:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff);
// Initialize MQTT client from MQTT_BROKER env var (else disabled)
let mqtt_client: Option<Client> = {
if mqttBroker.is_none() {
None
} else if mqttBroker.is_some() {
let create_opts = CreateOptionsBuilder::new()
.server_uri(mqttBroker.clone().unwrap())
.client_id("ledboard_client")
.finalize();
match Client::new(create_opts) {
Ok(cli) => {
let conn_opts = ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.clean_session(true)
.finalize();
match cli.connect(conn_opts) {
Ok(_) => Some(cli),
Err(e) => {
eprintln!("Failed to connect to MQTT broker '{}': {}", mqttBroker.clone().unwrap(), e);
None
}
}
}
Err(e) => {
eprintln!("Failed to create MQTT client for '{}': {}", mqttBroker.clone().unwrap(), e);
None
}
}
} else {
None
}
};
// Render start
send_package(ip.to_string(), &last_data, &straba_res);
loop {
let st_now = SystemTime::now();
let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs();
let delay = time::Duration::from_millis(500);
thread::sleep(delay);
// Only request, if the device is present
if device_online == true {
let answer = openweathermap::update_forecast(&receiver);
match answer {
Some(_) => {
last_data = answer;
}
None => {
}
}
}
if (straba_res.request_time + 50) < seconds as i64 {
device_online = check_connection(ip.to_string());
// request once a minute new data
if device_online == true {
straba_res = straba::fetch_data(None);
println!("Update {:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff);
println!("Update {:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff);
}
}
if device_online == true {
// Render new image
send_package(ip.to_string(), &last_data, &straba_res);
// Publish data to MQTT
}
if let Some(ref client) = mqtt_client {
publish_to_mqtt(client, &last_data, &straba_res);
}
}
}