From 1afa8fbe81666c856a412a70f9f3829f2faa21a1 Mon Sep 17 00:00:00 2001 From: Ollo Date: Wed, 10 Jan 2024 21:44:23 +0100 Subject: [PATCH] MQTT client working outside main loop --- client/bin/src/main.rs | 65 ++++++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index 2ab4124..bb2066a 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -1,10 +1,11 @@ use std::{time::Duration, fmt::format}; +use paho_mqtt::{Message, client}; use str; use bit::BitIndex; use chrono_tz::Europe::Berlin; use chrono::{DateTime, NaiveDateTime, Utc, Timelike}; use std::time::{SystemTime, UNIX_EPOCH}; -use openweathermap::forecast::Weather; +use openweathermap::{forecast::Weather, Receiver}; use substring::Substring; use tinybmp::Bmp; use core::time; @@ -355,7 +356,7 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { println!("{:} not online", &ipaddress); return ExitCode::FAILURE; } - let mut mqtt_rx; + let mut mqtt_client: Option = None; if mqtt.is_some() { let mqtt_ip: String = mqtt.clone().unwrap(); // Define the set of options for the create. @@ -365,31 +366,27 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { .client_id("ledboard") .finalize(); // Create a client. - let mqtt_client = paho_mqtt::Client::new(create_opts); - if mqtt_client.is_ok() { - println!("Connecting to {:} MQTT server...", mqtt_ip); - let cli = mqtt_client.unwrap(); - // Define the set of options for the connection. - let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(20)) - .clean_session(true) - .finalize(); + let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); + + println!("Connecting to {:} MQTT server...", mqtt_ip); + + // Define the set of options for the connection. + let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(true) + .finalize(); - // Initialize the consumer before connecting. - mqtt_rx = cli.start_consuming(); - - // Connect and wait for it to complete or fail. - if let Err(e) = cli.connect(conn_opts) { - println!("Unable to connect:\n\t{:?}", e); - return ExitCode::FAILURE; - } - if let Err(e) = cli.subscribe("room/ledboard", 0) { - println!("Error subscribes topics: {:?}", e); - return ExitCode::FAILURE; - } - } else { - println!("{:} not online", &mqtt_ip); + // Connect and wait for it to complete or fail. + if let Err(e) = local_mqtt.connect(conn_opts) { + println!("Unable to connect:\n\t{:?}", e); + return ExitCode::FAILURE; } + if let Err(e) = local_mqtt.subscribe("room/ledboard", 0) { + println!("Error subscribes topics: {:?}", e); + return ExitCode::FAILURE; + } + // move local instance to global scope + mqtt_client = Some(local_mqtt); } let receiver = openweathermap::init_forecast("Mannheim", @@ -407,6 +404,9 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { // Render start send_package(ipaddress.clone(), &last_data, &straba_res); + + // Initialize the consumer before connecting. + let mqtt_rx = mqtt_client.and_then(|client|{ return Some(client.start_consuming());}); loop { let st_now = SystemTime::now(); let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); @@ -442,9 +442,18 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { // Handle MQTT messages // FIXME https://www.emqx.com/en/blog/how-to-use-mqtt-in-rust - if mqtt_rx.is - let cli = mqtt_client.unwrap(); - } + //if mqtt_rx.is + // let cli = mqtt_client.unwrap(); + //} + if mqtt_rx.is_some() { + println!("MQTT Client present"); + //for msg in mqtt_rx.as_ref().unwrap().try_iter() { + for msg in mqtt_rx.unwrap().iter() { + if let Some(msg) = msg { + println!("MQTT {}", msg); + } + } + } } }