diff --git a/client/bin/src/main.rs b/client/bin/src/main.rs index bb2066a..bbc845f 100644 --- a/client/bin/src/main.rs +++ b/client/bin/src/main.rs @@ -1,5 +1,6 @@ use std::{time::Duration, fmt::format}; -use paho_mqtt::{Message, client}; +use std::sync::RwLock; +use paho_mqtt; use str; use bit::BitIndex; use chrono_tz::Europe::Berlin; @@ -280,6 +281,35 @@ fn render_strab(display: &mut UdpDisplay, straba_res: &NextDeparture) { render_strab_partial(display, &straba_res.inbound_station, straba_res.inbound_diff, 27); } +// The type we'll use to keep our dynamic list of topics inside the +// MQTT client. Since we want to update it after creating the client, +// we need to wrap the data in a lock, like a Mutex or RwLock. +type UserTopics = RwLock>; + +///////////////////////////////////////////////////////////////////////////// + +// Callback for a successful connection to the broker. +// We subscribe to the topic(s) we want here. +fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) { + println!("MQTT | Connection succeeded"); + + // Subscribe to the desired topic(s). + cli.subscribe("room/ledboard", paho_mqtt::QOS_0); +} + +// Callback for a failed attempt to connect to the server. +// We simply sleep and then try again. +// +// Note that normally we don't want to do a blocking operation or sleep +// from within a callback. But in this case, we know that the client is +// *not* conected, and thus not doing anything important. So we don't worry +// too much about stopping its callback thread. +fn mqtt_on_connect_failure(cli: &paho_mqtt::AsyncClient, _msgid: u16, rc: i32) { + println!("MQTT | Connection attempt failed with error code {}.\n", rc); + thread::sleep(Duration::from_millis(2500)); + cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure); +} + fn send_package(ipaddress: String, data: &Option>, straba_res: &NextDeparture) { @@ -356,7 +386,7 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { println!("{:} not online", &ipaddress); return ExitCode::FAILURE; } - let mut mqtt_client: Option = None; + let mut mqtt_client: Option = None; if mqtt.is_some() { let mqtt_ip: String = mqtt.clone().unwrap(); // Define the set of options for the create. @@ -366,9 +396,9 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { .client_id("ledboard") .finalize(); // Create a client. - let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); + let local_mqtt = paho_mqtt::AsyncClient::new(create_opts).unwrap(); - println!("Connecting to {:} MQTT server...", mqtt_ip); + println!("MQTT | Connecting to {:} MQTT server...", mqtt_ip); // Define the set of options for the connection. let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() @@ -376,15 +406,44 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { .clean_session(true) .finalize(); - // Connect and wait for it to complete or fail. - if let Err(e) = local_mqtt.connect(conn_opts) { - println!("Unable to connect:\n\t{:?}", e); - return ExitCode::FAILURE; - } - if let Err(e) = local_mqtt.subscribe("room/ledboard", 0) { - println!("Error subscribes topics: {:?}", e); - return ExitCode::FAILURE; - } + // Set a closure to be called whenever the client connection is established. + local_mqtt.set_connected_callback(|_cli: &paho_mqtt::AsyncClient| { + println!("Connected."); + }); + + // Set a closure to be called whenever the client loses the connection. + // It will attempt to reconnect, and set up function callbacks to keep + // retrying until the connection is re-established. + local_mqtt.set_connection_lost_callback(|cli: &paho_mqtt::AsyncClient| { + println!("Connection lost. Attempting reconnect."); + thread::sleep(Duration::from_millis(2500)); + cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure); + }); + + // Attach a closure to the client to receive callback + // on incoming messages. + local_mqtt.set_message_callback(|cli, msg| { + if let Some(msg) = msg { + let topic = msg.topic(); + let payload_str = msg.payload_str(); + + println!("MQTT | {} - {}", topic, payload_str); + } + }); + + // Define the set of options for the connection + let lwt = paho_mqtt::Message::new("room/ledboard/lwt", "lost connection", 1); + + // The connect options. Defaults to an MQTT v3.x connection. + let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .will_message(lwt) + .finalize(); + + // Make the connection to the broker + println!("MQTT | Connecting to the MQTT server..."); + local_mqtt.connect_with_callbacks(conn_opts, mqtt_on_connect_success, mqtt_on_connect_failure); + // move local instance to global scope mqtt_client = Some(local_mqtt); } @@ -405,8 +464,6 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { // Render start send_package(ipaddress.clone(), &last_data, &straba_res); - // Initialize the consumer before connecting. - let mqtt_rx = mqtt_client.and_then(|client|{ return Some(client.start_consuming());}); loop { let st_now = SystemTime::now(); let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); @@ -441,19 +498,7 @@ fn main_function(ipaddress: String, mqtt: Option) -> ExitCode { } // Handle MQTT messages - // FIXME https://www.emqx.com/en/blog/how-to-use-mqtt-in-rust - //if mqtt_rx.is - // let cli = mqtt_client.unwrap(); - //} - if mqtt_rx.is_some() { - println!("MQTT Client present"); - //for msg in mqtt_rx.as_ref().unwrap().try_iter() { - for msg in mqtt_rx.unwrap().iter() { - if let Some(msg) = msg { - println!("MQTT {}", msg); - } - } - } + } }