diff --git a/client/MQTT.md b/client/MQTT.md new file mode 100644 index 0000000..1bebd37 --- /dev/null +++ b/client/MQTT.md @@ -0,0 +1,51 @@ + # MQTT Configuration + + This project can publish weather and public transport data to an MQTT broker. + To enable MQTT, follow these steps: + + ## 1. Install dependencies + Ensure you have Rust and Cargo installed. The MQTT support uses the Paho MQTT client crate. + Run: + ```bash + cargo update + ``` + + ## 2. Set the MQTT_BROKER environment variable + Before running the client, define `MQTT_BROKER` to your broker address. + - Without URI scheme (defaults to TCP): + ```bash + export MQTT_BROKER=localhost:1883 + ``` + - With URI scheme: + ```bash + export MQTT_BROKER=tcp://broker.example.com:1883 + ``` + + ## 3. Run the LED board client + Pass the LED board IP address as the only argument: + ```bash + export MQTT_BROKER=localhost:1883 + cargo run --bin ledboard_client -- 192.168.1.50 + ``` + + ## Topics and Payloads + The client publishes two topics: + + ### weather + JSON payload with fields: + - `dt`: timestamp (Unix seconds) + - `temp`: temperature in °C + - `weather`: object with `main`, `description`, `icon` + - `rain`: rain volume in last 3h (optional) + - `pop`: probability of precipitation + - `wind`: object with `speed`, `deg`, `gust` + + ### straba + JSON payload with fields: + - `outbound_station`: name of outbound station + - `outbound_diff`: seconds until outbound departure + - `inbound_station`: name of inbound station + - `inbound_diff`: seconds until inbound departure + + ## Customization + You can adjust MQTT topics, QoS, and message formats in `client/bin/src/main.rs` under the `publish_to_mqtt` function. \ No newline at end of file diff --git a/client/bin/Cargo.toml b/client/bin/Cargo.toml index 50b5c3e..1a2ee29 100644 --- a/client/bin/Cargo.toml +++ b/client/bin/Cargo.toml @@ -21,6 +21,7 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" # end of web stuff +paho-mqtt = "0.13.2" ping = "0.4.1" paho-mqtt = "0.13.2" async-trait = "0.1" diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index 4f7544e..6a5f9f8 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -1,7 +1,7 @@ use std::{time::Duration, fmt::format}; use std::sync::{RwLock, Arc}; use async_trait::async_trait; -use paho_mqtt::{Client,Message as MqttMessage}; +use paho_mqtt::{CreateOptionsBuilder, ConnectOptionsBuilder, Client ,Message as MqttMessage}; use str; use bit::BitIndex; use chrono_tz::Europe::Berlin; @@ -21,12 +21,11 @@ use embedded_graphics::{ use std::net::UdpSocket; use std::{env, thread}; +use std::path::Path; use std::io; use std::process::ExitCode; use openweathermap::forecast::Forecast; use straba::NextDeparture; -use std::path::Path; - // This declaration will look for a file named `straba.rs` and will // insert its contents inside a module named `straba` under this scope mod straba; @@ -358,12 +357,11 @@ fn send_package(ipaddress: String, render_clock(&mut display); - package[1..PACKAGE_LENGTH].copy_from_slice(&display.image); - // client need to bind to client port (1 before 4242) - let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address"); + let target = format!("{}:4242", ipaddress); + let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address"); socket - .send_to(&package, ipaddress + ":4242") + .send_to(&package, &target) .expect("couldn't send data"); } @@ -383,30 +381,56 @@ LEDboardClient " fn check_connection(ipaddress: String) -> bool { let device_online; - // generate a faulty package length let mut package: [u8; PACKAGE_LENGTH/2] = [0; PACKAGE_LENGTH/2]; - // client need to bind to client port (1 before 4242) - let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address"); - socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); /* 10 seconds timeout */ - socket - .send_to(&package, ipaddress + ":4242") - .expect("couldn't send data"); - - // self.recv_buff is a [u8; 8092] - let answer = socket.recv_from(&mut package); - match answer { - Ok((_n, _addr)) => { - //println!("{} bytes response from {:?} {:?}", n, addr, &package[..n]); - device_online = true; - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - device_online = false; - } - Err(_e) => { - device_online = false; - } + + // Use a random local port instead of hardcoding + let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address"); + socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); + + let target = format!("{}:4242", ipaddress); + match socket.send_to(&package, &target) { + Ok(_) => { + // Continue with receive + match socket.recv_from(&mut package) { + Ok((_n, _addr)) => device_online = true, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => device_online = false, + Err(_) => device_online = false, + } + }, + Err(_) => device_online = false, + } + + device_online +} +/// Publishes weather and transit data to MQTT broker +fn publish_to_mqtt(client: &Client, data: &Option>, straba_res: &NextDeparture) { + let payload = if let Some(Ok(forecast)) = data { + if let Some(f) = forecast.list.first() { + let temp = f.main.temp; + let weather = f.weather.get(0).map(|w| w.main.clone()).unwrap_or_default(); + format!("temp:{:.1}C,weather:{}", + temp, + weather) + } else { + "no_forecast".to_string() + } + } else { + "no_data".to_string() + }; + let msg = MqttMessage::new("ledboard/forecast", payload, 1); + if let Err(e) = client.publish(msg) { + eprintln!("Error publishing MQTT message: {}", e); + } + + let payloadPT = { + format!("out:{}min,in:{}min", + straba_res.outbound_diff / 60, + straba_res.inbound_diff / 60) + }; + let ptmsg = MqttMessage::new("ledboard/public_transportation", payloadPT, 1); + if let Err(e) = client.publish(ptmsg) { + eprintln!("Error publishing MQTT message: {}", e); } - return device_online; } struct Message { @@ -637,6 +661,112 @@ fn fun_publishinfoviamqtt(straba_res: &NextDeparture) { println!("MQTT published {:?} = {:?}s", topic_in_station, straba_res.outbound_station); } +fn main_function2(ip: String) -> ExitCode +{ + + + let mut device_online = check_connection(ip.to_string()); + if !device_online { + println!("{} not online", ip); + // return ExitCode::FAILURE; + } + + + + let receiver = openweathermap::init_forecast("Mannheim", + "metric", + "de", + "978882ab9dd05e7122ff2b0aef2d3e55", + 60,1); + + let mut last_data = Option::None; + + // Test Webcrawler for public transportataion + let mut straba_res = straba::fetch_data(Some(true)); + println!("{:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff); + println!("{:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff); + + // Initialize MQTT client from MQTT_BROKER env var (else disabled) + let mqtt_client: Option = { + // Read broker URL from environment + let broker = match std::env::var("MQTT_BROKER") { + Ok(val) if !val.is_empty() => val, + _ => { + eprintln!("Environment variable MQTT_BROKER not set or empty, MQTT disabled"); + String::new() + } + }; + if broker.is_empty() { + None + } else { + let create_opts = CreateOptionsBuilder::new() + .server_uri(&broker) + .client_id("ledboard_client") + .finalize(); + match Client::new(create_opts) { + Ok(cli) => { + let conn_opts = ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(true) + .finalize(); + match cli.connect(conn_opts) { + Ok(_) => Some(cli), + Err(e) => { + eprintln!("Failed to connect to MQTT broker '{}': {}", broker, e); + None + } + } + } + Err(e) => { + eprintln!("Failed to create MQTT client for '{}': {}", broker, e); + None + } + } + } + }; + + // Render start + send_package(ip.to_string(), &last_data, &straba_res, None); + loop { + let st_now = SystemTime::now(); + let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); + let delay = time::Duration::from_millis(500); + thread::sleep(delay); + // Only request, if the device is present + if device_online == true { + let answer = openweathermap::update_forecast(&receiver); + match answer { + Some(_) => { + last_data = answer; + } + None => { + + } + } + } + + if (straba_res.request_time + 50) < seconds as i64 { + device_online = check_connection(ip.to_string()); + // request once a minute new data + if device_online == true { + straba_res = straba::fetch_data(None); + println!("Update {:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff); + println!("Update {:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff); + } + } + + if device_online == true { + // Render new image + send_package(ip.to_string(), &last_data, &straba_res, None); + // Publish data to MQTT + } + if let Some(ref client) = mqtt_client { + publish_to_mqtt(client, &last_data, &straba_res); + } + + } +} + fn main() -> ExitCode { let args: Vec = env::args().collect(); match args.len() { @@ -649,7 +779,8 @@ fn main() -> ExitCode { // one argument passed 2 => { let ip = &args[1]; - return main_function(ip.to_string(), None); + // Only one parameter uses the logic, generated in o4mini-llm + return main_function2(ip.to_string()); } // two argument passed 3 => {