Removed all Mqtt Async stuff
This commit is contained in:
parent
ce7afca13c
commit
35d65b6475
@ -1,7 +1,7 @@
|
|||||||
use std::{time::Duration, fmt::format};
|
use std::{time::Duration, fmt::format};
|
||||||
use std::sync::{RwLock, Arc};
|
use std::sync::{RwLock, Arc};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use paho_mqtt::{AsyncClient,Message as MqttMessage};
|
use paho_mqtt::{Client,Message as MqttMessage};
|
||||||
use str;
|
use str;
|
||||||
use bit::BitIndex;
|
use bit::BitIndex;
|
||||||
use chrono_tz::Europe::Berlin;
|
use chrono_tz::Europe::Berlin;
|
||||||
@ -54,7 +54,7 @@ const PACKAGE_LENGTH: usize = (IMAGE_LENGTH + 1) as usize;
|
|||||||
/// The `MqttClient` struct contains various configuration parameters that control the behavior of the application.
|
/// The `MqttClient` struct contains various configuration parameters that control the behavior of the application.
|
||||||
pub struct MqttClient {
|
pub struct MqttClient {
|
||||||
/// MQTT client option
|
/// MQTT client option
|
||||||
pub mqtt_client: Option<paho_mqtt::AsyncClient>,
|
pub mqtt_client: Option<paho_mqtt::Client>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MqttClient {
|
impl MqttClient {
|
||||||
@ -66,12 +66,12 @@ impl MqttClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Setter for mqtt_client
|
/// Setter for mqtt_client
|
||||||
pub fn set_mqtt_client(&mut self, client: Option<paho_mqtt::AsyncClient>) {
|
pub fn set_mqtt_client(&mut self, client: Option<paho_mqtt::Client>) {
|
||||||
self.mqtt_client = client;
|
self.mqtt_client = client;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Getter for mqtt_client
|
/// Getter for mqtt_client
|
||||||
pub fn get_mqtt_client(&self) -> &Option<paho_mqtt::AsyncClient> {
|
pub fn get_mqtt_client(&self) -> &Option<paho_mqtt::Client> {
|
||||||
&self.mqtt_client
|
&self.mqtt_client
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -339,7 +339,7 @@ type UserTopics = RwLock<Vec<String>>;
|
|||||||
|
|
||||||
// Callback for a successful connection to the broker.
|
// Callback for a successful connection to the broker.
|
||||||
// We subscribe to the topic(s) we want here.
|
// We subscribe to the topic(s) we want here.
|
||||||
fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) {
|
fn mqtt_on_connect_success(cli: &paho_mqtt::Client, _msgid: u16) {
|
||||||
println!("MQTT | Connection succeeded");
|
println!("MQTT | Connection succeeded");
|
||||||
|
|
||||||
// Subscribe to the desired topic(s).
|
// Subscribe to the desired topic(s).
|
||||||
@ -353,10 +353,9 @@ fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) {
|
|||||||
// from within a callback. But in this case, we know that the client is
|
// from within a callback. But in this case, we know that the client is
|
||||||
// *not* conected, and thus not doing anything important. So we don't worry
|
// *not* conected, and thus not doing anything important. So we don't worry
|
||||||
// too much about stopping its callback thread.
|
// too much about stopping its callback thread.
|
||||||
fn mqtt_on_connect_failure(cli: &paho_mqtt::AsyncClient, _msgid: u16, rc: i32) {
|
fn mqtt_on_connect_failure(cli: &paho_mqtt::Client, _msgid: u16, rc: i32) {
|
||||||
println!("MQTT | Connection attempt failed with error code {}.\n", rc);
|
println!("MQTT | Connection attempt failed with error code {}.\n", rc);
|
||||||
thread::sleep(Duration::from_millis(2500));
|
//FIXME exit
|
||||||
cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn send_package(ipaddress: String,
|
fn send_package(ipaddress: String,
|
||||||
@ -460,20 +459,18 @@ lazy_static! {
|
|||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// True if the message was successfully published, false otherwise.
|
/// True if the message was successfully published, false otherwise.
|
||||||
async fn publish_message(topic: &str, message: &str) -> bool {
|
fn publish_message(topic: &str, message: &str) {
|
||||||
let msg = MqttMessage::new(topic, message, 0);
|
let msg = MqttMessage::new(topic, message, 0);
|
||||||
|
|
||||||
let mqtt_client = <std::option::Option<AsyncClient> as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap();
|
let mqtt_client = <std::option::Option<Client> as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap();
|
||||||
// Publish the message and ensure it completes without error
|
// Publish the message and ensure it completes without error
|
||||||
let token = match mqtt_client.publish(msg).await {
|
let result = mqtt_client.publish(msg);
|
||||||
Ok(_token) => {
|
match result {
|
||||||
return true;
|
Ok(_) => (),
|
||||||
},
|
Err(error) => {
|
||||||
Err(err) => {
|
println!("Error publishing {error:?}");
|
||||||
eprintln!("Failed to publish message: {:?}", err);
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
||||||
@ -529,7 +526,7 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
|||||||
.client_id("ledboard")
|
.client_id("ledboard")
|
||||||
.finalize();
|
.finalize();
|
||||||
// Create a client.
|
// Create a client.
|
||||||
let local_mqtt = paho_mqtt::AsyncClient::new(create_opts).unwrap();
|
let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap();
|
||||||
|
|
||||||
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
|
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
|
||||||
|
|
||||||
@ -539,34 +536,30 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
|||||||
.clean_session(true)
|
.clean_session(true)
|
||||||
.finalize();
|
.finalize();
|
||||||
|
|
||||||
// Set a closure to be called whenever the client connection is established.
|
// Starts the client receiving messages
|
||||||
local_mqtt.set_connected_callback(|_cli: &paho_mqtt::AsyncClient| {
|
let rx_queue = local_mqtt.start_consuming();
|
||||||
println!("Connected.");
|
|
||||||
});
|
|
||||||
|
|
||||||
// Set a closure to be called whenever the client loses the connection.
|
|
||||||
// It will attempt to reconnect, and set up function callbacks to keep
|
|
||||||
// retrying until the connection is re-established.
|
|
||||||
local_mqtt.set_connection_lost_callback(|cli: &paho_mqtt::AsyncClient| {
|
|
||||||
println!("Connection lost. Attempting reconnect.");
|
|
||||||
thread::sleep(Duration::from_millis(2500));
|
|
||||||
cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure);
|
|
||||||
});
|
|
||||||
|
|
||||||
// Attach a closure to the client to receive callback
|
// Attach a closure to the client to receive callback
|
||||||
// on incoming messages.
|
// on incoming messages.
|
||||||
let mqtt_message_for_callback = mqtt_message.clone();
|
let mqtt_message_for_callback = mqtt_message.clone();
|
||||||
local_mqtt.set_message_callback(move |cli, msg| {
|
|
||||||
if let Some(msg) = msg {
|
|
||||||
let topic = msg.topic();
|
|
||||||
let payload_str = msg.payload_str();
|
|
||||||
|
|
||||||
//println!("MQTT | {} - {}", topic, payload_str);
|
// Create a thread that stays pending over incoming messages.
|
||||||
let mut lock = mqtt_message_for_callback.lock().unwrap();
|
let handle = thread::spawn(move || {
|
||||||
lock.string = Some(payload_str.to_string())
|
for mqttmsg in rx_queue.iter() {
|
||||||
|
if let Some(mqttmsg) = mqttmsg {
|
||||||
|
let topic = mqttmsg.topic();
|
||||||
|
let payload_str = mqttmsg.payload_str();
|
||||||
|
|
||||||
|
//println!("MQTT | {} - {}", topic, payload_str);
|
||||||
|
let mut lock = mqtt_message_for_callback.lock().unwrap();
|
||||||
|
lock.string = Some(payload_str.to_string());
|
||||||
|
println!("Received: -> {}", mqttmsg.payload_str());
|
||||||
|
} else {
|
||||||
|
println!("Unsubscribe: connection closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Define the set of options for the connection
|
// Define the set of options for the connection
|
||||||
|
|
||||||
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
|
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
|
||||||
@ -579,7 +572,7 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
|||||||
|
|
||||||
// Make the connection to the broker
|
// Make the connection to the broker
|
||||||
println!("MQTT | Connecting to the MQTT server...");
|
println!("MQTT | Connecting to the MQTT server...");
|
||||||
local_mqtt.connect_with_callbacks(conn_opts, mqtt_on_connect_success, mqtt_on_connect_failure);
|
local_mqtt.connect(conn_opts);
|
||||||
|
|
||||||
// move local instance to global scope
|
// move local instance to global scope
|
||||||
gmc.set_mqtt_client(Some(local_mqtt));
|
gmc.set_mqtt_client(Some(local_mqtt));
|
||||||
@ -661,7 +654,7 @@ fn fun_publishinfoviamqtt(straba_res: &NextDeparture) {
|
|||||||
let topic_in_station: String = format!("{}{}", GlobalConfiguration.lock().unwrap().mqttPrefix, "/inbound/station");
|
let topic_in_station: String = format!("{}{}", GlobalConfiguration.lock().unwrap().mqttPrefix, "/inbound/station");
|
||||||
let station_name: String = format!("{}", straba_res.inbound_station);
|
let station_name: String = format!("{}", straba_res.inbound_station);
|
||||||
// Execute async publish synchronously
|
// Execute async publish synchronously
|
||||||
let _ = block_on(publish_message(topic_in_station.as_str(), station_name.as_str()));
|
let _ = publish_message(topic_in_station.as_str(), station_name.as_str());
|
||||||
println!("MQTT published {:?} = {:?}s", topic_in_station, straba_res.outbound_station);
|
println!("MQTT published {:?} = {:?}s", topic_in_station, straba_res.outbound_station);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user