diff --git a/src/main.rs b/src/main.rs index 4e7db5b..e18b2ab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,9 @@ struct Args { #[arg(default_value_t = 1883)] port: u16, + #[arg(default_value_t = 30000)] + interval: u32, + } #[tokio::main] @@ -26,36 +29,31 @@ async fn main() -> Result<(), Box> { ); mqttoptions.set_keep_alive(Duration::from_secs(30)); - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, args.interval as usize); - client - .publish("test/topic", QoS::AtLeastOnce, false, b"Hello from Rust MQTT!") - .await?; - - println!("Published message to test/topic"); - - let departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true))) - .await - .unwrap(); - - if departure_data.failure { - client - .publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data") - .await?; - } else { - let outbound_msg = format!( - "Outbound: {} in {} seconds", - departure_data.outbound_station, departure_data.outbound_diff - ); - let inbound_msg = format!( - "Inbound: {} in {} seconds", - departure_data.inbound_station, departure_data.inbound_diff - ); - println!("{}", outbound_msg); - println!("{}", inbound_msg); - } loop { + + let departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true))) + .await + .unwrap(); + + if departure_data.failure { + client + .publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data") + .await?; + } else { + let outbound_msg = format!( + "Outbound: {} in {} seconds", + departure_data.outbound_station, departure_data.outbound_diff + ); + let inbound_msg = format!( + "Inbound: {} in {} seconds", + departure_data.inbound_station, departure_data.inbound_diff + ); + println!("{}", outbound_msg); + println!("{}", inbound_msg); + } match eventloop.poll().await { Ok(Event::Incoming(Packet::Publish(_))) => {} Ok(Event::Incoming(Packet::ConnAck(_))) => { @@ -63,19 +61,22 @@ async fn main() -> Result<(), Box> { } Ok(_) => { - client - .publish("departure/outbound/station", QoS::AtLeastOnce, false, departure_data.outbound_station.as_bytes()) - .await?; + client + .publish("departure/station", QoS::AtLeastOnce, false, departure_data.station_name.as_bytes()) + .await?; + client + .publish("departure/outbound/station", QoS::AtLeastOnce, false, departure_data.outbound_station.as_bytes()) + .await?; - client - .publish("departure/outbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.outbound_diff)) - .await?; - client - .publish("departure/inbound/station", QoS::AtLeastOnce, false, departure_data.inbound_station.as_bytes()) - .await?; - client - .publish("departure/inbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.inbound_diff)) - .await?; + client + .publish("departure/outbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.outbound_diff)) + .await?; + client + .publish("departure/inbound/station", QoS::AtLeastOnce, false, departure_data.inbound_station.as_bytes()) + .await?; + client + .publish("departure/inbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.inbound_diff)) + .await?; } Err(e) => { diff --git a/src/straba.rs b/src/straba.rs index 86f80ff..6309042 100644 --- a/src/straba.rs +++ b/src/straba.rs @@ -3,7 +3,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use serde::Deserialize; -const STATION_URL:&str = "https://www.rnv-online.de/rest/departure/2494"; +const STATION_URL: &str = "https://www.rnv-online.de/rest/departure/2494"; /* ******************** JSON Description ****************************** */ #[derive(Default, Debug, Clone, PartialEq, Deserialize)] @@ -57,9 +57,9 @@ pub struct Journey { #[derive(Default, Debug, Clone, PartialEq, Deserialize)] #[serde(rename_all = "camelCase")] pub struct StopsElement { - pub destination_label: String, - pub planned_departure: IsoStringDateTime, - pub realtime_departure: IsoStringDateTime, + pub destination_label: String, + pub planned_departure: IsoStringDateTime, + pub realtime_departure: IsoStringDateTime, } #[derive(Default, Debug, Clone, PartialEq, Deserialize)] @@ -71,6 +71,7 @@ pub struct IsoStringDateTime { // Return value pub struct NextDeparture { pub request_time: i64, + pub station_name: String, pub outbound_station: String, pub outbound_diff: i64, pub inbound_station: String, @@ -78,20 +79,20 @@ pub struct NextDeparture { pub failure: bool, } -pub fn fetch_data(debug_print : Option) -> NextDeparture { - +pub fn fetch_data(debug_print: Option) -> NextDeparture { let st_now = SystemTime::now(); let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs(); let url = &format!("{}?datetime={}", STATION_URL, seconds); let result = reqwest::blocking::get(url); - + let mut return_value = NextDeparture { - failure : false, - outbound_station : String::from(""), - outbound_diff : 10000, - inbound_station : String::from(""), - inbound_diff : 10000, - request_time : seconds as i64, + failure: false, + outbound_station: String::from(""), + outbound_diff: 10000, + inbound_station: String::from(""), + inbound_diff: 10000, + request_time: seconds as i64, + station_name: String::from(""), }; if result.is_err() { @@ -110,7 +111,7 @@ pub fn fetch_data(debug_print : Option) -> NextDeparture { let body: std::result::Result = serde_json::from_str(&raw_text); if body.is_err() { - println!("Could not parse json {:?}", body.err()); + println!("Could not parse json {:?}", body.err()); println!("------------------------- %< ----------------------------"); println!("{}", &raw_text); println!("------------------------- %< ----------------------------"); @@ -120,37 +121,44 @@ pub fn fetch_data(debug_print : Option) -> NextDeparture { // parse JSON result.. search of both directions let json = body.unwrap(); + return_value.station_name = json.graph_ql.response.name.clone(); for el in json.graph_ql.response.journeys.elements { if debug_print.is_some() && debug_print.unwrap() == true { - println!("Line {:}", el.line.line_group.label); + println!("Line {:}", el.line.line_group.label); } for stop in el.stops { // use only valid data - if stop.realtime_departure.iso_string.is_some() && - stop.destination_label != "" { + if stop.realtime_departure.iso_string.is_some() && stop.destination_label != "" { let txt_departure = stop.realtime_departure.iso_string.unwrap(); let next_departure = DateTime::parse_from_rfc3339(&txt_departure).unwrap(); - - let diff = next_departure.timestamp() - (seconds as i64); + + let diff = next_departure.timestamp() - (seconds as i64); if debug_print.is_some() && debug_print.unwrap() == true { - println!("To {:} {:} (in {:} seconds)", stop.destination_label, txt_departure, diff ); + println!( + "To {:} {:} (in {:} seconds)", + stop.destination_label, txt_departure, diff + ); } - + if stop.destination_label.contains("Rheinau") { - if diff < return_value.outbound_diff { + if diff < return_value.outbound_diff { return_value.outbound_station = stop.destination_label; return_value.outbound_diff = diff; } - } else if stop.destination_label.contains("Hochschule") || - stop.destination_label.contains("Hauptbahnhof") || - stop.destination_label.contains("Schönau") { - if diff < return_value.inbound_diff { + } else if stop.destination_label.contains("Hochschule") + || stop.destination_label.contains("Hauptbahnhof") + || stop.destination_label.contains("Schönau") + { + if diff < return_value.inbound_diff { return_value.inbound_station = stop.destination_label; return_value.inbound_diff = diff; } } } else { - println!("Planned {:} {:?}", stop.destination_label, stop.planned_departure.iso_string) + println!( + "Planned {:} {:?}", + stop.destination_label, stop.planned_departure.iso_string + ) } } }