diff --git a/client/bin/Cargo.toml b/client/bin/Cargo.toml index bf4e525..749b1ec 100644 --- a/client/bin/Cargo.toml +++ b/client/bin/Cargo.toml @@ -22,7 +22,8 @@ serde_derive = "1.0" serde_json = "1.0" # end of web stuff ping = "0.4.1" -paho-mqtt = "0.12.3" +paho-mqtt = "0.13.2" +async-trait = "0.1" # Ini File parser rust-ini = "0.21" lazy_static = "1.4" diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index 4fdc26f..968c273 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -1,6 +1,7 @@ use std::{time::Duration, fmt::format}; use std::sync::{RwLock, Mutex, Arc}; -use paho_mqtt; +use async_trait::async_trait; +use paho_mqtt::{AsyncClient,Message as MqttMessage}; use str; use bit::BitIndex; use chrono_tz::Europe::Berlin; @@ -454,6 +455,35 @@ lazy_static! { mqttPrefix : "test/ledboard".to_string() }; } + + +/// Asynchronously publishes a message to the specified topic. +/// +/// @author Gwen2.5 +/// +/// # Arguments +/// * `mac` - Paho async client +/// * `topic` - The MQTT topic to which the message will be published. +/// * `message` - The message to be published. +/// +/// # Returns +/// True if the message was successfully published, false otherwise. +async fn publish_message(ref mac: AsyncClient, topic: &str, message: &str) -> bool { + let msg = MqttMessage::new(topic, message, 0); + + // Publish the message and ensure it completes without error + let token = match mac.publish(msg).await { + Ok(_token) => { + return true; + }, + Err(err) => { + eprintln!("Failed to publish message: {:?}", err); + return false; + } + }; + +} + fn main_function(parameter1: String, parameter2: Option) -> ExitCode { // Read configuration file @@ -576,18 +606,14 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { } else if GlobalConfiguration.mqttPrefix.len() > 0 { - let topicInStation = format!("{}{}", GlobalConfiguration.mqttPrefix, "/inbound/station"); - let payload = straba_res.outbound_station.as_bytes(); //Get the payload as a &[u8] - - let mc = mqtt_client.as_ref().unwrap(); - - let message = paho_mqtt::Message::new ( topicInStation.as_str(), - payload, - 0 - ); - - mc.publish(message); - println!("MQTT published {:?} = {:?}s", topicInStation, straba_res.outbound_station); + let topicInStation: String = format!("{}{}", GlobalConfiguration.mqttPrefix, "/inbound/station"); + let stationName: String = format!("{}", straba_res.inbound_station); + if (mqtt_client.is_some()) { + publish_message(mqtt_client.unwrap(), topicInStation.as_str(), stationName.as_str()); + println!("MQTT published {:?} = {:?}s", topicInStation, straba_res.outbound_station); + } else { + println!("MQTT not ready... {:?} = {:?}s", topicInStation, straba_res.outbound_station); + } }