Integrate straba.rs in MQTT
This commit is contained in:
@@ -9,3 +9,5 @@ rumqttc = "0.24"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
chrono = "0.4"
|
||||
reqwest = { version = "0.11", features = ["blocking"] }
|
||||
|
||||
31
src/main.rs
31
src/main.rs
@@ -3,6 +3,8 @@ use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
|
||||
mod straba;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version, about, long_about = None)]
|
||||
struct Args {
|
||||
@@ -32,6 +34,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
println!("Published message to test/topic");
|
||||
|
||||
let departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true)))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
if departure_data.failure {
|
||||
client
|
||||
.publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data")
|
||||
.await?;
|
||||
} else {
|
||||
let outbound_msg = format!(
|
||||
"Outbound: {} in {} seconds",
|
||||
departure_data.outbound_station, departure_data.outbound_diff
|
||||
);
|
||||
let inbound_msg = format!(
|
||||
"Inbound: {} in {} seconds",
|
||||
departure_data.inbound_station, departure_data.inbound_diff
|
||||
);
|
||||
|
||||
client
|
||||
.publish("departure/outbound", QoS::AtLeastOnce, false, outbound_msg.as_bytes())
|
||||
.await?;
|
||||
client
|
||||
.publish("departure/inbound", QoS::AtLeastOnce, false, inbound_msg.as_bytes())
|
||||
.await?;
|
||||
|
||||
println!("{}", outbound_msg);
|
||||
println!("{}", inbound_msg);
|
||||
}
|
||||
|
||||
loop {
|
||||
match eventloop.poll().await {
|
||||
Ok(Event::Incoming(Packet::Publish(_))) => {}
|
||||
|
||||
159
src/straba.rs
Normal file
159
src/straba.rs
Normal file
@@ -0,0 +1,159 @@
|
||||
use chrono::DateTime;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use serde::Deserialize;
|
||||
|
||||
const STATION_URL:&str = "https://www.rnv-online.de/rest/departure/2494";
|
||||
|
||||
/* ******************** JSON Description ****************************** */
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Station {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
#[serde(alias = "graphQL")]
|
||||
pub graph_ql: GraphQL,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct GraphQL {
|
||||
pub response: GraphQLResponse,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct GraphQLResponse {
|
||||
pub name: String,
|
||||
pub journeys: JourneysElement,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct JourneysElement {
|
||||
pub elements: Vec<Journey>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Line {
|
||||
pub line_group: LineGroup,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LineGroup {
|
||||
pub label: String,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Journey {
|
||||
pub line: Line,
|
||||
pub canceled: bool,
|
||||
pub stops: Vec<StopsElement>,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StopsElement {
|
||||
pub destination_label: String,
|
||||
pub planned_departure: IsoStringDateTime,
|
||||
pub realtime_departure: IsoStringDateTime,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug, Clone, PartialEq, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IsoStringDateTime {
|
||||
pub iso_string: Option<String>,
|
||||
}
|
||||
|
||||
// Return value
|
||||
pub struct NextDeparture {
|
||||
pub request_time: i64,
|
||||
pub outbound_station: String,
|
||||
pub outbound_diff: i64,
|
||||
pub inbound_station: String,
|
||||
pub inbound_diff: i64,
|
||||
pub failure: bool,
|
||||
}
|
||||
|
||||
pub fn fetch_data(debug_print : Option<bool>) -> NextDeparture {
|
||||
|
||||
let st_now = SystemTime::now();
|
||||
let seconds = st_now.duration_since(UNIX_EPOCH).unwrap().as_secs();
|
||||
let url = &format!("{}?datetime={}", STATION_URL, seconds);
|
||||
let result = reqwest::blocking::get(url);
|
||||
|
||||
let mut return_value = NextDeparture {
|
||||
failure : false,
|
||||
outbound_station : String::from(""),
|
||||
outbound_diff : 10000,
|
||||
inbound_station : String::from(""),
|
||||
inbound_diff : 10000,
|
||||
request_time : seconds as i64,
|
||||
};
|
||||
|
||||
if result.is_err() {
|
||||
println!("Could not read station response {:?}", result.err());
|
||||
return_value.failure = true;
|
||||
return return_value;
|
||||
}
|
||||
let text = result.unwrap().text();
|
||||
if text.is_err() {
|
||||
println!("Could not convert response {:?}", text.err());
|
||||
return_value.failure = true;
|
||||
return return_value;
|
||||
}
|
||||
|
||||
let raw_text = &text.unwrap();
|
||||
let body: std::result::Result<Station, serde_json::Error> = serde_json::from_str(&raw_text);
|
||||
|
||||
if body.is_err() {
|
||||
println!("Could not parse json {:?}", body.err());
|
||||
println!("------------------------- %< ----------------------------");
|
||||
println!("{}", &raw_text);
|
||||
println!("------------------------- %< ----------------------------");
|
||||
return_value.failure = true;
|
||||
return return_value;
|
||||
}
|
||||
|
||||
// parse JSON result.. search of both directions
|
||||
let json = body.unwrap();
|
||||
for el in json.graph_ql.response.journeys.elements {
|
||||
if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("Line {:}", el.line.line_group.label);
|
||||
}
|
||||
for stop in el.stops {
|
||||
// use only valid data
|
||||
if stop.realtime_departure.iso_string.is_some() &&
|
||||
stop.destination_label != "" {
|
||||
let txt_departure = stop.realtime_departure.iso_string.unwrap();
|
||||
let next_departure = DateTime::parse_from_rfc3339(&txt_departure).unwrap();
|
||||
|
||||
let diff = next_departure.timestamp() - (seconds as i64);
|
||||
if debug_print.is_some() && debug_print.unwrap() == true {
|
||||
println!("To {:} {:} (in {:} seconds)", stop.destination_label, txt_departure, diff );
|
||||
}
|
||||
|
||||
if stop.destination_label.contains("Rheinau") {
|
||||
if diff < return_value.outbound_diff {
|
||||
return_value.outbound_station = stop.destination_label;
|
||||
return_value.outbound_diff = diff;
|
||||
}
|
||||
} else if stop.destination_label.contains("Hochschule") ||
|
||||
stop.destination_label.contains("Hauptbahnhof") ||
|
||||
stop.destination_label.contains("Schönau") {
|
||||
if diff < return_value.inbound_diff {
|
||||
return_value.inbound_station = stop.destination_label;
|
||||
return_value.inbound_diff = diff;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!("Planned {:} {:?}", stop.destination_label, stop.planned_departure.iso_string)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return_value
|
||||
}
|
||||
Reference in New Issue
Block a user