From 5a011bc1502ea3ceb998f05b4a0608cfc1639d75 Mon Sep 17 00:00:00 2001 From: Ollo Date: Sat, 28 Feb 2026 21:52:25 +0100 Subject: [PATCH] Describe code --- src/main.rs | 45 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 03b2307..13ae91d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,13 @@ +//! MQTT Publisher for RNV Live Station Data +//! +//! This application fetches departure data from the RNV API and publishes it to an MQTT broker. +//! It uses a two-step process: +//! 1. Look up the station ID by name using [`straba::lookup_station_id`] +//! 2. Fetch departure data using [`straba::fetch_data`] with the obtained station ID +//! +//! The application continuously polls for departure data at the specified interval +//! and publishes the results to various MQTT topics. + use clap::Parser; use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS}; use std::time::{Duration, Instant}; @@ -5,6 +15,8 @@ use tokio::time::sleep; mod straba; +/// Publishes departure data to various MQTT topics. +/// Topics include: station name, outbound/inbound station, line, and time diff. async fn publish_departure_data(client: &AsyncClient, data: &straba::NextDeparture) -> Result<(), Box> { client .publish("departure/station", QoS::AtLeastOnce, false, data.station_name.as_bytes()) @@ -32,14 +44,21 @@ async fn publish_departure_data(client: &AsyncClient, data: &straba::NextDepartu #[derive(Parser, Debug)] #[command(version, about, long_about = None)] +/// Command-line arguments for the MQTT publisher struct Args { + /// MQTT broker address #[arg(long, default_value = "localhost")] broker: String, + /// MQTT broker port #[arg(long, default_value_t = 1883)] port: u16, + + /// Interval in seconds between departure data fetches #[arg(long, default_value_t = 300)] interval: u32, + + /// Station name to look up (used to obtain station ID via RNV API) #[arg(long, default_value = "Lettestr")] station: String, @@ -47,8 +66,10 @@ struct Args { #[tokio::main] async fn main() -> Result<(), Box> { + // Parse command-line arguments let args = Args::parse(); + // Configure MQTT connection let mut mqttoptions = MqttOptions::new( "mqtt-publisher", args.broker, @@ -58,31 +79,45 @@ async fn main() -> Result<(), Box> { let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100); - //FIXME Set last_request initially to 1 hour before now + // Initialize timing: set last_request to 1 hour ago so first fetch happens immediately let mut last_request = Instant::now() - Duration::from_secs(3600); let interval = Duration::from_secs(args.interval as u64); + + // Step 1: Look up station ID from station name (blocking call in async context) let station_id = tokio::task::spawn_blocking(move || straba::lookup_station_id(&args.station)) .await .unwrap() .expect("Failed to lookup station ID"); + + // Initial fetch of departure data let mut departure_data = tokio::task::spawn_blocking(move || straba::fetch_data(station_id, Some(true))) - .await - .unwrap(); + .await + .unwrap(); + + // Track whether data has been published to avoid duplicate publishes let mut data_published = false; + // Main loop: fetch and publish departure data at regular intervals loop { let now = Instant::now(); + + // Check if enough time has passed since last fetch if now.duration_since(last_request) > interval { data_published = false; last_request = Instant::now(); + + // Step 2: Fetch fresh departure data from RNV API departure_data = tokio::task::spawn_blocking(move || straba::fetch_data(station_id, Some(false))) .await .unwrap(); + + // Handle fetch failure if departure_data.failure { client .publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data") .await?; } else { + // Print departure info to console let outbound_msg = format!( "Outbound: {} in {} seconds", departure_data.outbound_station, departure_data.outbound_diff @@ -95,10 +130,13 @@ async fn main() -> Result<(), Box> { println!("{}", inbound_msg); } } + + // Process MQTT events and publish data when connection is ready match eventloop.poll().await { Ok(Event::Incoming(Packet::Publish(_))) => {} Ok(Event::Incoming(Packet::ConnAck(_))) => { println!("Connected to MQTT broker"); + // Publish data on successful connection if not already published if data_published == false { publish_departure_data(&client, &departure_data).await?; @@ -106,6 +144,7 @@ async fn main() -> Result<(), Box> { } } Ok(_) => { + // Also publish on any other successful event to ensure data gets sent if data_published == false { publish_departure_data(&client, &departure_data).await?;