Initial responsible Pixelflut client
This commit is contained in:
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
/target/
|
||||
*.log
|
||||
.DS_Store
|
||||
2822
Cargo.lock
generated
Normal file
2822
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
13
Cargo.toml
Normal file
13
Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "responsible-pixelflut"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1"
|
||||
clap = { version = "4", features = ["derive"] }
|
||||
image = "0.25"
|
||||
reqwest = { version = "0.12", features = ["blocking", "rustls-tls"] }
|
||||
ctrlc = "3"
|
||||
socket2 = "0.5"
|
||||
libc = "0.2"
|
||||
268
src/main.rs
Normal file
268
src/main.rs
Normal file
@@ -0,0 +1,268 @@
|
||||
use anyhow::{Context, Result};
|
||||
use socket2::{Domain, Protocol, Socket, Type};
|
||||
use std::ffi::CString;
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::net::ToSocketAddrs;
|
||||
use clap::Parser;
|
||||
use std::{
|
||||
io::Write,
|
||||
net::TcpStream,
|
||||
path::Path,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
const CCC_MANNHEIM_LOGO: &str = "Logo2.png";
|
||||
|
||||
#[derive(Parser, Debug, Clone)]
|
||||
#[command(author, version, about)]
|
||||
struct Args {
|
||||
#[arg(long, default_value = "pixelflut.at:24")]
|
||||
endpoint: String,
|
||||
|
||||
#[arg(long, default_value = CCC_MANNHEIM_LOGO)]
|
||||
image: String,
|
||||
|
||||
#[arg(long, default_value_t = 1280)]
|
||||
x: i32,
|
||||
|
||||
#[arg(long, default_value_t = 875)]
|
||||
y: i32,
|
||||
|
||||
#[arg(short = 'c', long, default_value_t = 8)]
|
||||
connections: usize,
|
||||
|
||||
/// Bind outgoing TCP connections to a Linux network interface, e.g. eno1 or eno2
|
||||
#[arg(long)]
|
||||
interface: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
mbps: Option<f64>,
|
||||
|
||||
#[arg(long)]
|
||||
send_alpha: bool,
|
||||
|
||||
#[arg(long)]
|
||||
once: bool,
|
||||
}
|
||||
|
||||
fn load_image_bytes(src: &str) -> Result<Vec<u8>> {
|
||||
if src.starts_with("http://") || src.starts_with("https://") {
|
||||
let resp = reqwest::blocking::get(src)
|
||||
.with_context(|| format!("failed to download image: {src}"))?
|
||||
.error_for_status()
|
||||
.with_context(|| format!("image URL returned an error: {src}"))?;
|
||||
|
||||
Ok(resp.bytes()?.to_vec())
|
||||
} else {
|
||||
Ok(std::fs::read(Path::new(src))
|
||||
.with_context(|| format!("failed to read image file: {src}"))?)
|
||||
}
|
||||
}
|
||||
|
||||
fn build_commands(args: &Args) -> Result<Vec<Vec<u8>>> {
|
||||
let bytes = load_image_bytes(&args.image)?;
|
||||
let img = image::load_from_memory(&bytes)
|
||||
.context("failed to decode image")?
|
||||
.to_rgba8();
|
||||
|
||||
let (w, h) = img.dimensions();
|
||||
let mut cmds = Vec::with_capacity((w * h) as usize);
|
||||
|
||||
for yy in 0..h {
|
||||
for xx in 0..w {
|
||||
let p = img.get_pixel(xx, yy);
|
||||
let [r, g, b, a] = p.0;
|
||||
|
||||
if a == 0 && !args.send_alpha {
|
||||
continue;
|
||||
}
|
||||
|
||||
let x = args.x + xx as i32;
|
||||
let y = args.y + yy as i32;
|
||||
|
||||
if x < 0 || y < 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let line = if args.send_alpha && a < 255 {
|
||||
format!("PX {x} {y} {r:02X}{g:02X}{b:02X}{a:02X}\n")
|
||||
} else {
|
||||
format!("PX {x} {y} {r:02X}{g:02X}{b:02X}\n")
|
||||
};
|
||||
|
||||
cmds.push(line.into_bytes());
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::ensure!(!cmds.is_empty(), "no drawable pixels found");
|
||||
Ok(cmds)
|
||||
}
|
||||
|
||||
fn shard_commands(commands: Vec<Vec<u8>>, connections: usize) -> Vec<Vec<u8>> {
|
||||
let n = connections.max(1);
|
||||
let mut shards = vec![Vec::new(); n];
|
||||
|
||||
for (idx, cmd) in commands.into_iter().enumerate() {
|
||||
shards[idx % n].extend_from_slice(&cmd);
|
||||
}
|
||||
|
||||
shards.into_iter().filter(|s| !s.is_empty()).collect()
|
||||
}
|
||||
|
||||
fn connect_tcp(endpoint: &str, interface: Option<&str>) -> Result<TcpStream> {
|
||||
if let Some(iface) = interface {
|
||||
let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
|
||||
.context("failed to create TCP socket")?;
|
||||
|
||||
let iface_cstr = CString::new(iface)
|
||||
.context("interface name contains invalid null byte")?;
|
||||
|
||||
let rc = unsafe {
|
||||
libc::setsockopt(
|
||||
socket.as_raw_fd(),
|
||||
libc::SOL_SOCKET,
|
||||
libc::SO_BINDTODEVICE,
|
||||
iface_cstr.as_ptr() as *const libc::c_void,
|
||||
iface_cstr.as_bytes_with_nul().len() as libc::socklen_t,
|
||||
)
|
||||
};
|
||||
|
||||
if rc != 0 {
|
||||
return Err(std::io::Error::last_os_error())
|
||||
.with_context(|| format!("failed to bind socket to interface {iface}"));
|
||||
}
|
||||
|
||||
let addr = endpoint
|
||||
.to_socket_addrs()
|
||||
.with_context(|| format!("failed to resolve endpoint {endpoint}"))?
|
||||
.next()
|
||||
.with_context(|| format!("no socket address found for endpoint {endpoint}"))?;
|
||||
|
||||
socket
|
||||
.connect(&addr.into())
|
||||
.with_context(|| format!("failed to connect to {endpoint} via {iface}"))?;
|
||||
|
||||
Ok(socket.into())
|
||||
} else {
|
||||
TcpStream::connect(endpoint)
|
||||
.with_context(|| format!("failed to connect to {endpoint}"))
|
||||
}
|
||||
}
|
||||
|
||||
fn write_with_limit(stream: &mut TcpStream, buf: &[u8], bytes_per_sec: Option<f64>) -> Result<()> {
|
||||
match bytes_per_sec {
|
||||
None => {
|
||||
stream.write_all(buf)?;
|
||||
}
|
||||
Some(limit) if limit > 0.0 => {
|
||||
let mut sent = 0usize;
|
||||
let start = Instant::now();
|
||||
|
||||
while sent < buf.len() {
|
||||
let chunk = ((limit / 20.0).max(4096.0)) as usize;
|
||||
let end = (sent + chunk).min(buf.len());
|
||||
|
||||
stream.write_all(&buf[sent..end])?;
|
||||
sent = end;
|
||||
|
||||
let expected = Duration::from_secs_f64(sent as f64 / limit);
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
if expected > elapsed {
|
||||
thread::sleep(expected - elapsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
stream.write_all(buf)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn worker(
|
||||
id: usize,
|
||||
endpoint: String,
|
||||
interface: Option<String>,
|
||||
shard: Vec<u8>,
|
||||
bytes_per_sec: Option<f64>,
|
||||
once: bool,
|
||||
running: Arc<AtomicBool>,
|
||||
) {
|
||||
while running.load(Ordering::Relaxed) {
|
||||
match connect_tcp(&endpoint, interface.as_deref()) {
|
||||
Ok(mut stream) => {
|
||||
let _ = stream.set_nodelay(true);
|
||||
|
||||
loop {
|
||||
if !running.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(err) = write_with_limit(&mut stream, &shard, bytes_per_sec) {
|
||||
let _ = id;
|
||||
let _ = err;
|
||||
break;
|
||||
}
|
||||
|
||||
if once {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = id;
|
||||
let _ = err;
|
||||
thread::sleep(Duration::from_millis(500));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let args = Args::parse();
|
||||
|
||||
anyhow::ensure!(args.connections > 0, "--connections must be >= 1");
|
||||
|
||||
let commands = build_commands(&args)?;
|
||||
let total_bytes: usize = commands.iter().map(Vec::len).sum();
|
||||
let shards = shard_commands(commands, args.connections);
|
||||
|
||||
let global_bps = args.mbps.map(|mbps| mbps * 1_000_000.0 / 8.0);
|
||||
let per_conn_bps = global_bps.map(|bps| bps / shards.len() as f64);
|
||||
|
||||
let _ = total_bytes;
|
||||
|
||||
let running = Arc::new(AtomicBool::new(true));
|
||||
{
|
||||
let running = running.clone();
|
||||
ctrlc::set_handler(move || {
|
||||
running.store(false, Ordering::Relaxed);
|
||||
})?;
|
||||
}
|
||||
|
||||
let mut handles = Vec::new();
|
||||
|
||||
for (id, shard) in shards.into_iter().enumerate() {
|
||||
let endpoint = args.endpoint.clone();
|
||||
let interface = args.interface.clone();
|
||||
let running = running.clone();
|
||||
let once = args.once;
|
||||
|
||||
handles.push(thread::spawn(move || {
|
||||
worker(id, endpoint, interface, shard, per_conn_bps, once, running);
|
||||
}));
|
||||
}
|
||||
|
||||
for h in handles {
|
||||
let _ = h.join();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user