diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index 294d3ee..a773be2 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -1,7 +1,7 @@ use std::{time::Duration, fmt::format}; use std::sync::{RwLock, Arc}; use async_trait::async_trait; -use paho_mqtt::{AsyncClient,Message as MqttMessage}; +use paho_mqtt::{Client,Message as MqttMessage}; use str; use bit::BitIndex; use chrono_tz::Europe::Berlin; @@ -54,7 +54,7 @@ const PACKAGE_LENGTH: usize = (IMAGE_LENGTH + 1) as usize; /// The `MqttClient` struct contains various configuration parameters that control the behavior of the application. pub struct MqttClient { /// MQTT client option - pub mqtt_client: Option, + pub mqtt_client: Option, } impl MqttClient { @@ -66,12 +66,12 @@ impl MqttClient { } /// Setter for mqtt_client - pub fn set_mqtt_client(&mut self, client: Option) { + pub fn set_mqtt_client(&mut self, client: Option) { self.mqtt_client = client; } /// Getter for mqtt_client - pub fn get_mqtt_client(&self) -> &Option { + pub fn get_mqtt_client(&self) -> &Option { &self.mqtt_client } } @@ -339,7 +339,7 @@ type UserTopics = RwLock>; // Callback for a successful connection to the broker. // We subscribe to the topic(s) we want here. -fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) { +fn mqtt_on_connect_success(cli: &paho_mqtt::Client, _msgid: u16) { println!("MQTT | Connection succeeded"); // Subscribe to the desired topic(s). @@ -353,10 +353,9 @@ fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) { // from within a callback. But in this case, we know that the client is // *not* conected, and thus not doing anything important. So we don't worry // too much about stopping its callback thread. -fn mqtt_on_connect_failure(cli: &paho_mqtt::AsyncClient, _msgid: u16, rc: i32) { +fn mqtt_on_connect_failure(cli: &paho_mqtt::Client, _msgid: u16, rc: i32) { println!("MQTT | Connection attempt failed with error code {}.\n", rc); - thread::sleep(Duration::from_millis(2500)); - cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure); + //FIXME exit } fn send_package(ipaddress: String, @@ -460,20 +459,18 @@ lazy_static! { /// /// # Returns /// True if the message was successfully published, false otherwise. -async fn publish_message(topic: &str, message: &str) -> bool { +fn publish_message(topic: &str, message: &str) { let msg = MqttMessage::new(topic, message, 0); - let mqtt_client = as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap(); + let mqtt_client = as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap(); // Publish the message and ensure it completes without error - let token = match mqtt_client.publish(msg).await { - Ok(_token) => { - return true; - }, - Err(err) => { - eprintln!("Failed to publish message: {:?}", err); - return false; + let result = mqtt_client.publish(msg); + match result { + Ok(_) => (), + Err(error) => { + println!("Error publishing {error:?}"); } - }; + } } fn main_function(parameter1: String, parameter2: Option) -> ExitCode { @@ -529,7 +526,7 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { .client_id("ledboard") .finalize(); // Create a client. - let local_mqtt = paho_mqtt::AsyncClient::new(create_opts).unwrap(); + let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress); @@ -539,34 +536,30 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { .clean_session(true) .finalize(); - // Set a closure to be called whenever the client connection is established. - local_mqtt.set_connected_callback(|_cli: &paho_mqtt::AsyncClient| { - println!("Connected."); - }); - - // Set a closure to be called whenever the client loses the connection. - // It will attempt to reconnect, and set up function callbacks to keep - // retrying until the connection is re-established. - local_mqtt.set_connection_lost_callback(|cli: &paho_mqtt::AsyncClient| { - println!("Connection lost. Attempting reconnect."); - thread::sleep(Duration::from_millis(2500)); - cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure); - }); + // Starts the client receiving messages + let rx_queue = local_mqtt.start_consuming(); // Attach a closure to the client to receive callback // on incoming messages. let mqtt_message_for_callback = mqtt_message.clone(); - local_mqtt.set_message_callback(move |cli, msg| { - if let Some(msg) = msg { - let topic = msg.topic(); - let payload_str = msg.payload_str(); + + // Create a thread that stays pending over incoming messages. + let handle = thread::spawn(move || { + for mqttmsg in rx_queue.iter() { + if let Some(mqttmsg) = mqttmsg { + let topic = mqttmsg.topic(); + let payload_str = mqttmsg.payload_str(); - //println!("MQTT | {} - {}", topic, payload_str); - let mut lock = mqtt_message_for_callback.lock().unwrap(); - lock.string = Some(payload_str.to_string()) + //println!("MQTT | {} - {}", topic, payload_str); + let mut lock = mqtt_message_for_callback.lock().unwrap(); + lock.string = Some(payload_str.to_string()); + println!("Received: -> {}", mqttmsg.payload_str()); + } else { + println!("Unsubscribe: connection closed"); + break; + } } }); - // Define the set of options for the connection let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1); @@ -579,7 +572,7 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { // Make the connection to the broker println!("MQTT | Connecting to the MQTT server..."); - local_mqtt.connect_with_callbacks(conn_opts, mqtt_on_connect_success, mqtt_on_connect_failure); + local_mqtt.connect(conn_opts); // move local instance to global scope gmc.set_mqtt_client(Some(local_mqtt)); @@ -661,7 +654,7 @@ fn fun_publishinfoviamqtt(straba_res: &NextDeparture) { let topic_in_station: String = format!("{}{}", GlobalConfiguration.lock().unwrap().mqttPrefix, "/inbound/station"); let station_name: String = format!("{}", straba_res.inbound_station); // Execute async publish synchronously - let _ = block_on(publish_message(topic_in_station.as_str(), station_name.as_str())); + let _ = publish_message(topic_in_station.as_str(), station_name.as_str()); println!("MQTT published {:?} = {:?}s", topic_in_station, straba_res.outbound_station); }