Compare commits
	
		
			6 Commits
		
	
	
		
			master
			...
			o4mini-llm
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | a0e18c1a7d | ||
|  | fbdf1ea24b | ||
| 06ce74da9f | |||
| 0acb2a2538 | |||
| bd300f163e | |||
| 25da1ac04b | 
							
								
								
									
										51
									
								
								client/MQTT.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										51
									
								
								client/MQTT.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,51 @@ | ||||
|  # MQTT Configuration | ||||
|  | ||||
|  This project can publish weather and public transport data to an MQTT broker. | ||||
|  To enable MQTT, follow these steps: | ||||
|  | ||||
|  ## 1. Install dependencies | ||||
|  Ensure you have Rust and Cargo installed. The MQTT support uses the Paho MQTT client crate. | ||||
|  Run: | ||||
|  ```bash | ||||
|  cargo update | ||||
|  ``` | ||||
|  | ||||
|  ## 2. Set the MQTT_BROKER environment variable | ||||
|  Before running the client, define `MQTT_BROKER` to your broker address. | ||||
|  - Without URI scheme (defaults to TCP): | ||||
|    ```bash | ||||
|    export MQTT_BROKER=localhost:1883 | ||||
|    ``` | ||||
|  - With URI scheme: | ||||
|    ```bash | ||||
|    export MQTT_BROKER=tcp://broker.example.com:1883 | ||||
|    ``` | ||||
|  | ||||
|  ## 3. Run the LED board client | ||||
|  Pass the LED board IP address as the only argument: | ||||
|  ```bash | ||||
|  export MQTT_BROKER=localhost:1883 | ||||
|  cargo run --bin ledboard_client -- 192.168.1.50 | ||||
|  ``` | ||||
|  | ||||
|  ## Topics and Payloads | ||||
|  The client publishes two topics: | ||||
|  | ||||
|  ### weather | ||||
|  JSON payload with fields: | ||||
|  - `dt`: timestamp (Unix seconds) | ||||
|  - `temp`: temperature in °C | ||||
|  - `weather`: object with `main`, `description`, `icon` | ||||
|  - `rain`: rain volume in last 3h (optional) | ||||
|  - `pop`: probability of precipitation | ||||
|  - `wind`: object with `speed`, `deg`, `gust` | ||||
|  | ||||
|  ### straba | ||||
|  JSON payload with fields: | ||||
|  - `outbound_station`: name of outbound station | ||||
|  - `outbound_diff`: seconds until outbound departure | ||||
|  - `inbound_station`: name of inbound station | ||||
|  - `inbound_diff`: seconds until inbound departure | ||||
|  | ||||
|  ## Customization | ||||
|  You can adjust MQTT topics, QoS, and message formats in `client/bin/src/main.rs` under the `publish_to_mqtt` function. | ||||
| @@ -21,4 +21,5 @@ serde = "1.0" | ||||
| serde_derive = "1.0" | ||||
| serde_json = "1.0" | ||||
| # end of web stuff | ||||
| paho-mqtt = "0.13.2" | ||||
| ping = "0.4.1" | ||||
|   | ||||
| @@ -23,6 +23,7 @@ use std::process::ExitCode; | ||||
|  | ||||
| use openweathermap::forecast::Forecast; | ||||
| use straba::NextDeparture; | ||||
| use paho_mqtt::{Client, CreateOptionsBuilder, ConnectOptionsBuilder, Message}; | ||||
| // This declaration will look for a file named `straba.rs` and will | ||||
| // insert its contents inside a module named `straba` under this scope | ||||
| mod straba; | ||||
| @@ -302,12 +303,11 @@ fn send_package(ipaddress: String, | ||||
|  | ||||
|     render_clock(&mut display); | ||||
|  | ||||
|  | ||||
|     package[1..PACKAGE_LENGTH].copy_from_slice(&display.image); | ||||
|     // client need to bind to client port (1 before 4242) | ||||
|     let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address"); | ||||
|     let target = format!("{}:4242", ipaddress); | ||||
|     let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address"); | ||||
|     socket | ||||
|         .send_to(&package, ipaddress + ":4242") | ||||
|         .send_to(&package, &target) | ||||
|         .expect("couldn't send data"); | ||||
| } | ||||
|  | ||||
| @@ -322,30 +322,56 @@ LEDboardClient <ip address>" | ||||
|  | ||||
| fn check_connection(ipaddress: String) -> bool { | ||||
|     let device_online; | ||||
|     // generate a faulty package length | ||||
|     let mut package: [u8; PACKAGE_LENGTH/2] = [0; PACKAGE_LENGTH/2]; | ||||
|     // client need to bind to client port (1 before 4242) | ||||
|     let socket = UdpSocket::bind("0.0.0.0:14242").expect("couldn't bind to address"); | ||||
|     socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); /* 10 seconds timeout */ | ||||
|     socket | ||||
|         .send_to(&package, ipaddress + ":4242") | ||||
|         .expect("couldn't send data"); | ||||
|  | ||||
|     // self.recv_buff is a [u8; 8092] | ||||
|     let answer = socket.recv_from(&mut package); | ||||
|     match answer { | ||||
|         Ok((_n, _addr)) => { | ||||
|             //println!("{} bytes response from {:?} {:?}", n, addr, &package[..n]); | ||||
|             device_online = true; | ||||
|         } | ||||
|         Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { | ||||
|             device_online = false; | ||||
|         } | ||||
|         Err(_e) => { | ||||
|             device_online = false; | ||||
|         } | ||||
|      | ||||
|     // Use a random local port instead of hardcoding | ||||
|     let socket = UdpSocket::bind("0.0.0.0:0").expect("couldn't bind to address"); | ||||
|     socket.set_read_timeout(Some(Duration::from_secs(10))).unwrap(); | ||||
|      | ||||
|     let target = format!("{}:4242", ipaddress); | ||||
|     match socket.send_to(&package, &target) { | ||||
|         Ok(_) => { | ||||
|             // Continue with receive | ||||
|             match socket.recv_from(&mut package) { | ||||
|                 Ok((_n, _addr)) => device_online = true, | ||||
|                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => device_online = false, | ||||
|                 Err(_) => device_online = false, | ||||
|             } | ||||
|         }, | ||||
|         Err(_) => device_online = false, | ||||
|     } | ||||
|      | ||||
|     device_online | ||||
| } | ||||
| /// Publishes weather and transit data to MQTT broker | ||||
| fn publish_to_mqtt(client: &Client, data: &Option<Result<Forecast, String>>, straba_res: &NextDeparture) { | ||||
|     let payload = if let Some(Ok(forecast)) = data { | ||||
|         if let Some(f) = forecast.list.first() { | ||||
|             let temp = f.main.temp; | ||||
|             let weather = f.weather.get(0).map(|w| w.main.clone()).unwrap_or_default(); | ||||
|             format!("temp:{:.1}C,weather:{}", | ||||
|                 temp, | ||||
|                 weather) | ||||
|        } else { | ||||
|             "no_forecast".to_string() | ||||
|         } | ||||
|     } else { | ||||
|         "no_data".to_string() | ||||
|     }; | ||||
|     let msg = Message::new("ledboard/forecast", payload, 1); | ||||
|     if let Err(e) = client.publish(msg) { | ||||
|         eprintln!("Error publishing MQTT message: {}", e); | ||||
|     } | ||||
|  | ||||
|     let payloadPT = { | ||||
|             format!("out:{}min,in:{}min", | ||||
|                 straba_res.outbound_diff / 60, | ||||
|                 straba_res.inbound_diff / 60) | ||||
|     }; | ||||
|     let ptmsg = Message::new("ledboard/public_transportation", payloadPT, 1); | ||||
|     if let Err(e) = client.publish(ptmsg) { | ||||
|         eprintln!("Error publishing MQTT message: {}", e); | ||||
|     } | ||||
|     return device_online; | ||||
| } | ||||
|  | ||||
| fn main() -> ExitCode { | ||||
| @@ -361,61 +387,25 @@ fn main() -> ExitCode { | ||||
|         2 => { | ||||
|             let ip = &args[1]; | ||||
|  | ||||
|  | ||||
|             let mut device_online = check_connection(ip.to_string()); | ||||
|             if !device_online { | ||||
|                 println!("{} not online", ip); | ||||
|                 return ExitCode::FAILURE; | ||||
|             } | ||||
|  | ||||
|             let receiver = openweathermap::init_forecast("Mannheim", | ||||
|             "metric", | ||||
|             "de", | ||||
|             "978882ab9dd05e7122ff2b0aef2d3e55", | ||||
|             60,1); | ||||
|  | ||||
|             let mut last_data = Option::None; | ||||
|              | ||||
|             // Test Webcrawler for public transportataion | ||||
|             let mut straba_res = straba::fetch_data(Some(true)); | ||||
|             println!("{:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff); | ||||
|             println!("{:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff); | ||||
|  | ||||
|             // Render start | ||||
|             send_package(ip.to_string(), &last_data, &straba_res); | ||||
|             loop { | ||||
|                 let st_now = SystemTime::now(); | ||||
|                 let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); | ||||
|                 let delay = time::Duration::from_millis(500); | ||||
|                 thread::sleep(delay); | ||||
|                 // Only request, if the device is present | ||||
|                 if device_online == true { | ||||
|                     let answer = openweathermap::update_forecast(&receiver); | ||||
|                     match answer { | ||||
|                         Some(_) => { | ||||
|                             last_data = answer; | ||||
|                         } | ||||
|                         None => { | ||||
|              | ||||
|                         } | ||||
|                     } | ||||
|             // Read broker URL from environment | ||||
|             let broker = match std::env::var("MQTT_BROKER") { | ||||
|                 Ok(val) if !val.is_empty() => val, | ||||
|                 _ => { | ||||
|                     eprintln!("Environment variable MQTT_BROKER not set or empty, MQTT disabled"); | ||||
|                     String::new() | ||||
|                 } | ||||
|             }; | ||||
|  | ||||
|                 if (straba_res.request_time + 50) < seconds as i64 { | ||||
|                     device_online = check_connection(ip.to_string()); | ||||
|                     // request once a minute new data | ||||
|                     if device_online == true { | ||||
|                         straba_res = straba::fetch_data(None); | ||||
|                         println!("Update {:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff); | ||||
|                         println!("Update {:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff); | ||||
|                     } | ||||
|                 } | ||||
|             return main_function(ip, Some(broker)) | ||||
|  | ||||
|                 if device_online == true { | ||||
|                     // Render new image | ||||
|                     send_package(ip.to_string(), &last_data, &straba_res); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|          // two argument passed | ||||
|          3 => { | ||||
|             let ip = &args[1]; | ||||
|             let mqtt = &args[2]; | ||||
|             return main_function(   ip, | ||||
|                                     Some(mqtt.to_string()) | ||||
|                                 ); | ||||
|         } | ||||
|         // all the other cases | ||||
|         _ => { | ||||
| @@ -425,3 +415,100 @@ fn main() -> ExitCode { | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn main_function(ip: &String, mqttBroker: Option<String>) -> ExitCode { | ||||
|     let mut device_online: bool = check_connection(ip.to_string()); | ||||
|     if !device_online { | ||||
|         println!("{} not online", ip); | ||||
|         return ExitCode::FAILURE; | ||||
|     } | ||||
|  | ||||
|              | ||||
|  | ||||
|     let receiver = openweathermap::init_forecast("Mannheim", | ||||
|     "metric", | ||||
|     "de", | ||||
|     "978882ab9dd05e7122ff2b0aef2d3e55", | ||||
|     60,1); | ||||
|  | ||||
|     let mut last_data = Option::None; | ||||
|              | ||||
|     // Test Webcrawler for public transportataion | ||||
|     let mut straba_res = straba::fetch_data(Some(true)); | ||||
|     println!("{:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff); | ||||
|     println!("{:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff); | ||||
|  | ||||
|     // Initialize MQTT client from MQTT_BROKER env var (else disabled) | ||||
|     let mqtt_client: Option<Client> = { | ||||
|         if mqttBroker.is_none() { | ||||
|             None | ||||
|         } else if mqttBroker.is_some() { | ||||
|             let create_opts = CreateOptionsBuilder::new() | ||||
|                 .server_uri(mqttBroker.clone().unwrap()) | ||||
|                 .client_id("ledboard_client") | ||||
|                 .finalize(); | ||||
|             match Client::new(create_opts) { | ||||
|                 Ok(cli) => { | ||||
|                     let conn_opts = ConnectOptionsBuilder::new() | ||||
|                         .keep_alive_interval(Duration::from_secs(20)) | ||||
|                         .clean_session(true) | ||||
|                         .finalize(); | ||||
|                     match cli.connect(conn_opts) { | ||||
|                         Ok(_) => Some(cli), | ||||
|                         Err(e) => { | ||||
|                             eprintln!("Failed to connect to MQTT broker '{}': {}", mqttBroker.clone().unwrap(), e); | ||||
|                             None | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 Err(e) => { | ||||
|                     eprintln!("Failed to create MQTT client for '{}': {}", mqttBroker.clone().unwrap(), e); | ||||
|                     None | ||||
|                 } | ||||
|             } | ||||
|         } else { | ||||
|             None | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     // Render start | ||||
|     send_package(ip.to_string(), &last_data, &straba_res); | ||||
|     loop { | ||||
|         let st_now = SystemTime::now(); | ||||
|         let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); | ||||
|         let delay = time::Duration::from_millis(500); | ||||
|         thread::sleep(delay); | ||||
|         // Only request, if the device is present | ||||
|         if device_online == true { | ||||
|             let answer = openweathermap::update_forecast(&receiver); | ||||
|             match answer { | ||||
|                 Some(_) => { | ||||
|                     last_data = answer; | ||||
|                 } | ||||
|                 None => { | ||||
|  | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if (straba_res.request_time + 50) < seconds as i64 { | ||||
|             device_online = check_connection(ip.to_string()); | ||||
|             // request once a minute new data | ||||
|             if device_online == true { | ||||
|                 straba_res = straba::fetch_data(None); | ||||
|                 println!("Update {:?} {:?}s", straba_res.outbound_station, straba_res.outbound_diff); | ||||
|                 println!("Update {:?} {:?}s", straba_res.inbound_station , straba_res.inbound_diff); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         if device_online == true { | ||||
|             // Render new image | ||||
|             send_package(ip.to_string(), &last_data, &straba_res); | ||||
|             // Publish data to MQTT | ||||
|        } | ||||
|          if let Some(ref client) = mqtt_client { | ||||
|                 publish_to_mqtt(client, &last_data, &straba_res); | ||||
|             } | ||||
|  | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user