diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index bd14f00..d3e5e07 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -510,49 +510,79 @@ fn main_function(parameter1: String, parameter2: Option) -> ExitCode { let mqtt_message: Arc> = Arc::new(Mutex::new(Message{ string: None })); if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0) - { - let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1); + { + // Create a thread that stays pending over incoming messages. + let handle = thread::spawn(move || { + + // Define the set of options for the create. + // Use an ID for a persistent session. + let create_opts = paho_mqtt::CreateOptionsBuilder::new() + .server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone()) + .client_id("ledboard") + .finalize(); + // Create a client. + let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); + + println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress); + + let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1); - // Make the connection to the broker - println!("MQTT | Connecting to the MQTT server..."); + // The connect options. Defaults to an MQTT v3.x connection. + let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .will_message(lwt) + .finalize(); - // Initialize MQTT client from MQTT_BROKER env var (else disabled) - let mqtt_client: Option = { - println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress); - // Read broker URL from environment - let broker = GlobalConfiguration.lock().unwrap().mqttIPAddress.clone(); - if broker.is_empty() { - None - } else { - let create_opts = CreateOptionsBuilder::new() - .server_uri(&broker) - .client_id("ledboard_client") - .finalize(); - match Client::new(create_opts) { - Ok(cli) => { - let conn_opts = ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(20)) - .clean_session(true) - .finalize(); - match cli.connect(conn_opts) { - Ok(_) => Some(cli), - Err(e) => { - eprintln!("MQTT | Failed to connect to MQTT broker '{}': {}", broker, e); - None - } - } - } - Err(e) => { - eprintln!("MQTT | Failed to create MQTT client for '{}': {}", broker, e); - None - } + // Make the connection to the broker + println!("MQTT | Connecting to the MQTT server..."); + let result = local_mqtt.connect(conn_opts); + + match result { + Ok(_) => { + println!("MQTT | Server connected"); + }, + Err(error) => { + println!("MQTT | Server connecting {error:?}"); + }, + } + + let thread_mqtt = local_mqtt.clone(); + // move local instance to global scope + gmc.set_mqtt_client(Some(local_mqtt)); + + // Attach a closure to the client to receive callback + // on incoming messages. + let mqtt_message_for_callback = mqtt_message.clone(); + + // Subscribe to the desired topic(s). + let resultSubscribe = thread_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0); + match resultSubscribe { + Ok(_) => { + println!("MQTT | Subscribe "); + }, + Err(error) => { + println!("MQTT | Subscribe failed {error:?}"); + }, + } + + // Starts the client receiving messages + let rx_queue = thread_mqtt.start_consuming(); + for mqttmsg in rx_queue.iter() { + if let Some(mqttmsg) = mqttmsg { + let topic = mqttmsg.topic(); + let payload_str = mqttmsg.payload_str(); + + //println!("MQTT | {} - {}", topic, payload_str); + let mut lock = mqtt_message_for_callback.lock().unwrap(); + lock.string = Some(payload_str.to_string()); + println!("MQTT | Received: -> {}", mqttmsg.payload_str()); + } else { + println!("MQTT | Unsubscribe: connection closed"); } } }; // Define the set of options for the connection - // move local instance to global scope - gmc.set_mqtt_client(mqtt_client); } let receiver = openweathermap::init_forecast("Mannheim",