use always option2 for update MQTT
This commit is contained in:
@@ -444,32 +444,6 @@ lazy_static! {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// Asynchronously publishes a message to the specified topic.
|
|
||||||
///
|
|
||||||
/// @author Gwen2.5
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `topic` - The MQTT topic to which the message will be published.
|
|
||||||
/// * `message` - The message to be published.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// True if the message was successfully published, false otherwise.
|
|
||||||
fn publish_message(topic: &str, message: &str) {
|
|
||||||
let msg = MqttMessage::new(topic, message, 0);
|
|
||||||
|
|
||||||
let mqtt_client = <std::option::Option<Client> as Clone>::clone(&(MQTTCLIENT.lock().unwrap().get_mqtt_client())).unwrap();
|
|
||||||
// Publish the message and ensure it completes without error
|
|
||||||
let result = mqtt_client.publish(msg);
|
|
||||||
match result {
|
|
||||||
Ok(_) => (),
|
|
||||||
Err(error) => {
|
|
||||||
println!("Error publishing {error:?}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
||||||
|
|
||||||
// Read configuration file
|
// Read configuration file
|
||||||
@@ -528,69 +502,49 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
|||||||
|
|
||||||
if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0)
|
if GlobalConfiguration.lock().is_ok() && (GlobalConfiguration.lock().unwrap().mqttIPAddress.len() > 0)
|
||||||
{
|
{
|
||||||
// Define the set of options for the create.
|
|
||||||
// Use an ID for a persistent session.
|
|
||||||
let create_opts = paho_mqtt::CreateOptionsBuilder::new()
|
|
||||||
.server_uri(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone())
|
|
||||||
.client_id("ledboard")
|
|
||||||
.finalize();
|
|
||||||
// Create a client.
|
|
||||||
let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap();
|
|
||||||
|
|
||||||
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
|
|
||||||
|
|
||||||
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
|
let lwt = paho_mqtt::Message::new(&format!("{}/lwt", GlobalConfiguration.lock().unwrap().mqttPrefix), "lost connection", 1);
|
||||||
|
|
||||||
// The connect options. Defaults to an MQTT v3.x connection.
|
|
||||||
let conn_opts = paho_mqtt::ConnectOptionsBuilder::new()
|
|
||||||
.keep_alive_interval(Duration::from_secs(20))
|
|
||||||
.will_message(lwt)
|
|
||||||
.finalize();
|
|
||||||
|
|
||||||
// Make the connection to the broker
|
// Make the connection to the broker
|
||||||
println!("MQTT | Connecting to the MQTT server...");
|
println!("MQTT | Connecting to the MQTT server...");
|
||||||
let result = local_mqtt.connect(conn_opts);
|
|
||||||
|
|
||||||
match result {
|
// Initialize MQTT client from MQTT_BROKER env var (else disabled)
|
||||||
Ok(_) => {
|
let mqtt_client: Option<Client> = {
|
||||||
println!("MQTT | Server connected");
|
println!("MQTT | Connecting to {:} MQTT server...", GlobalConfiguration.lock().unwrap().mqttIPAddress);
|
||||||
},
|
// Read broker URL from environment
|
||||||
Err(error) => {
|
let broker = GlobalConfiguration.lock().unwrap().mqttIPAddress.clone();
|
||||||
println!("MQTT | Server connecting {error:?}");
|
if broker.is_empty() {
|
||||||
},
|
None
|
||||||
}
|
} else {
|
||||||
|
let create_opts = CreateOptionsBuilder::new()
|
||||||
// Subscribe to the desired topic(s).
|
.server_uri(&broker)
|
||||||
local_mqtt.subscribe("room/ledboard", paho_mqtt::QOS_0);
|
.client_id("ledboard_client")
|
||||||
|
.finalize();
|
||||||
// Starts the client receiving messages
|
match Client::new(create_opts) {
|
||||||
let rx_queue = local_mqtt.start_consuming();
|
Ok(cli) => {
|
||||||
|
let conn_opts = ConnectOptionsBuilder::new()
|
||||||
// Attach a closure to the client to receive callback
|
.keep_alive_interval(Duration::from_secs(20))
|
||||||
// on incoming messages.
|
.clean_session(true)
|
||||||
let mqtt_message_for_callback = mqtt_message.clone();
|
.finalize();
|
||||||
|
match cli.connect(conn_opts) {
|
||||||
// Create a thread that stays pending over incoming messages.
|
Ok(_) => Some(cli),
|
||||||
let handle = thread::spawn(move || {
|
Err(e) => {
|
||||||
for mqttmsg in rx_queue.iter() {
|
eprintln!("MQTT | Failed to connect to MQTT broker '{}': {}", broker, e);
|
||||||
if let Some(mqttmsg) = mqttmsg {
|
None
|
||||||
let topic = mqttmsg.topic();
|
}
|
||||||
let payload_str = mqttmsg.payload_str();
|
}
|
||||||
|
}
|
||||||
//println!("MQTT | {} - {}", topic, payload_str);
|
Err(e) => {
|
||||||
let mut lock = mqtt_message_for_callback.lock().unwrap();
|
eprintln!("MQTT | Failed to create MQTT client for '{}': {}", broker, e);
|
||||||
lock.string = Some(payload_str.to_string());
|
None
|
||||||
println!("MQTT | Received: -> {}", mqttmsg.payload_str());
|
}
|
||||||
} else {
|
|
||||||
println!("MQTT | Unsubscribe: connection closed");
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
};
|
||||||
|
|
||||||
// Define the set of options for the connection
|
// Define the set of options for the connection
|
||||||
|
|
||||||
// move local instance to global scope
|
// move local instance to global scope
|
||||||
gmc.set_mqtt_client(Some(local_mqtt));
|
gmc.set_mqtt_client(mqtt_client);
|
||||||
}
|
}
|
||||||
|
|
||||||
let receiver = openweathermap::init_forecast("Mannheim",
|
let receiver = openweathermap::init_forecast("Mannheim",
|
||||||
@@ -612,6 +566,19 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
|||||||
send_package(GlobalConfiguration.lock().unwrap().panelIPAddress.clone(), &last_data, &straba_res, Some("MQTT: room/ledboard".to_string()));
|
send_package(GlobalConfiguration.lock().unwrap().panelIPAddress.clone(), &last_data, &straba_res, Some("MQTT: room/ledboard".to_string()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if GlobalConfiguration.lock().is_ok() && GlobalConfiguration.lock().unwrap().mqttPrefix.len() > 0
|
||||||
|
{
|
||||||
|
if let Some(ref client) = &(MQTTCLIENT.lock().unwrap().get_mqtt_client()) {
|
||||||
|
publish_to_mqtt(client, &last_data, &straba_res);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
eprintln!("MQTT | client for publishing not found");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let st_now = SystemTime::now();
|
let st_now = SystemTime::now();
|
||||||
let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs();
|
let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs();
|
||||||
@@ -639,7 +606,9 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
|||||||
if GlobalConfiguration.lock().is_ok() && GlobalConfiguration.lock().unwrap().mqttPrefix.len() > 0
|
if GlobalConfiguration.lock().is_ok() && GlobalConfiguration.lock().unwrap().mqttPrefix.len() > 0
|
||||||
{
|
{
|
||||||
//FIXME if mqtt_client.is_some()
|
//FIXME if mqtt_client.is_some()
|
||||||
fun_publishinfoviamqtt(&straba_res);
|
if let Some(ref client) = &(MQTTCLIENT.lock().unwrap().get_mqtt_client()) {
|
||||||
|
publish_to_mqtt(client, &last_data, &straba_res);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// request once a minute new data
|
// request once a minute new data
|
||||||
@@ -661,19 +630,11 @@ fn main_function(parameter1: String, parameter2: Option<String>) -> ExitCode {
|
|||||||
// Render new image
|
// Render new image
|
||||||
send_package(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone(), &last_data, &straba_res, mqtt_message);
|
send_package(GlobalConfiguration.lock().unwrap().mqttIPAddress.clone(), &last_data, &straba_res, mqtt_message);
|
||||||
}
|
}
|
||||||
// Handle MQTT messages
|
//FIXME Handle MQTT messages
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fun_publishinfoviamqtt(straba_res: &NextDeparture) {
|
|
||||||
let topic_in_station: String = format!("{}{}", GlobalConfiguration.lock().unwrap().mqttPrefix, "/inbound/station");
|
|
||||||
let station_name: String = format!("{}", straba_res.inbound_station);
|
|
||||||
// Execute async publish synchronously
|
|
||||||
let _ = publish_message(topic_in_station.as_str(), station_name.as_str());
|
|
||||||
println!("MQTT | published {:?} = {:?}s", topic_in_station, straba_res.outbound_station);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main_function2(ip: String) -> ExitCode
|
fn main_function2(ip: String) -> ExitCode
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user