diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index 6a5f9f8..b86fa88 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -516,50 +516,62 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0) { - // Define the set of options for the create. - // Use an ID for a persistent session. - let create_opts = paho_mqtt::CreateOptionsBuilder::new() - .server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone()) - .client_id("ledboard") - .finalize(); - // Create a client. - let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); - - println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress); - - let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1); - - // The connect options. Defaults to an MQTT v3.x connection. - let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(20)) - .will_message(lwt) - .finalize(); - - // Make the connection to the broker - println!("MQTT | Connecting to the MQTT server..."); - let result = local_mqtt.connect(conn_opts); - - match result { - Ok(_) => { - println!("MQTT | Server connected"); - }, - Err(error) => { - println!("MQTT | Server connecting {error:?}"); - }, - } - - // Subscribe to the desired topic(s). - local_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0); - - // Starts the client receiving messages - let rx_queue = local_mqtt.start_consuming(); - - // Attach a closure to the client to receive callback - // on incoming messages. - let mqtt_message_for_callback = mqtt_message.clone(); - // Create a thread that stays pending over incoming messages. let handle = thread::spawn(move || { + + // Define the set of options for the create. + // Use an ID for a persistent session. + let create_opts = paho_mqtt::CreateOptionsBuilder::new() + .server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone()) + .client_id("ledboard") + .finalize(); + // Create a client. + let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); + + println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress); + + let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1); + + // The connect options. Defaults to an MQTT v3.x connection. + let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .will_message(lwt) + .finalize(); + + // Make the connection to the broker + println!("MQTT | Connecting to the MQTT server..."); + let result = local_mqtt.connect(conn_opts); + + match result { + Ok(_) => { + println!("MQTT | Server connected"); + }, + Err(error) => { + println!("MQTT | Server connecting {error:?}"); + }, + } + + let thread_mqtt = local_mqtt.clone(); + // move local instance to global scope + gmc.set_mqtt_client(Some(local_mqtt)); + + // Attach a closure to the client to receive callback + // on incoming messages. + let mqtt_message_for_callback = mqtt_message.clone(); + + // Subscribe to the desired topic(s). + let resultSubscribe = thread_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0); + match resultSubscribe { + Ok(_) => { + println!("MQTT | Subscribe "); + }, + Err(error) => { + println!("MQTT | Subscribe failed {error:?}"); + }, + } + + // Starts the client receiving messages + let rx_queue = thread_mqtt.start_consuming(); for mqttmsg in rx_queue.iter() { if let Some(mqttmsg) = mqttmsg { let topic = mqttmsg.topic(); @@ -568,17 +580,13 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { //println!("MQTT | {} - {}", topic, payload_str); let mut lock = mqtt_message_for_callback.lock().unwrap(); lock.string = Some(payload_str.to_string()); - println!("Received: -> {}", mqttmsg.payload_str()); + println!("MQTT | Received: -> {}", mqttmsg.payload_str()); } else { - println!("Unsubscribe: connection closed"); - break; + println!("MQTT | Unsubscribe: connection closed"); } } }); // Define the set of options for the connection - - // move local instance to global scope - gmc.set_mqtt_client(Some(local_mqtt)); } let receiver = openweathermap::init_forecast("Mannheim",