Describe code
This commit is contained in:
41
src/main.rs
41
src/main.rs
@@ -1,3 +1,13 @@
|
|||||||
|
//! MQTT Publisher for RNV Live Station Data
|
||||||
|
//!
|
||||||
|
//! This application fetches departure data from the RNV API and publishes it to an MQTT broker.
|
||||||
|
//! It uses a two-step process:
|
||||||
|
//! 1. Look up the station ID by name using [`straba::lookup_station_id`]
|
||||||
|
//! 2. Fetch departure data using [`straba::fetch_data`] with the obtained station ID
|
||||||
|
//!
|
||||||
|
//! The application continuously polls for departure data at the specified interval
|
||||||
|
//! and publishes the results to various MQTT topics.
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
|
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
@@ -5,6 +15,8 @@ use tokio::time::sleep;
|
|||||||
|
|
||||||
mod straba;
|
mod straba;
|
||||||
|
|
||||||
|
/// Publishes departure data to various MQTT topics.
|
||||||
|
/// Topics include: station name, outbound/inbound station, line, and time diff.
|
||||||
async fn publish_departure_data(client: &AsyncClient, data: &straba::NextDeparture) -> Result<(), Box<dyn std::error::Error>> {
|
async fn publish_departure_data(client: &AsyncClient, data: &straba::NextDeparture) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
client
|
client
|
||||||
.publish("departure/station", QoS::AtLeastOnce, false, data.station_name.as_bytes())
|
.publish("departure/station", QoS::AtLeastOnce, false, data.station_name.as_bytes())
|
||||||
@@ -32,14 +44,21 @@ async fn publish_departure_data(client: &AsyncClient, data: &straba::NextDepartu
|
|||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
#[command(version, about, long_about = None)]
|
#[command(version, about, long_about = None)]
|
||||||
|
/// Command-line arguments for the MQTT publisher
|
||||||
struct Args {
|
struct Args {
|
||||||
|
/// MQTT broker address
|
||||||
#[arg(long, default_value = "localhost")]
|
#[arg(long, default_value = "localhost")]
|
||||||
broker: String,
|
broker: String,
|
||||||
|
|
||||||
|
/// MQTT broker port
|
||||||
#[arg(long, default_value_t = 1883)]
|
#[arg(long, default_value_t = 1883)]
|
||||||
port: u16,
|
port: u16,
|
||||||
|
|
||||||
|
/// Interval in seconds between departure data fetches
|
||||||
#[arg(long, default_value_t = 300)]
|
#[arg(long, default_value_t = 300)]
|
||||||
interval: u32,
|
interval: u32,
|
||||||
|
|
||||||
|
/// Station name to look up (used to obtain station ID via RNV API)
|
||||||
#[arg(long, default_value = "Lettestr")]
|
#[arg(long, default_value = "Lettestr")]
|
||||||
station: String,
|
station: String,
|
||||||
|
|
||||||
@@ -47,8 +66,10 @@ struct Args {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
// Parse command-line arguments
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
|
|
||||||
|
// Configure MQTT connection
|
||||||
let mut mqttoptions = MqttOptions::new(
|
let mut mqttoptions = MqttOptions::new(
|
||||||
"mqtt-publisher",
|
"mqtt-publisher",
|
||||||
args.broker,
|
args.broker,
|
||||||
@@ -58,31 +79,45 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);
|
let (client, mut eventloop) = AsyncClient::new(mqttoptions, 100);
|
||||||
|
|
||||||
//FIXME Set last_request initially to 1 hour before now
|
// Initialize timing: set last_request to 1 hour ago so first fetch happens immediately
|
||||||
let mut last_request = Instant::now() - Duration::from_secs(3600);
|
let mut last_request = Instant::now() - Duration::from_secs(3600);
|
||||||
let interval = Duration::from_secs(args.interval as u64);
|
let interval = Duration::from_secs(args.interval as u64);
|
||||||
|
|
||||||
|
// Step 1: Look up station ID from station name (blocking call in async context)
|
||||||
let station_id = tokio::task::spawn_blocking(move || straba::lookup_station_id(&args.station))
|
let station_id = tokio::task::spawn_blocking(move || straba::lookup_station_id(&args.station))
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.expect("Failed to lookup station ID");
|
.expect("Failed to lookup station ID");
|
||||||
|
|
||||||
|
// Initial fetch of departure data
|
||||||
let mut departure_data = tokio::task::spawn_blocking(move || straba::fetch_data(station_id, Some(true)))
|
let mut departure_data = tokio::task::spawn_blocking(move || straba::fetch_data(station_id, Some(true)))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
// Track whether data has been published to avoid duplicate publishes
|
||||||
let mut data_published = false;
|
let mut data_published = false;
|
||||||
|
|
||||||
|
// Main loop: fetch and publish departure data at regular intervals
|
||||||
loop {
|
loop {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
|
|
||||||
|
// Check if enough time has passed since last fetch
|
||||||
if now.duration_since(last_request) > interval {
|
if now.duration_since(last_request) > interval {
|
||||||
data_published = false;
|
data_published = false;
|
||||||
last_request = Instant::now();
|
last_request = Instant::now();
|
||||||
|
|
||||||
|
// Step 2: Fetch fresh departure data from RNV API
|
||||||
departure_data = tokio::task::spawn_blocking(move || straba::fetch_data(station_id, Some(false)))
|
departure_data = tokio::task::spawn_blocking(move || straba::fetch_data(station_id, Some(false)))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
// Handle fetch failure
|
||||||
if departure_data.failure {
|
if departure_data.failure {
|
||||||
client
|
client
|
||||||
.publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data")
|
.publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data")
|
||||||
.await?;
|
.await?;
|
||||||
} else {
|
} else {
|
||||||
|
// Print departure info to console
|
||||||
let outbound_msg = format!(
|
let outbound_msg = format!(
|
||||||
"Outbound: {} in {} seconds",
|
"Outbound: {} in {} seconds",
|
||||||
departure_data.outbound_station, departure_data.outbound_diff
|
departure_data.outbound_station, departure_data.outbound_diff
|
||||||
@@ -95,10 +130,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
println!("{}", inbound_msg);
|
println!("{}", inbound_msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process MQTT events and publish data when connection is ready
|
||||||
match eventloop.poll().await {
|
match eventloop.poll().await {
|
||||||
Ok(Event::Incoming(Packet::Publish(_))) => {}
|
Ok(Event::Incoming(Packet::Publish(_))) => {}
|
||||||
Ok(Event::Incoming(Packet::ConnAck(_))) => {
|
Ok(Event::Incoming(Packet::ConnAck(_))) => {
|
||||||
println!("Connected to MQTT broker");
|
println!("Connected to MQTT broker");
|
||||||
|
// Publish data on successful connection if not already published
|
||||||
if data_published == false
|
if data_published == false
|
||||||
{
|
{
|
||||||
publish_departure_data(&client, &departure_data).await?;
|
publish_departure_data(&client, &departure_data).await?;
|
||||||
@@ -106,6 +144,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
// Also publish on any other successful event to ensure data gets sent
|
||||||
if data_published == false
|
if data_published == false
|
||||||
{
|
{
|
||||||
publish_departure_data(&client, &departure_data).await?;
|
publish_departure_data(&client, &departure_data).await?;
|
||||||
|
|||||||
Reference in New Issue
Block a user