MQTT client working outside main loop

This commit is contained in:
Ollo 2024-01-10 21:44:23 +01:00
parent b4a4cdd1ac
commit 1afa8fbe81

View File

@ -1,10 +1,11 @@
use std::{time::Duration, fmt::format}; use std::{time::Duration, fmt::format};
use paho_mqtt::{Message, client};
use str; use str;
use bit::BitIndex; use bit::BitIndex;
use chrono_tz::Europe::Berlin; use chrono_tz::Europe::Berlin;
use chrono::{DateTime, NaiveDateTime, Utc, Timelike}; use chrono::{DateTime, NaiveDateTime, Utc, Timelike};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use openweathermap::forecast::Weather; use openweathermap::{forecast::Weather, Receiver};
use substring::Substring; use substring::Substring;
use tinybmp::Bmp; use tinybmp::Bmp;
use core::time; use core::time;
@ -355,7 +356,7 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode {
println!("{:} not online", &ipaddress); println!("{:} not online", &ipaddress);
return ExitCode::FAILURE; return ExitCode::FAILURE;
} }
let mut mqtt_rx; let mut mqtt_client: Option<paho_mqtt::Client> = None;
if mqtt.is_some() { if mqtt.is_some() {
let mqtt_ip: String = mqtt.clone().unwrap(); let mqtt_ip: String = mqtt.clone().unwrap();
// Define the set of options for the create. // Define the set of options for the create.
@ -365,31 +366,27 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode {
.client_id("ledboard") .client_id("ledboard")
.finalize(); .finalize();
// Create a client. // Create a client.
let mqtt_client = paho_mqtt::Client::new(create_opts); let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap();
if mqtt_client.is_ok() {
println!("Connecting to {:} MQTT server...", mqtt_ip); println!("Connecting to {:} MQTT server...", mqtt_ip);
let cli = mqtt_client.unwrap();
// Define the set of options for the connection. // Define the set of options for the connection.
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(20)) .keep_alive_interval(Duration::from_secs(20))
.clean_session(true) .clean_session(true)
.finalize(); .finalize();
// Initialize the consumer before connecting.
mqtt_rx = cli.start_consuming();
// Connect and wait for it to complete or fail. // Connect and wait for it to complete or fail.
if let Err(e) = cli.connect(conn_opts) { if let Err(e) = local_mqtt.connect(conn_opts) {
println!("Unable to connect:\n\t{:?}", e); println!("Unable to connect:\n\t{:?}", e);
return ExitCode::FAILURE; return ExitCode::FAILURE;
} }
if let Err(e) = cli.subscribe("room/ledboard", 0) { if let Err(e) = local_mqtt.subscribe("room/ledboard", 0) {
println!("Error subscribes topics: {:?}", e); println!("Error subscribes topics: {:?}", e);
return ExitCode::FAILURE; return ExitCode::FAILURE;
} }
} else { // move local instance to global scope
println!("{:} not online", &mqtt_ip); mqtt_client = Some(local_mqtt);
}
} }
let receiver = openweathermap::init_forecast("Mannheim", let receiver = openweathermap::init_forecast("Mannheim",
@ -407,6 +404,9 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode {
// Render start // Render start
send_package(ipaddress.clone(), &last_data, &straba_res); send_package(ipaddress.clone(), &last_data, &straba_res);
// Initialize the consumer before connecting.
let mqtt_rx = mqtt_client.and_then(|client|{ return Some(client.start_consuming());});
loop { loop {
let st_now = SystemTime::now(); let st_now = SystemTime::now();
let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs();
@ -442,8 +442,17 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode {
// Handle MQTT messages // Handle MQTT messages
// FIXME https://www.emqx.com/en/blog/how-to-use-mqtt-in-rust // FIXME https://www.emqx.com/en/blog/how-to-use-mqtt-in-rust
if mqtt_rx.is //if mqtt_rx.is
let cli = mqtt_client.unwrap(); // let cli = mqtt_client.unwrap();
//}
if mqtt_rx.is_some() {
println!("MQTT Client present");
//for msg in mqtt_rx.as_ref().unwrap().try_iter() {
for msg in mqtt_rx.unwrap().iter() {
if let Some(msg) = msg {
println!("MQTT {}", msg);
}
}
} }
} }
} }