From eb8b2c27b36d26709dab480dfd86d6b4f3a8bc38 Mon Sep 17 00:00:00 2001 From: acidburns Date: Sun, 7 Jun 2026 01:53:23 +0200 Subject: [PATCH] Add moving pixelflut mode --- src/main.rs | 469 ++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 401 insertions(+), 68 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1efbe11..7f3a136 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,23 @@ use anyhow::{Context, Result}; -use socket2::{Domain, Protocol, Socket, Type}; -use std::ffi::CString; -use std::os::fd::AsRawFd; -use std::net::ToSocketAddrs; use clap::Parser; +#[cfg(target_os = "linux")] +use socket2::{Domain, Protocol, Socket, Type}; +#[cfg(target_os = "linux")] +use std::ffi::CString; +use std::io::{BufRead, BufReader, Write}; +#[cfg(target_os = "linux")] +use std::net::ToSocketAddrs; +#[cfg(target_os = "linux")] +use std::os::fd::AsRawFd; use std::{ - io::Write, net::TcpStream, path::Path, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, RwLock, }, thread, - time::{Duration, Instant}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; const CCC_MANNHEIM_LOGO: &str = "Logo2.png"; @@ -48,6 +52,27 @@ struct Args { #[arg(long)] once: bool, + + /// Move the image around the server canvas and bounce off the SIZE boundaries. + #[arg(long)] + moving: bool, +} + +#[derive(Debug, Clone, Copy)] +struct DrawablePixel { + dx: i32, + dy: i32, + r: u8, + g: u8, + b: u8, + a: u8, +} + +#[derive(Debug, Clone)] +struct LoadedImage { + width: i32, + height: i32, + pixels: Vec, } fn load_image_bytes(src: &str) -> Result> { @@ -64,41 +89,87 @@ fn load_image_bytes(src: &str) -> Result> { } } -fn build_commands(args: &Args) -> Result>> { - let bytes = load_image_bytes(&args.image)?; +fn load_image(src: &str, send_alpha: bool) -> Result { + let bytes = load_image_bytes(src)?; let img = image::load_from_memory(&bytes) .context("failed to decode image")? .to_rgba8(); let (w, h) = img.dimensions(); - let mut cmds = Vec::with_capacity((w * h) as usize); + let mut pixels = Vec::with_capacity((w * h) as usize); for yy in 0..h { for xx in 0..w { let p = img.get_pixel(xx, yy); let [r, g, b, a] = p.0; - if a == 0 && !args.send_alpha { + if a == 0 && !send_alpha { continue; } - let x = args.x + xx as i32; - let y = args.y + yy as i32; - - if x < 0 || y < 0 { - continue; - } - - let line = if args.send_alpha && a < 255 { - format!("PX {x} {y} {r:02X}{g:02X}{b:02X}{a:02X}\n") - } else { - format!("PX {x} {y} {r:02X}{g:02X}{b:02X}\n") - }; - - cmds.push(line.into_bytes()); + pixels.push(DrawablePixel { + dx: xx as i32, + dy: yy as i32, + r, + g, + b, + a, + }); } } + anyhow::ensure!(!pixels.is_empty(), "no drawable pixels found"); + + Ok(LoadedImage { + width: w as i32, + height: h as i32, + pixels, + }) +} + +fn build_commands_at( + image: &LoadedImage, + x_offset: i32, + y_offset: i32, + send_alpha: bool, +) -> Vec> { + let mut cmds = Vec::with_capacity(image.pixels.len()); + + for p in &image.pixels { + let x = x_offset + p.dx; + let y = y_offset + p.dy; + + if x < 0 || y < 0 { + continue; + } + + let line = if send_alpha && p.a < 255 { + format!( + "PX {x} {y} {r:02X}{g:02X}{b:02X}{a:02X}\n", + r = p.r, + g = p.g, + b = p.b, + a = p.a + ) + } else { + format!( + "PX {x} {y} {r:02X}{g:02X}{b:02X}\n", + r = p.r, + g = p.g, + b = p.b + ) + }; + + cmds.push(line.into_bytes()); + } + + cmds +} + +fn build_commands(args: &Args) -> Result>> { + let image = load_image(&args.image, args.send_alpha)?; + let cmds = build_commands_at(&image, args.x, args.y, args.send_alpha); + anyhow::ensure!(!cmds.is_empty(), "no drawable pixels found"); Ok(cmds) } @@ -116,44 +187,93 @@ fn shard_commands(commands: Vec>, connections: usize) -> Vec> { fn connect_tcp(endpoint: &str, interface: Option<&str>) -> Result { if let Some(iface) = interface { - let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)) - .context("failed to create TCP socket")?; - - let iface_cstr = CString::new(iface) - .context("interface name contains invalid null byte")?; - - let rc = unsafe { - libc::setsockopt( - socket.as_raw_fd(), - libc::SOL_SOCKET, - libc::SO_BINDTODEVICE, - iface_cstr.as_ptr() as *const libc::c_void, - iface_cstr.as_bytes_with_nul().len() as libc::socklen_t, - ) - }; - - if rc != 0 { - return Err(std::io::Error::last_os_error()) - .with_context(|| format!("failed to bind socket to interface {iface}")); - } - - let addr = endpoint - .to_socket_addrs() - .with_context(|| format!("failed to resolve endpoint {endpoint}"))? - .next() - .with_context(|| format!("no socket address found for endpoint {endpoint}"))?; - - socket - .connect(&addr.into()) - .with_context(|| format!("failed to connect to {endpoint} via {iface}"))?; - - Ok(socket.into()) + connect_tcp_bound_to_interface(endpoint, iface) } else { - TcpStream::connect(endpoint) - .with_context(|| format!("failed to connect to {endpoint}")) + TcpStream::connect(endpoint).with_context(|| format!("failed to connect to {endpoint}")) } } +#[cfg(target_os = "linux")] +fn connect_tcp_bound_to_interface(endpoint: &str, iface: &str) -> Result { + let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP)) + .context("failed to create TCP socket")?; + + let iface_cstr = CString::new(iface).context("interface name contains invalid null byte")?; + + let rc = unsafe { + libc::setsockopt( + socket.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_BINDTODEVICE, + iface_cstr.as_ptr() as *const libc::c_void, + iface_cstr.as_bytes_with_nul().len() as libc::socklen_t, + ) + }; + + if rc != 0 { + return Err(std::io::Error::last_os_error()) + .with_context(|| format!("failed to bind socket to interface {iface}")); + } + + let addr = endpoint + .to_socket_addrs() + .with_context(|| format!("failed to resolve endpoint {endpoint}"))? + .next() + .with_context(|| format!("no socket address found for endpoint {endpoint}"))?; + + socket + .connect(&addr.into()) + .with_context(|| format!("failed to connect to {endpoint} via {iface}"))?; + + Ok(socket.into()) +} + +#[cfg(not(target_os = "linux"))] +fn connect_tcp_bound_to_interface(_endpoint: &str, _iface: &str) -> Result { + anyhow::bail!("--interface is only supported on Linux"); +} + +fn query_screen_size(endpoint: &str, interface: Option<&str>) -> Result<(i32, i32)> { + let mut stream = connect_tcp(endpoint, interface) + .with_context(|| format!("failed to connect to {endpoint} for SIZE query"))?; + + stream + .set_read_timeout(Some(Duration::from_secs(5))) + .context("failed to set SIZE query read timeout")?; + stream + .set_write_timeout(Some(Duration::from_secs(5))) + .context("failed to set SIZE query write timeout")?; + + stream + .write_all(b"SIZE\n") + .context("failed to send SIZE query")?; + + let mut reader = BufReader::new(stream); + let mut line = String::new(); + reader + .read_line(&mut line) + .context("failed to read SIZE response")?; + + let parts = line.split_whitespace().collect::>(); + anyhow::ensure!( + parts.len() == 3 && parts[0].eq_ignore_ascii_case("SIZE"), + "unexpected SIZE response: {line:?}" + ); + + let width = parts[1] + .parse::() + .with_context(|| format!("invalid SIZE width in response: {line:?}"))?; + let height = parts[2] + .parse::() + .with_context(|| format!("invalid SIZE height in response: {line:?}"))?; + + anyhow::ensure!( + width > 0 && height > 0, + "server returned invalid screen size {width}x{height}" + ); + Ok((width, height)) +} + fn write_with_limit(stream: &mut TcpStream, buf: &[u8], bytes_per_sec: Option) -> Result<()> { match bytes_per_sec { None => { @@ -186,6 +306,50 @@ fn write_with_limit(stream: &mut TcpStream, buf: &[u8], bytes_per_sec: Option, + shared_shards: Arc>>>, + bytes_per_sec: Option, + running: Arc, +) { + while running.load(Ordering::Relaxed) { + match connect_tcp(&endpoint, interface.as_deref()) { + Ok(mut stream) => { + let _ = stream.set_nodelay(true); + + loop { + if !running.load(Ordering::Relaxed) { + return; + } + + let shard = match shared_shards.read() { + Ok(shards) => shards.get(id).cloned().unwrap_or_default(), + Err(_) => return, + }; + + if shard.is_empty() { + thread::sleep(Duration::from_millis(10)); + continue; + } + + if let Err(err) = write_with_limit(&mut stream, &shard, bytes_per_sec) { + let _ = id; + let _ = err; + break; + } + } + } + Err(err) => { + let _ = id; + let _ = err; + thread::sleep(Duration::from_millis(500)); + } + } + } +} + fn worker( id: usize, endpoint: String, @@ -225,9 +389,73 @@ fn worker( } } -fn main() -> Result<()> { - let args = Args::parse(); +#[derive(Debug, Clone)] +struct SimpleRng { + state: u64, +} +impl SimpleRng { + fn from_entropy() -> Self { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0x9E37_79B9_7F4A_7C15); + + Self { + state: nanos ^ 0xA5A5_A5A5_5A5A_5A5A, + } + } + + fn next_u32(&mut self) -> u32 { + let mut x = self.state; + x ^= x << 13; + x ^= x >> 7; + x ^= x << 17; + self.state = x; + (x >> 32) as u32 + } + + fn bool(&mut self) -> bool { + self.next_u32() & 1 == 0 + } + + fn range_i32(&mut self, min: i32, max: i32) -> i32 { + debug_assert!(min <= max); + let span = (max - min + 1) as u32; + min + (self.next_u32() % span) as i32 + } +} + +fn random_speed_component(rng: &mut SimpleRng, max_step: i32) -> i32 { + rng.range_i32(1, max_step.max(1)) +} + +fn random_velocity(rng: &mut SimpleRng, can_move: bool, max_step: i32) -> i32 { + if !can_move { + return 0; + } + + let speed = random_speed_component(rng, max_step); + if rng.bool() { + speed + } else { + -speed + } +} + +fn install_ctrlc_handler() -> Result> { + let running = Arc::new(AtomicBool::new(true)); + { + let running = running.clone(); + ctrlc::set_handler(move || { + running.store(false, Ordering::Relaxed); + })?; + } + + Ok(running) +} + +fn run_static(args: Args) -> Result<()> { anyhow::ensure!(args.connections > 0, "--connections must be >= 1"); let commands = build_commands(&args)?; @@ -239,13 +467,7 @@ fn main() -> Result<()> { let _ = total_bytes; - let running = Arc::new(AtomicBool::new(true)); - { - let running = running.clone(); - ctrlc::set_handler(move || { - running.store(false, Ordering::Relaxed); - })?; - } + let running = install_ctrlc_handler()?; let mut handles = Vec::new(); @@ -266,3 +488,114 @@ fn main() -> Result<()> { Ok(()) } + +fn run_moving(args: Args) -> Result<()> { + anyhow::ensure!(args.connections > 0, "--connections must be >= 1"); + + let image = load_image(&args.image, args.send_alpha)?; + let (screen_w, screen_h) = query_screen_size(&args.endpoint, args.interface.as_deref())?; + + anyhow::ensure!( + image.width <= screen_w && image.height <= screen_h, + "image is {}x{}, but server screen is {screen_w}x{screen_h}", + image.width, + image.height + ); + + let max_x = screen_w - image.width; + let max_y = screen_h - image.height; + let mut rng = SimpleRng::from_entropy(); + let max_step = (screen_w.min(screen_h) / 160).clamp(1, 24); + let mut x = args.x.clamp(0, max_x); + let mut y = args.y.clamp(0, max_y); + let mut dx = random_velocity(&mut rng, max_x > 0, max_step); + let mut dy = random_velocity(&mut rng, max_y > 0, max_step); + + let initial_commands = build_commands_at(&image, x, y, args.send_alpha); + anyhow::ensure!(!initial_commands.is_empty(), "no drawable pixels found"); + + let initial_shards = shard_commands(initial_commands, args.connections); + let global_bps = args.mbps.map(|mbps| mbps * 1_000_000.0 / 8.0); + let per_conn_bps = global_bps.map(|bps| bps / initial_shards.len() as f64); + let shared_shards = Arc::new(RwLock::new(initial_shards)); + let running = install_ctrlc_handler()?; + + let mut handles = Vec::new(); + let worker_count = shared_shards.read().map(|shards| shards.len()).unwrap_or(0); + + for id in 0..worker_count { + let endpoint = args.endpoint.clone(); + let interface = args.interface.clone(); + let running = running.clone(); + let shared_shards = shared_shards.clone(); + + handles.push(thread::spawn(move || { + worker_moving( + id, + endpoint, + interface, + shared_shards, + per_conn_bps, + running, + ); + })); + } + + while running.load(Ordering::Relaxed) { + thread::sleep(Duration::from_millis(50)); + + x += dx; + y += dy; + + if max_x == 0 { + x = 0; + dx = 0; + } else if x <= 0 { + x = 0; + dx = random_speed_component(&mut rng, max_step); + } else if x >= max_x { + x = max_x; + dx = -random_speed_component(&mut rng, max_step); + } + + if max_y == 0 { + y = 0; + dy = 0; + } else if y <= 0 { + y = 0; + dy = random_speed_component(&mut rng, max_step); + } else if y >= max_y { + y = max_y; + dy = -random_speed_component(&mut rng, max_step); + } + + let commands = build_commands_at(&image, x, y, args.send_alpha); + let shards = shard_commands(commands, args.connections); + + match shared_shards.write() { + Ok(mut shared) => *shared = shards, + Err(_) => break, + } + } + + for h in handles { + let _ = h.join(); + } + + Ok(()) +} + +fn main() -> Result<()> { + let args = Args::parse(); + + anyhow::ensure!( + !(args.once && args.moving), + "--once and --moving cannot be used together" + ); + + if args.moving { + run_moving(args) + } else { + run_static(args) + } +}