Moved all stuff into thread

This commit is contained in:
Ollo
2025-11-14 22:29:52 +01:00
parent 7180acd809
commit 349f69b8ca

View File

@@ -516,50 +516,62 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0) if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0)
{ {
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = paho_mqtt::CreateOptionsBuilder::new()
.server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone())
.client_id("ledboard")
.finalize();
// Create a client.
let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap();
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
// The connect options. Defaults to an MQTT v3.x connection.
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.will_message(lwt)
.finalize();
// Make the connection to the broker
println!("MQTT | Connecting to the MQTT server...");
let result = local_mqtt.connect(conn_opts);
match result {
Ok(_) => {
println!("MQTT | Server connected");
},
Err(error) => {
println!("MQTT | Server connecting {error:?}");
},
}
// Subscribe to the desired topic(s).
local_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0);
// Starts the client receiving messages
let rx_queue = local_mqtt.start_consuming();
// Attach a closure to the client to receive callback
// on incoming messages.
let mqtt_message_for_callback = mqtt_message.clone();
// Create a thread that stays pending over incoming messages. // Create a thread that stays pending over incoming messages.
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
// Define the set of options for the create.
// Use an ID for a persistent session.
let create_opts = paho_mqtt::CreateOptionsBuilder::new()
.server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone())
.client_id("ledboard")
.finalize();
// Create a client.
let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap();
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
// The connect options. Defaults to an MQTT v3.x connection.
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20))
.will_message(lwt)
.finalize();
// Make the connection to the broker
println!("MQTT | Connecting to the MQTT server...");
let result = local_mqtt.connect(conn_opts);
match result {
Ok(_) => {
println!("MQTT | Server connected");
},
Err(error) => {
println!("MQTT | Server connecting {error:?}");
},
}
let thread_mqtt = local_mqtt.clone();
// move local instance to global scope
gmc.set_mqtt_client(Some(local_mqtt));
// Attach a closure to the client to receive callback
// on incoming messages.
let mqtt_message_for_callback = mqtt_message.clone();
// Subscribe to the desired topic(s).
let resultSubscribe = thread_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0);
match resultSubscribe {
Ok(_) => {
println!("MQTT | Subscribe ");
},
Err(error) => {
println!("MQTT | Subscribe failed {error:?}");
},
}
// Starts the client receiving messages
let rx_queue = thread_mqtt.start_consuming();
for mqttmsg in rx_queue.iter() { for mqttmsg in rx_queue.iter() {
if let Some(mqttmsg) = mqttmsg { if let Some(mqttmsg) = mqttmsg {
let topic = mqttmsg.topic(); let topic = mqttmsg.topic();
@@ -568,17 +580,13 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
//println!("MQTT | {} - {}", topic, payload_str); //println!("MQTT | {} - {}", topic, payload_str);
let mut lock = mqtt_message_for_callback.lock().unwrap(); let mut lock = mqtt_message_for_callback.lock().unwrap();
lock.string = Some(payload_str.to_string()); lock.string = Some(payload_str.to_string());
println!("Received: -> {}", mqttmsg.payload_str()); println!("MQTT | Received: -> {}", mqttmsg.payload_str());
} else { } else {
println!("Unsubscribe: connection closed"); println!("MQTT | Unsubscribe: connection closed");
break;
} }
} }
}); });
// Define the set of options for the connection // Define the set of options for the connection
// move local instance to global scope
gmc.set_mqtt_client(Some(local_mqtt));
} }
let receiver = openweathermap::init_forecast("Mannheim", let receiver = openweathermap::init_forecast("Mannheim",