Merged
This commit is contained in:
@@ -511,48 +511,78 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
||||
|
||||
if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0)
|
||||
{
|
||||
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
|
||||
// Create a thread that stays pending over incoming messages.
|
||||
let handle = thread::spawn(move || {
|
||||
|
||||
// Make the connection to the broker
|
||||
println!("MQTT | Connecting to the MQTT server...");
|
||||
// Define the set of options for the create.
|
||||
// Use an ID for a persistent session.
|
||||
let create_opts = paho_mqtt::CreateOptionsBuilder::new()
|
||||
.server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone())
|
||||
.client_id("ledboard")
|
||||
.finalize();
|
||||
// Create a client.
|
||||
let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap();
|
||||
|
||||
// Initialize MQTT client from MQTT_BROKER env var (else disabled)
|
||||
let mqtt_client: Option<Client> = {
|
||||
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
|
||||
// Read broker URL from environment
|
||||
let broker = GlobalConfiguration.lock().unwrap().mqttIPAddress.clone();
|
||||
if broker.is_empty() {
|
||||
None
|
||||
} else {
|
||||
let create_opts = CreateOptionsBuilder::new()
|
||||
.server_uri(&broker)
|
||||
.client_id("ledboard_client")
|
||||
.finalize();
|
||||
match Client::new(create_opts) {
|
||||
Ok(cli) => {
|
||||
let conn_opts = ConnectOptionsBuilder::new()
|
||||
.keep_alive_interval(Duration::from_secs(20))
|
||||
.clean_session(true)
|
||||
.finalize();
|
||||
match cli.connect(conn_opts) {
|
||||
Ok(_) => Some(cli),
|
||||
Err(e) => {
|
||||
eprintln!("MQTT | Failed to connect to MQTT broker '{}': {}", broker, e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("MQTT | Failed to create MQTT client for '{}': {}", broker, e);
|
||||
None
|
||||
}
|
||||
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
|
||||
|
||||
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
|
||||
|
||||
// The connect options. Defaults to an MQTT v3.x connection.
|
||||
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
|
||||
.keep_alive_interval(Duration::from_secs(20))
|
||||
.will_message(lwt)
|
||||
.finalize();
|
||||
|
||||
// Make the connection to the broker
|
||||
println!("MQTT | Connecting to the MQTT server...");
|
||||
let result = local_mqtt.connect(conn_opts);
|
||||
|
||||
match result {
|
||||
Ok(_) => {
|
||||
println!("MQTT | Server connected");
|
||||
},
|
||||
Err(error) => {
|
||||
println!("MQTT | Server connecting {error:?}");
|
||||
},
|
||||
}
|
||||
|
||||
let thread_mqtt = local_mqtt.clone();
|
||||
// move local instance to global scope
|
||||
gmc.set_mqtt_client(Some(local_mqtt));
|
||||
|
||||
// Attach a closure to the client to receive callback
|
||||
// on incoming messages.
|
||||
let mqtt_message_for_callback = mqtt_message.clone();
|
||||
|
||||
// Subscribe to the desired topic(s).
|
||||
let resultSubscribe = thread_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0);
|
||||
match resultSubscribe {
|
||||
Ok(_) => {
|
||||
println!("MQTT | Subscribe ");
|
||||
},
|
||||
Err(error) => {
|
||||
println!("MQTT | Subscribe failed {error:?}");
|
||||
},
|
||||
}
|
||||
|
||||
// Starts the client receiving messages
|
||||
let rx_queue = thread_mqtt.start_consuming();
|
||||
for mqttmsg in rx_queue.iter() {
|
||||
if let Some(mqttmsg) = mqttmsg {
|
||||
let topic = mqttmsg.topic();
|
||||
let payload_str = mqttmsg.payload_str();
|
||||
|
||||
//println!("MQTT | {} - {}", topic, payload_str);
|
||||
let mut lock = mqtt_message_for_callback.lock().unwrap();
|
||||
lock.string = Some(payload_str.to_string());
|
||||
println!("MQTT | Received: -> {}", mqttmsg.payload_str());
|
||||
} else {
|
||||
println!("MQTT | Unsubscribe: connection closed");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Define the set of options for the connection
|
||||
// move local instance to global scope
|
||||
gmc.set_mqtt_client(mqtt_client);
|
||||
}
|
||||
|
||||
let receiver = openweathermap::init_forecast("Mannheim",
|
||||
|
||||
Reference in New Issue
Block a user