Request update in a configurable intervall
This commit is contained in:
60
src/main.rs
60
src/main.rs
@@ -1,6 +1,6 @@
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
|
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
mod straba;
|
mod straba;
|
||||||
@@ -29,15 +29,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
);
|
);
|
||||||
mqttoptions.set_keep_alive(Duration::from_secs(30));
|
mqttoptions.set_keep_alive(Duration::from_secs(30));
|
||||||
|
|
||||||
let (client, mut eventloop) = AsyncClient::new(mqttoptions, args.interval as usize);
|
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);
|
||||||
|
|
||||||
|
|
||||||
|
let mut last_request = Instant::now();
|
||||||
|
let interval = Duration::from_millis(args.interval as u64);
|
||||||
|
let mut departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true)))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let mut data_published = false;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
let now = Instant::now();
|
||||||
let departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true)))
|
if now.duration_since(last_request) > interval {
|
||||||
.await
|
data_published = false;
|
||||||
.unwrap();
|
last_request = Instant::now();
|
||||||
|
let mut departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(false)))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
if departure_data.failure {
|
if departure_data.failure {
|
||||||
client
|
client
|
||||||
.publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data")
|
.publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data")
|
||||||
@@ -54,30 +63,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
println!("{}", outbound_msg);
|
println!("{}", outbound_msg);
|
||||||
println!("{}", inbound_msg);
|
println!("{}", inbound_msg);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
match eventloop.poll().await {
|
match eventloop.poll().await {
|
||||||
Ok(Event::Incoming(Packet::Publish(_))) => {}
|
Ok(Event::Incoming(Packet::Publish(_))) => {}
|
||||||
Ok(Event::Incoming(Packet::ConnAck(_))) => {
|
Ok(Event::Incoming(Packet::ConnAck(_))) => {
|
||||||
println!("Connected to MQTT broker");
|
println!("Connected to MQTT broker");
|
||||||
}
|
}
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
if data_published == false
|
||||||
|
{
|
||||||
|
client
|
||||||
|
.publish("departure/station", QoS::AtLeastOnce, false, departure_data.station_name.as_bytes())
|
||||||
|
.await?;
|
||||||
|
client
|
||||||
|
.publish("departure/outbound/station", QoS::AtLeastOnce, false, departure_data.outbound_station.as_bytes())
|
||||||
|
.await?;
|
||||||
|
|
||||||
client
|
client
|
||||||
.publish("departure/station", QoS::AtLeastOnce, false, departure_data.station_name.as_bytes())
|
.publish("departure/outbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.outbound_diff))
|
||||||
.await?;
|
.await?;
|
||||||
client
|
client
|
||||||
.publish("departure/outbound/station", QoS::AtLeastOnce, false, departure_data.outbound_station.as_bytes())
|
.publish("departure/inbound/station", QoS::AtLeastOnce, false, departure_data.inbound_station.as_bytes())
|
||||||
.await?;
|
.await?;
|
||||||
|
client
|
||||||
client
|
.publish("departure/inbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.inbound_diff))
|
||||||
.publish("departure/outbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.outbound_diff))
|
.await?;
|
||||||
.await?;
|
data_published = true;
|
||||||
client
|
}
|
||||||
.publish("departure/inbound/station", QoS::AtLeastOnce, false, departure_data.inbound_station.as_bytes())
|
|
||||||
.await?;
|
|
||||||
client
|
|
||||||
.publish("departure/inbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.inbound_diff))
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("Error: {:?}", e);
|
println!("Error: {:?}", e);
|
||||||
|
|||||||
Reference in New Issue
Block a user