From 35d65b64757c841bfb3ec9d511b440f379f97db0 Mon Sep 17 00:00:00 2001
From: Ollo <Ollo@Mobile>
Date: Fri, 25 Apr 2025 21:11:18 +0200
Subject: [PATCH] Removed all Mqtt Async stuff

---
 client/bin/src/main.rs | 77 +++++++++++++++++++-----------------------
 1 file changed, 35 insertions(+), 42 deletions(-)

diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs
index 294d3ee..a773be2 100644
--- a/client/bin/src/main.rs
+++ b/client/bin/src/main.rs
@@ -1,7 +1,7 @@
 use std::{time::Duration, fmt::format};
 use std::sync::{RwLock, Arc};
 use async_trait::async_trait;
-use paho_mqtt::{AsyncClient,Message as MqttMessage};
+use paho_mqtt::{Client,Message as MqttMessage};
 use str;
 use bit::BitIndex;
 use chrono_tz::Europe::Berlin;
@@ -54,7 +54,7 @@ const PACKAGE_LENGTH: usize = (IMAGE_LENGTH + 1) as usize;
 /// The `MqttClient` struct contains various configuration parameters that control the behavior of the application.
 pub struct MqttClient {
     /// MQTT client option
-    pub mqtt_client: Option<paho_mqtt::AsyncClient>,
+    pub mqtt_client: Option<paho_mqtt::Client>,
 }
 
 impl MqttClient {
@@ -66,12 +66,12 @@ impl MqttClient {
     }
 
     /// Setter for mqtt_client
-    pub fn set_mqtt_client(&mut self, client: Option<paho_mqtt::AsyncClient>) {
+    pub fn set_mqtt_client(&mut self, client: Option<paho_mqtt::Client>) {
         self.mqtt_client = client;
     }
 
     /// Getter for mqtt_client
-    pub fn get_mqtt_client(&self) -> &Option<paho_mqtt::AsyncClient> {
+    pub fn get_mqtt_client(&self) -> &Option<paho_mqtt::Client> {
         &self.mqtt_client
     }
 }
@@ -339,7 +339,7 @@ type UserTopics = RwLock<Vec<String>>;
 
 // Callback for a successful connection to the broker.
 // We subscribe to the topic(s) we want here.
-fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) {
+fn mqtt_on_connect_success(cli: &paho_mqtt::Client, _msgid: u16) {
     println!("MQTT | Connection succeeded");
 
     // Subscribe to the desired topic(s).
@@ -353,10 +353,9 @@ fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) {
 // from  within a callback. But in this case, we know that the client is
 // *not* conected, and thus not doing anything important. So we don't worry
 // too much about stopping its callback thread.
-fn mqtt_on_connect_failure(cli: &paho_mqtt::AsyncClient, _msgid: u16, rc: i32) {
+fn mqtt_on_connect_failure(cli: &paho_mqtt::Client, _msgid: u16, rc: i32) {
     println!("MQTT | Connection attempt failed with error code {}.\n", rc);
-    thread::sleep(Duration::from_millis(2500));
-    cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure);
+    //FIXME exit 
 }
 
 fn send_package(ipaddress: String, 
@@ -460,20 +459,18 @@ lazy_static! {
 /// 
 /// # Returns
 /// True if the message was successfully published, false otherwise.
-async fn publish_message(topic: &str, message: &str) -> bool {
+fn publish_message(topic: &str, message: &str) {
     let msg = MqttMessage::new(topic, message, 0);
 
-    let mqtt_client =  <std::option::Option<AsyncClient> as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap();
+    let mqtt_client =  <std::option::Option<Client> as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap();
     // Publish the message and ensure it completes without error
-    let token = match mqtt_client.publish(msg).await {
-        Ok(_token) => {
-            return true;
-        },
-        Err(err) => {
-            eprintln!("Failed to publish message: {:?}", err);
-            return false;
+   let result =  mqtt_client.publish(msg);
+    match result {
+        Ok(_) => (),
+        Err(error) => {
+            println!("Error publishing {error:?}");
         }
-    };
+    }
 }
 
 fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
@@ -529,7 +526,7 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
         .client_id("ledboard")
         .finalize();
         // Create a client.
-        let local_mqtt = paho_mqtt::AsyncClient::new(create_opts).unwrap();
+        let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap();
         
         println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
         
@@ -539,34 +536,30 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
             .clean_session(true)
             .finalize();
 
-        // Set a closure to be called whenever the client connection is established.
-        local_mqtt.set_connected_callback(|_cli: &paho_mqtt::AsyncClient| {
-            println!("Connected.");
-        });
-
-        // Set a closure to be called whenever the client loses the connection.
-        // It will attempt to reconnect, and set up function callbacks to keep
-        // retrying until the connection is re-established.
-        local_mqtt.set_connection_lost_callback(|cli: &paho_mqtt::AsyncClient| {
-            println!("Connection lost. Attempting reconnect.");
-            thread::sleep(Duration::from_millis(2500));
-            cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure);
-        });
+        // Starts the client receiving messages
+        let rx_queue = local_mqtt.start_consuming();
 
         // Attach a closure to the client to receive callback
         // on incoming messages.
         let mqtt_message_for_callback = mqtt_message.clone();
-        local_mqtt.set_message_callback(move |cli, msg| {
-            if let Some(msg) = msg {
-                let topic = msg.topic();
-                let payload_str = msg.payload_str();
+        
+        // Create a thread that stays pending over incoming messages.
+        let handle = thread::spawn(move || {
+            for mqttmsg in rx_queue.iter() {
+                if let Some(mqttmsg) = mqttmsg {
+                    let topic = mqttmsg.topic();
+                    let payload_str = mqttmsg.payload_str();
 
-                //println!("MQTT | {} - {}", topic, payload_str);
-                let mut lock = mqtt_message_for_callback.lock().unwrap();
-                lock.string = Some(payload_str.to_string())
+                    //println!("MQTT | {} - {}", topic, payload_str);
+                    let mut lock = mqtt_message_for_callback.lock().unwrap();
+                    lock.string = Some(payload_str.to_string());
+                    println!("Received: -> {}", mqttmsg.payload_str());
+                } else {
+                    println!("Unsubscribe: connection closed");
+                    break;
+                }
             }
         });
-
         // Define the set of options for the connection
 
         let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
@@ -579,7 +572,7 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
 
         // Make the connection to the broker
         println!("MQTT | Connecting to the MQTT server...");
-        local_mqtt.connect_with_callbacks(conn_opts, mqtt_on_connect_success, mqtt_on_connect_failure);
+        local_mqtt.connect(conn_opts);
 
         // move local instance to global scope
         gmc.set_mqtt_client(Some(local_mqtt));
@@ -661,7 +654,7 @@ fn fun_publishinfoviamqtt(straba_res: &NextDeparture) {
     let topic_in_station: String = format!("{}{}", GlobalConfiguration.lock().unwrap().mqttPrefix, "/inbound/station");
     let station_name: String = format!("{}", straba_res.inbound_station);
     // Execute async publish synchronously
-    let _ = block_on(publish_message(topic_in_station.as_str(), station_name.as_str()));
+    let _ = publish_message(topic_in_station.as_str(), station_name.as_str());
     println!("MQTT published {:?} = {:?}s", topic_in_station, straba_res.outbound_station);
 }