Files
MqttRnvPublisher/src/main.rs

89 lines
2.9 KiB
Rust

use clap::Parser;
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
use std::time::Duration;
use tokio::time::sleep;
mod straba;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(default_value = "localhost")]
broker: String,
#[arg(default_value_t = 1883)]
port: u16,
#[arg(default_value_t = 30000)]
interval: u32,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let mut mqttoptions = MqttOptions::new(
"mqtt-publisher",
args.broker,
args.port,
);
mqttoptions.set_keep_alive(Duration::from_secs(30));
let (client, mut eventloop) = AsyncClient::new(mqttoptions, args.interval as usize);
loop {
let departure_data = tokio::task::spawn_blocking(|| straba::fetch_data(Some(true)))
.await
.unwrap();
if departure_data.failure {
client
.publish("departure/failure", QoS::AtLeastOnce, false, b"Failed to fetch departure data")
.await?;
} else {
let outbound_msg = format!(
"Outbound: {} in {} seconds",
departure_data.outbound_station, departure_data.outbound_diff
);
let inbound_msg = format!(
"Inbound: {} in {} seconds",
departure_data.inbound_station, departure_data.inbound_diff
);
println!("{}", outbound_msg);
println!("{}", inbound_msg);
}
match eventloop.poll().await {
Ok(Event::Incoming(Packet::Publish(_))) => {}
Ok(Event::Incoming(Packet::ConnAck(_))) => {
println!("Connected to MQTT broker");
}
Ok(_) => {
client
.publish("departure/station", QoS::AtLeastOnce, false, departure_data.station_name.as_bytes())
.await?;
client
.publish("departure/outbound/station", QoS::AtLeastOnce, false, departure_data.outbound_station.as_bytes())
.await?;
client
.publish("departure/outbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.outbound_diff))
.await?;
client
.publish("departure/inbound/station", QoS::AtLeastOnce, false, departure_data.inbound_station.as_bytes())
.await?;
client
.publish("departure/inbound/diff", QoS::AtLeastOnce, false, format!("{}", departure_data.inbound_diff))
.await?;
}
Err(e) => {
println!("Error: {:?}", e);
sleep(Duration::from_secs(1)).await;
}
}
}
}