Add moving pixelflut mode

This commit is contained in:
2026-06-07 01:53:23 +02:00
parent 8c26fcf228
commit eb8b2c27b3

View File

@@ -1,19 +1,23 @@
use anyhow::{Context, Result};
use socket2::{Domain, Protocol, Socket, Type};
use std::ffi::CString;
use std::os::fd::AsRawFd;
use std::net::ToSocketAddrs;
use clap::Parser;
#[cfg(target_os = "linux")]
use socket2::{Domain, Protocol, Socket, Type};
#[cfg(target_os = "linux")]
use std::ffi::CString;
use std::io::{BufRead, BufReader, Write};
#[cfg(target_os = "linux")]
use std::net::ToSocketAddrs;
#[cfg(target_os = "linux")]
use std::os::fd::AsRawFd;
use std::{
io::Write,
net::TcpStream,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, RwLock,
},
thread,
time::{Duration, Instant},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
const CCC_MANNHEIM_LOGO: &str = "Logo2.png";
@@ -48,6 +52,27 @@ struct Args {
#[arg(long)]
once: bool,
/// Move the image around the server canvas and bounce off the SIZE boundaries.
#[arg(long)]
moving: bool,
}
#[derive(Debug, Clone, Copy)]
struct DrawablePixel {
dx: i32,
dy: i32,
r: u8,
g: u8,
b: u8,
a: u8,
}
#[derive(Debug, Clone)]
struct LoadedImage {
width: i32,
height: i32,
pixels: Vec<DrawablePixel>,
}
fn load_image_bytes(src: &str) -> Result<Vec<u8>> {
@@ -64,41 +89,87 @@ fn load_image_bytes(src: &str) -> Result<Vec<u8>> {
}
}
fn build_commands(args: &Args) -> Result<Vec<Vec<u8>>> {
let bytes = load_image_bytes(&args.image)?;
fn load_image(src: &str, send_alpha: bool) -> Result<LoadedImage> {
let bytes = load_image_bytes(src)?;
let img = image::load_from_memory(&bytes)
.context("failed to decode image")?
.to_rgba8();
let (w, h) = img.dimensions();
let mut cmds = Vec::with_capacity((w * h) as usize);
let mut pixels = Vec::with_capacity((w * h) as usize);
for yy in 0..h {
for xx in 0..w {
let p = img.get_pixel(xx, yy);
let [r, g, b, a] = p.0;
if a == 0 && !args.send_alpha {
if a == 0 && !send_alpha {
continue;
}
let x = args.x + xx as i32;
let y = args.y + yy as i32;
if x < 0 || y < 0 {
continue;
}
let line = if args.send_alpha && a < 255 {
format!("PX {x} {y} {r:02X}{g:02X}{b:02X}{a:02X}\n")
} else {
format!("PX {x} {y} {r:02X}{g:02X}{b:02X}\n")
};
cmds.push(line.into_bytes());
pixels.push(DrawablePixel {
dx: xx as i32,
dy: yy as i32,
r,
g,
b,
a,
});
}
}
anyhow::ensure!(!pixels.is_empty(), "no drawable pixels found");
Ok(LoadedImage {
width: w as i32,
height: h as i32,
pixels,
})
}
fn build_commands_at(
image: &LoadedImage,
x_offset: i32,
y_offset: i32,
send_alpha: bool,
) -> Vec<Vec<u8>> {
let mut cmds = Vec::with_capacity(image.pixels.len());
for p in &image.pixels {
let x = x_offset + p.dx;
let y = y_offset + p.dy;
if x < 0 || y < 0 {
continue;
}
let line = if send_alpha && p.a < 255 {
format!(
"PX {x} {y} {r:02X}{g:02X}{b:02X}{a:02X}\n",
r = p.r,
g = p.g,
b = p.b,
a = p.a
)
} else {
format!(
"PX {x} {y} {r:02X}{g:02X}{b:02X}\n",
r = p.r,
g = p.g,
b = p.b
)
};
cmds.push(line.into_bytes());
}
cmds
}
fn build_commands(args: &Args) -> Result<Vec<Vec<u8>>> {
let image = load_image(&args.image, args.send_alpha)?;
let cmds = build_commands_at(&image, args.x, args.y, args.send_alpha);
anyhow::ensure!(!cmds.is_empty(), "no drawable pixels found");
Ok(cmds)
}
@@ -116,44 +187,93 @@ fn shard_commands(commands: Vec<Vec<u8>>, connections: usize) -> Vec<Vec<u8>> {
fn connect_tcp(endpoint: &str, interface: Option<&str>) -> Result<TcpStream> {
if let Some(iface) = interface {
let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
.context("failed to create TCP socket")?;
let iface_cstr = CString::new(iface)
.context("interface name contains invalid null byte")?;
let rc = unsafe {
libc::setsockopt(
socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_BINDTODEVICE,
iface_cstr.as_ptr() as *const libc::c_void,
iface_cstr.as_bytes_with_nul().len() as libc::socklen_t,
)
};
if rc != 0 {
return Err(std::io::Error::last_os_error())
.with_context(|| format!("failed to bind socket to interface {iface}"));
}
let addr = endpoint
.to_socket_addrs()
.with_context(|| format!("failed to resolve endpoint {endpoint}"))?
.next()
.with_context(|| format!("no socket address found for endpoint {endpoint}"))?;
socket
.connect(&addr.into())
.with_context(|| format!("failed to connect to {endpoint} via {iface}"))?;
Ok(socket.into())
connect_tcp_bound_to_interface(endpoint, iface)
} else {
TcpStream::connect(endpoint)
.with_context(|| format!("failed to connect to {endpoint}"))
TcpStream::connect(endpoint).with_context(|| format!("failed to connect to {endpoint}"))
}
}
#[cfg(target_os = "linux")]
fn connect_tcp_bound_to_interface(endpoint: &str, iface: &str) -> Result<TcpStream> {
let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
.context("failed to create TCP socket")?;
let iface_cstr = CString::new(iface).context("interface name contains invalid null byte")?;
let rc = unsafe {
libc::setsockopt(
socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_BINDTODEVICE,
iface_cstr.as_ptr() as *const libc::c_void,
iface_cstr.as_bytes_with_nul().len() as libc::socklen_t,
)
};
if rc != 0 {
return Err(std::io::Error::last_os_error())
.with_context(|| format!("failed to bind socket to interface {iface}"));
}
let addr = endpoint
.to_socket_addrs()
.with_context(|| format!("failed to resolve endpoint {endpoint}"))?
.next()
.with_context(|| format!("no socket address found for endpoint {endpoint}"))?;
socket
.connect(&addr.into())
.with_context(|| format!("failed to connect to {endpoint} via {iface}"))?;
Ok(socket.into())
}
#[cfg(not(target_os = "linux"))]
fn connect_tcp_bound_to_interface(_endpoint: &str, _iface: &str) -> Result<TcpStream> {
anyhow::bail!("--interface is only supported on Linux");
}
fn query_screen_size(endpoint: &str, interface: Option<&str>) -> Result<(i32, i32)> {
let mut stream = connect_tcp(endpoint, interface)
.with_context(|| format!("failed to connect to {endpoint} for SIZE query"))?;
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.context("failed to set SIZE query read timeout")?;
stream
.set_write_timeout(Some(Duration::from_secs(5)))
.context("failed to set SIZE query write timeout")?;
stream
.write_all(b"SIZE\n")
.context("failed to send SIZE query")?;
let mut reader = BufReader::new(stream);
let mut line = String::new();
reader
.read_line(&mut line)
.context("failed to read SIZE response")?;
let parts = line.split_whitespace().collect::<Vec<_>>();
anyhow::ensure!(
parts.len() == 3 && parts[0].eq_ignore_ascii_case("SIZE"),
"unexpected SIZE response: {line:?}"
);
let width = parts[1]
.parse::<i32>()
.with_context(|| format!("invalid SIZE width in response: {line:?}"))?;
let height = parts[2]
.parse::<i32>()
.with_context(|| format!("invalid SIZE height in response: {line:?}"))?;
anyhow::ensure!(
width > 0 && height > 0,
"server returned invalid screen size {width}x{height}"
);
Ok((width, height))
}
fn write_with_limit(stream: &mut TcpStream, buf: &[u8], bytes_per_sec: Option<f64>) -> Result<()> {
match bytes_per_sec {
None => {
@@ -186,6 +306,50 @@ fn write_with_limit(stream: &mut TcpStream, buf: &[u8], bytes_per_sec: Option<f6
Ok(())
}
fn worker_moving(
id: usize,
endpoint: String,
interface: Option<String>,
shared_shards: Arc<RwLock<Vec<Vec<u8>>>>,
bytes_per_sec: Option<f64>,
running: Arc<AtomicBool>,
) {
while running.load(Ordering::Relaxed) {
match connect_tcp(&endpoint, interface.as_deref()) {
Ok(mut stream) => {
let _ = stream.set_nodelay(true);
loop {
if !running.load(Ordering::Relaxed) {
return;
}
let shard = match shared_shards.read() {
Ok(shards) => shards.get(id).cloned().unwrap_or_default(),
Err(_) => return,
};
if shard.is_empty() {
thread::sleep(Duration::from_millis(10));
continue;
}
if let Err(err) = write_with_limit(&mut stream, &shard, bytes_per_sec) {
let _ = id;
let _ = err;
break;
}
}
}
Err(err) => {
let _ = id;
let _ = err;
thread::sleep(Duration::from_millis(500));
}
}
}
}
fn worker(
id: usize,
endpoint: String,
@@ -225,9 +389,73 @@ fn worker(
}
}
fn main() -> Result<()> {
let args = Args::parse();
#[derive(Debug, Clone)]
struct SimpleRng {
state: u64,
}
impl SimpleRng {
fn from_entropy() -> Self {
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0x9E37_79B9_7F4A_7C15);
Self {
state: nanos ^ 0xA5A5_A5A5_5A5A_5A5A,
}
}
fn next_u32(&mut self) -> u32 {
let mut x = self.state;
x ^= x << 13;
x ^= x >> 7;
x ^= x << 17;
self.state = x;
(x >> 32) as u32
}
fn bool(&mut self) -> bool {
self.next_u32() & 1 == 0
}
fn range_i32(&mut self, min: i32, max: i32) -> i32 {
debug_assert!(min <= max);
let span = (max - min + 1) as u32;
min + (self.next_u32() % span) as i32
}
}
fn random_speed_component(rng: &mut SimpleRng, max_step: i32) -> i32 {
rng.range_i32(1, max_step.max(1))
}
fn random_velocity(rng: &mut SimpleRng, can_move: bool, max_step: i32) -> i32 {
if !can_move {
return 0;
}
let speed = random_speed_component(rng, max_step);
if rng.bool() {
speed
} else {
-speed
}
}
fn install_ctrlc_handler() -> Result<Arc<AtomicBool>> {
let running = Arc::new(AtomicBool::new(true));
{
let running = running.clone();
ctrlc::set_handler(move || {
running.store(false, Ordering::Relaxed);
})?;
}
Ok(running)
}
fn run_static(args: Args) -> Result<()> {
anyhow::ensure!(args.connections > 0, "--connections must be >= 1");
let commands = build_commands(&args)?;
@@ -239,13 +467,7 @@ fn main() -> Result<()> {
let _ = total_bytes;
let running = Arc::new(AtomicBool::new(true));
{
let running = running.clone();
ctrlc::set_handler(move || {
running.store(false, Ordering::Relaxed);
})?;
}
let running = install_ctrlc_handler()?;
let mut handles = Vec::new();
@@ -266,3 +488,114 @@ fn main() -> Result<()> {
Ok(())
}
fn run_moving(args: Args) -> Result<()> {
anyhow::ensure!(args.connections > 0, "--connections must be >= 1");
let image = load_image(&args.image, args.send_alpha)?;
let (screen_w, screen_h) = query_screen_size(&args.endpoint, args.interface.as_deref())?;
anyhow::ensure!(
image.width <= screen_w && image.height <= screen_h,
"image is {}x{}, but server screen is {screen_w}x{screen_h}",
image.width,
image.height
);
let max_x = screen_w - image.width;
let max_y = screen_h - image.height;
let mut rng = SimpleRng::from_entropy();
let max_step = (screen_w.min(screen_h) / 160).clamp(1, 24);
let mut x = args.x.clamp(0, max_x);
let mut y = args.y.clamp(0, max_y);
let mut dx = random_velocity(&mut rng, max_x > 0, max_step);
let mut dy = random_velocity(&mut rng, max_y > 0, max_step);
let initial_commands = build_commands_at(&image, x, y, args.send_alpha);
anyhow::ensure!(!initial_commands.is_empty(), "no drawable pixels found");
let initial_shards = shard_commands(initial_commands, args.connections);
let global_bps = args.mbps.map(|mbps| mbps * 1_000_000.0 / 8.0);
let per_conn_bps = global_bps.map(|bps| bps / initial_shards.len() as f64);
let shared_shards = Arc::new(RwLock::new(initial_shards));
let running = install_ctrlc_handler()?;
let mut handles = Vec::new();
let worker_count = shared_shards.read().map(|shards| shards.len()).unwrap_or(0);
for id in 0..worker_count {
let endpoint = args.endpoint.clone();
let interface = args.interface.clone();
let running = running.clone();
let shared_shards = shared_shards.clone();
handles.push(thread::spawn(move || {
worker_moving(
id,
endpoint,
interface,
shared_shards,
per_conn_bps,
running,
);
}));
}
while running.load(Ordering::Relaxed) {
thread::sleep(Duration::from_millis(50));
x += dx;
y += dy;
if max_x == 0 {
x = 0;
dx = 0;
} else if x <= 0 {
x = 0;
dx = random_speed_component(&mut rng, max_step);
} else if x >= max_x {
x = max_x;
dx = -random_speed_component(&mut rng, max_step);
}
if max_y == 0 {
y = 0;
dy = 0;
} else if y <= 0 {
y = 0;
dy = random_speed_component(&mut rng, max_step);
} else if y >= max_y {
y = max_y;
dy = -random_speed_component(&mut rng, max_step);
}
let commands = build_commands_at(&image, x, y, args.send_alpha);
let shards = shard_commands(commands, args.connections);
match shared_shards.write() {
Ok(mut shared) => *shared = shards,
Err(_) => break,
}
}
for h in handles {
let _ = h.join();
}
Ok(())
}
fn main() -> Result<()> {
let args = Args::parse();
anyhow::ensure!(
!(args.once && args.moving),
"--once and --moving cannot be used together"
);
if args.moving {
run_moving(args)
} else {
run_static(args)
}
}