From c69fb8370527aa9946fd94990cd7be93b6387825 Mon Sep 17 00:00:00 2001 From: Ollo Date: Fri, 6 Jun 2025 21:30:31 +0200 Subject: [PATCH] use always option2 for update MQTT --- client/bin/src/main.rs | 141 +++++++++++++++-------------------------- 1 file changed, 51 insertions(+), 90 deletions(-) diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index 3219f94..b58f106 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -444,32 +444,6 @@ lazy_static! { }); } - - -/// Asynchronously publishes a message to the specified topic. -/// -/// @author Gwen2.5 -/// -/// # Arguments -/// * `topic` - The MQTT topic to which the message will be published. -/// * `message` - The message to be published. -/// -/// # Returns -/// True if the message was successfully published, false otherwise. -fn publish_message(topic: &str, message: &str) { - let msg = MqttMessage::new(topic, message, 0); - - let mqtt_client = as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap(); - // Publish the message and ensure it completes without error - let result = mqtt_client.publish(msg); - match result { - Ok(_) => (), - Err(error) => { - println!("Error publishing {error:?}"); - } - } -} - fn main_function(parameter1: String, parameter2: Option) -> ExitCode { // Read configuration file @@ -527,70 +501,50 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { let mqtt_message: Arc> = Arc::new(Mutex::new(Message{ string: None })); if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0) - { - // Define the set of options for the create. - // Use an ID for a persistent session. - let create_opts = paho_mqtt::CreateOptionsBuilder::new() - .server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone()) - .client_id("ledboard") - .finalize(); - // Create a client. - let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); - - println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress); - + { let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1); - // The connect options. Defaults to an MQTT v3.x connection. - let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(20)) - .will_message(lwt) - .finalize(); - // Make the connection to the broker println!("MQTT | Connecting to the MQTT server..."); - let result = local_mqtt.connect(conn_opts); - match result { - Ok(_) => { - println!("MQTT | Server connected"); - }, - Err(error) => { - println!("MQTT | Server connecting {error:?}"); - }, - } - - // Subscribe to the desired topic(s). - local_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0); - - // Starts the client receiving messages - let rx_queue = local_mqtt.start_consuming(); - - // Attach a closure to the client to receive callback - // on incoming messages. - let mqtt_message_for_callback = mqtt_message.clone(); - - // Create a thread that stays pending over incoming messages. - let handle = thread::spawn(move || { - for mqttmsg in rx_queue.iter() { - if let Some(mqttmsg) = mqttmsg { - let topic = mqttmsg.topic(); - let payload_str = mqttmsg.payload_str(); - - //println!("MQTT | {} - {}", topic, payload_str); - let mut lock = mqtt_message_for_callback.lock().unwrap(); - lock.string = Some(payload_str.to_string()); - println!("MQTT | Received: -> {}", mqttmsg.payload_str()); - } else { - println!("MQTT | Unsubscribe: connection closed"); - break; + // Initialize MQTT client from MQTT_BROKER env var (else disabled) + let mqtt_client: Option = { + println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress); + // Read broker URL from environment + let broker = GlobalConfiguration.lock().unwrap().mqttIPAddress.clone(); + if broker.is_empty() { + None + } else { + let create_opts = CreateOptionsBuilder::new() + .server_uri(&broker) + .client_id("ledboard_client") + .finalize(); + match Client::new(create_opts) { + Ok(cli) => { + let conn_opts = ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(true) + .finalize(); + match cli.connect(conn_opts) { + Ok(_) => Some(cli), + Err(e) => { + eprintln!("MQTT | Failed to connect to MQTT broker '{}': {}", broker, e); + None + } + } + } + Err(e) => { + eprintln!("MQTT | Failed to create MQTT client for '{}': {}", broker, e); + None + } } } - }); + }; + // Define the set of options for the connection // move local instance to global scope - gmc.set_mqtt_client(Some(local_mqtt)); + gmc.set_mqtt_client(mqtt_client); } let receiver = openweathermap::init_forecast("Mannheim", @@ -611,6 +565,19 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { // Render start send_package(GlobalConfiguration.lock().unwrap().panelIPAddress.clone(), &last_data, &straba_res, Some("MQTT: room/ledboard".to_string())); } + + + if GlobalConfiguration.lock().is_ok() && GlobalConfiguration.lock().unwrap().mqttPrefix.len() > 0 + { + if let Some(ref client) = &(MQTTCLIENT.lock().unwrap().get_mqtt_client()) { + publish_to_mqtt(client, &last_data, &straba_res); + } + else + { + eprintln!("MQTT | client for publishing not found"); + } + } + loop { let st_now = SystemTime::now(); @@ -639,7 +606,9 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { if GlobalConfiguration.lock().is_ok() && GlobalConfiguration.lock().unwrap().mqttPrefix.len() > 0 { //FIXME if mqtt_client.is_some() - fun_publishinfoviamqtt(&straba_res); + if let Some(ref client) = &(MQTTCLIENT.lock().unwrap().get_mqtt_client()) { + publish_to_mqtt(client, &last_data, &straba_res); + } } // request once a minute new data @@ -661,19 +630,11 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { // Render new image send_package(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone(), &last_data, &straba_res, mqtt_message); } - // Handle MQTT messages + //FIXME Handle MQTT messages } } -fn fun_publishinfoviamqtt(straba_res: &NextDeparture) { - let topic_in_station: String = format!("{}{}", GlobalConfiguration.lock().unwrap().mqttPrefix, "/inbound/station"); - let station_name: String = format!("{}", straba_res.inbound_station); - // Execute async publish synchronously - let _ = publish_message(topic_in_station.as_str(), station_name.as_str()); - println!("MQTT | published {:?} = {:?}s", topic_in_station, straba_res.outbound_station); -} - fn main_function2(ip: String) -> ExitCode {