Mqtt subscribe works
This commit is contained in:
		| @@ -1,5 +1,6 @@ | |||||||
| use std::{time::Duration, fmt::format}; | use std::{time::Duration, fmt::format}; | ||||||
| use paho_mqtt::{Message, client}; | use std::sync::RwLock; | ||||||
|  | use paho_mqtt; | ||||||
| use str; | use str; | ||||||
| use bit::BitIndex; | use bit::BitIndex; | ||||||
| use chrono_tz::Europe::Berlin; | use chrono_tz::Europe::Berlin; | ||||||
| @@ -280,6 +281,35 @@ fn render_strab(display: &mut UdpDisplay, straba_res: &NextDeparture) { | |||||||
|     render_strab_partial(display, &straba_res.inbound_station, straba_res.inbound_diff, 27); |     render_strab_partial(display, &straba_res.inbound_station, straba_res.inbound_diff, 27); | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // The type we'll use to keep our dynamic list of topics inside the | ||||||
|  | // MQTT client. Since we want to update it after creating the client, | ||||||
|  | // we need to wrap the data in a lock, like a Mutex or RwLock. | ||||||
|  | type UserTopics = RwLock<Vec<String>>; | ||||||
|  |  | ||||||
|  | ///////////////////////////////////////////////////////////////////////////// | ||||||
|  |  | ||||||
|  | // Callback for a successful connection to the broker. | ||||||
|  | // We subscribe to the topic(s) we want here. | ||||||
|  | fn mqtt_on_connect_success(cli: &paho_mqtt::AsyncClient, _msgid: u16) { | ||||||
|  |     println!("MQTT | Connection succeeded"); | ||||||
|  |      | ||||||
|  |     // Subscribe to the desired topic(s). | ||||||
|  |     cli.subscribe("room/ledboard", paho_mqtt::QOS_0); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Callback for a failed attempt to connect to the server. | ||||||
|  | // We simply sleep and then try again. | ||||||
|  | // | ||||||
|  | // Note that normally we don't want to do a blocking operation or sleep | ||||||
|  | // from  within a callback. But in this case, we know that the client is | ||||||
|  | // *not* conected, and thus not doing anything important. So we don't worry | ||||||
|  | // too much about stopping its callback thread. | ||||||
|  | fn mqtt_on_connect_failure(cli: &paho_mqtt::AsyncClient, _msgid: u16, rc: i32) { | ||||||
|  |     println!("MQTT | Connection attempt failed with error code {}.\n", rc); | ||||||
|  |     thread::sleep(Duration::from_millis(2500)); | ||||||
|  |     cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure); | ||||||
|  | } | ||||||
|  |  | ||||||
| fn send_package(ipaddress: String,  | fn send_package(ipaddress: String,  | ||||||
|                 data: &Option<Result<Forecast, String>>, |                 data: &Option<Result<Forecast, String>>, | ||||||
|                 straba_res: &NextDeparture) { |                 straba_res: &NextDeparture) { | ||||||
| @@ -356,7 +386,7 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode { | |||||||
|         println!("{:} not online", &ipaddress); |         println!("{:} not online", &ipaddress); | ||||||
|         return ExitCode::FAILURE; |         return ExitCode::FAILURE; | ||||||
|     } |     } | ||||||
|     let mut mqtt_client: Option<paho_mqtt::Client> = None; |     let mut mqtt_client: Option<paho_mqtt::AsyncClient> = None; | ||||||
|     if mqtt.is_some() { |     if mqtt.is_some() { | ||||||
|         let mqtt_ip: String = mqtt.clone().unwrap(); |         let mqtt_ip: String = mqtt.clone().unwrap(); | ||||||
|         // Define the set of options for the create. |         // Define the set of options for the create. | ||||||
| @@ -366,9 +396,9 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode { | |||||||
|         .client_id("ledboard") |         .client_id("ledboard") | ||||||
|         .finalize(); |         .finalize(); | ||||||
|         // Create a client. |         // Create a client. | ||||||
|         let local_mqtt = paho_mqtt::Client::new(create_opts).unwrap(); |         let local_mqtt = paho_mqtt::AsyncClient::new(create_opts).unwrap(); | ||||||
|          |          | ||||||
|         println!("Connecting to {:} MQTT server...", mqtt_ip); |         println!("MQTT | Connecting to {:} MQTT server...", mqtt_ip); | ||||||
|          |          | ||||||
|         // Define the set of options for the connection. |         // Define the set of options for the connection. | ||||||
|         let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() |         let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() | ||||||
| @@ -376,15 +406,44 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode { | |||||||
|             .clean_session(true) |             .clean_session(true) | ||||||
|             .finalize(); |             .finalize(); | ||||||
|  |  | ||||||
|         // Connect and wait for it to complete or fail. |         // Set a closure to be called whenever the client connection is established. | ||||||
|         if let Err(e) = local_mqtt.connect(conn_opts) { |         local_mqtt.set_connected_callback(|_cli: &paho_mqtt::AsyncClient| { | ||||||
|             println!("Unable to connect:\n\t{:?}", e); |             println!("Connected."); | ||||||
|             return ExitCode::FAILURE; |         }); | ||||||
|         } |  | ||||||
|         if let Err(e) = local_mqtt.subscribe("room/ledboard", 0) { |         // Set a closure to be called whenever the client loses the connection. | ||||||
|             println!("Error subscribes topics: {:?}", e); |         // It will attempt to reconnect, and set up function callbacks to keep | ||||||
|             return ExitCode::FAILURE; |         // retrying until the connection is re-established. | ||||||
|  |         local_mqtt.set_connection_lost_callback(|cli: &paho_mqtt::AsyncClient| { | ||||||
|  |             println!("Connection lost. Attempting reconnect."); | ||||||
|  |             thread::sleep(Duration::from_millis(2500)); | ||||||
|  |             cli.reconnect_with_callbacks(mqtt_on_connect_success, mqtt_on_connect_failure); | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         // Attach a closure to the client to receive callback | ||||||
|  |         // on incoming messages. | ||||||
|  |         local_mqtt.set_message_callback(|cli, msg| { | ||||||
|  |             if let Some(msg) = msg { | ||||||
|  |                 let topic = msg.topic(); | ||||||
|  |                 let payload_str = msg.payload_str(); | ||||||
|  |  | ||||||
|  |                 println!("MQTT | {} - {}", topic, payload_str); | ||||||
|             } |             } | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         // Define the set of options for the connection | ||||||
|  |         let lwt = paho_mqtt::Message::new("room/ledboard/lwt", "lost connection", 1); | ||||||
|  |  | ||||||
|  |         // The connect options. Defaults to an MQTT v3.x connection. | ||||||
|  |         let conn_opts = paho_mqtt::ConnectOptionsBuilder::new() | ||||||
|  |             .keep_alive_interval(Duration::from_secs(20)) | ||||||
|  |             .will_message(lwt) | ||||||
|  |             .finalize(); | ||||||
|  |  | ||||||
|  |         // Make the connection to the broker | ||||||
|  |         println!("MQTT | Connecting to the MQTT server..."); | ||||||
|  |         local_mqtt.connect_with_callbacks(conn_opts, mqtt_on_connect_success, mqtt_on_connect_failure); | ||||||
|  |  | ||||||
|         // move local instance to global scope |         // move local instance to global scope | ||||||
|         mqtt_client = Some(local_mqtt); |         mqtt_client = Some(local_mqtt); | ||||||
|     } |     } | ||||||
| @@ -405,8 +464,6 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode { | |||||||
|     // Render start |     // Render start | ||||||
|     send_package(ipaddress.clone(), &last_data, &straba_res); |     send_package(ipaddress.clone(), &last_data, &straba_res); | ||||||
|  |  | ||||||
|     // Initialize the consumer before connecting. |  | ||||||
|     let mqtt_rx = mqtt_client.and_then(|client|{ return Some(client.start_consuming());});  |  | ||||||
|     loop { |     loop { | ||||||
|         let st_now = SystemTime::now(); |         let st_now = SystemTime::now(); | ||||||
|         let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); |         let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); | ||||||
| @@ -441,19 +498,7 @@ fn main_function(ipaddress: String, mqtt: Option<String>) -> ExitCode { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Handle MQTT messages |         // Handle MQTT messages | ||||||
|         // FIXME https://www.emqx.com/en/blog/how-to-use-mqtt-in-rust |          | ||||||
|         //if mqtt_rx.is  |  | ||||||
|         //    let cli = mqtt_client.unwrap(); |  | ||||||
|         //} |  | ||||||
|             if mqtt_rx.is_some() { |  | ||||||
|                 println!("MQTT Client present"); |  | ||||||
|                 //for msg in mqtt_rx.as_ref().unwrap().try_iter() { |  | ||||||
|                 for msg in mqtt_rx.unwrap().iter() { |  | ||||||
|                     if let Some(msg) = msg { |  | ||||||
|                         println!("MQTT {}", msg); |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user