From 98b72736ee2616540269c36e37b5b6a4cc07b164 Mon Sep 17 00:00:00 2001 From: Ollo Date: Sat, 28 Feb 2026 20:50:01 +0100 Subject: [PATCH] Request update in a configurable intervall --- src/main.rs | 60 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/src/main.rs b/src/main.rs index e18b2ab..433e251 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use clap::Parser; use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::time::sleep; mod straba; @@ -29,15 +29,24 @@ async fn main() -> Result<(), Box> { ); mqttoptions.set_keep_alive(Duration::from_secs(30)); - let (client, mut eventloop) = AsyncClient::new(mqttoptions, args.interval as usize); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100); + let mut last_request = Instant::now(); + let interval = Duration::from_millis(args.interval as u64); + let mut departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true))) + .await + .unwrap(); + let mut data_published = false; + loop { - - let departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true))) - .await - .unwrap(); - + let now = Instant::now(); + if now.duration_since(last_request) > interval { + data_published = false; + last_request = Instant::now(); + let mut departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(false))) + .await + .unwrap(); if departure_data.failure { client .publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data") @@ -54,30 +63,33 @@ async fn main() -> Result<(), Box> { println!("{}", outbound_msg); println!("{}", inbound_msg); } + } match eventloop.poll().await { Ok(Event::Incoming(Packet::Publish(_))) => {} Ok(Event::Incoming(Packet::ConnAck(_))) => { println!("Connected to MQTT broker"); } Ok(_) => { + if data_published == false + { + client + .publish("departure/station", QoS::AtLeastOnce, false, departure_data.station_name.as_bytes()) + .await?; + client + .publish("departure/outbound/station", QoS::AtLeastOnce, false, departure_data.outbound_station.as_bytes()) + .await?; - client - .publish("departure/station", QoS::AtLeastOnce, false, departure_data.station_name.as_bytes()) - .await?; - client - .publish("departure/outbound/station", QoS::AtLeastOnce, false, departure_data.outbound_station.as_bytes()) - .await?; - - client - .publish("departure/outbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.outbound_diff)) - .await?; - client - .publish("departure/inbound/station", QoS::AtLeastOnce, false, departure_data.inbound_station.as_bytes()) - .await?; - client - .publish("departure/inbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.inbound_diff)) - .await?; - + client + .publish("departure/outbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.outbound_diff)) + .await?; + client + .publish("departure/inbound/station", QoS::AtLeastOnce, false, departure_data.inbound_station.as_bytes()) + .await?; + client + .publish("departure/inbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.inbound_diff)) + .await?; + data_published = true; + } } Err(e) => { println!("Error: {:?}", e);