From e58cfc115d75b3f0f404130d7b8e20d4d44ffc74 Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Thu, 15 Jan 2026 20:18:21 +0100 Subject: [PATCH 1/7] fix error related to reading a frame --- client/src/display/mod.rs | 2 + server/src/device/connection.rs | 2 +- server/src/device/process.rs | 16 ++++++-- server/src/device/reading.rs | 69 ++++++++++++++++----------------- 4 files changed, 49 insertions(+), 40 deletions(-) diff --git a/client/src/display/mod.rs b/client/src/display/mod.rs index 4f2f06e..7d46cbe 100644 --- a/client/src/display/mod.rs +++ b/client/src/display/mod.rs @@ -35,6 +35,8 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { let mut decoded_video_data = FrameDecoder::new(stream); + sleep(Duration::from_secs(1)); + let (pipeline, appsrc) = build_pipeline().context("could not build gstreamer pipeline")?; pipeline .set_state(gstreamer::State::Playing) diff --git a/server/src/device/connection.rs b/server/src/device/connection.rs index a3e5c16..bb6ac6a 100644 --- a/server/src/device/connection.rs +++ b/server/src/device/connection.rs @@ -87,7 +87,7 @@ async fn open_connection( let mut buffer = vec![0u8; frame_reader.frame_length()]; loop { frame_reader - .read_exact(&mut buffer) + .read_one_frame(&mut buffer) .context("error reading frame from file")?; debug!("read exactly {} bytes from frame reader", buffer.len()); diff --git a/server/src/device/process.rs b/server/src/device/process.rs index 06c9bc0..f4e7882 100644 --- a/server/src/device/process.rs +++ b/server/src/device/process.rs @@ -1,9 +1,11 @@ use std::fs::File; use anyhow::{Context, Error, anyhow}; -use procfs::process::{MMapPath, Process, all_processes}; +use itertools::Itertools; +use procfs::process::{MMapPath, MemoryMap, Process, all_processes}; +use tracing::trace; -pub fn get_memory_file() -> Result<(File, usize), Error> { +pub fn get_xochitl_memory_file() -> Result<(File, usize), Error> { let process = get_process().context("could not get xochitl process")?; let memory_file = process.mem().context("could not get xochitl memory file")?; @@ -30,16 +32,22 @@ fn get_process() -> Result { Ok(process) } +/// Get the offset of the framebuffer in the process memory +/// +/// The framebuffer is the next mapped file after /dev/fb0 fn get_framebuffer_offset_in_process_memory(process: &Process) -> Result { let framebuffer_path_name = MMapPath::from("/dev/fb0").context("could not build framebuffer path name")?; + let matches_path_name = |m: &&MemoryMap| m.pathname == framebuffer_path_name; + let maps = process.maps().context("could not get process maps")?; - let mut maps = maps.iter().filter(|m| m.pathname == framebuffer_path_name); + + let mut maps = maps.iter().skip_while(|m| !matches_path_name(m)).skip(1); let framebuffer_map = maps.next().expect("found no framebuffer map"); - if let Some(_) = maps.next() { + if let Some(_) = maps.filter(matches_path_name).next() { return Err(anyhow!("found more than one framebuffer map")); } diff --git a/server/src/device/reading.rs b/server/src/device/reading.rs index 928af1d..e45eb3a 100644 --- a/server/src/device/reading.rs +++ b/server/src/device/reading.rs @@ -3,16 +3,16 @@ use std::{ io::{Read, Seek, SeekFrom}, }; -use anyhow::{Context, Error}; +use anyhow::{Context, Error, anyhow}; +use tracing::trace; -use super::process::get_memory_file; +use super::process::get_xochitl_memory_file; use crate::config::{self, VideoConfig}; #[derive(Debug)] pub struct FrameReader { file: File, offset: usize, - current: usize, width: usize, height: usize, bytes_per_pixel: usize, @@ -25,52 +25,51 @@ impl FrameReader { File::open(path).context("could not open framebuffer file")?, 0, ), - config::VideoDataSource::ProcessMemory => { - get_memory_file().context("could not get file and offset for xochitl process")? - } + config::VideoDataSource::ProcessMemory => get_xochitl_memory_file() + .context("could not get file and offset for xochitl process")?, }; - let mut frame_reader = Self { + trace!( + "file offset: {}, extra skip: {}", + offset, video_config.internal.skip + ); + let offset = offset + video_config.internal.skip; + + Ok(Self { file, offset, - current: 0, width: video_config.shared.width, height: video_config.shared.height, bytes_per_pixel: video_config.shared.bytes_per_pixel, - }; - frame_reader - .point_file_to_framebuffer_memory_start() - .context("could not initialize file to offset")?; - Ok(frame_reader) + }) } pub fn frame_length(&self) -> usize { self.width * self.height * self.bytes_per_pixel } - // TODO: anyhow error handling instead of io Errors - fn point_file_to_framebuffer_memory_start(&mut self) -> std::io::Result<()> { - self.file.seek(SeekFrom::Start(self.offset as u64))?; - self.current = 0; - - Ok(()) - } -} + pub fn read_one_frame(&mut self, buf: &mut [u8]) -> Result<(), Error> { + trace!("attempting to read one frame"); + if buf.len() != self.frame_length() { + return Err(anyhow!( + "frame is {} bytes long, but buffer is only {} bytes long", + self.frame_length(), + buf.len(), + )); + } -impl Read for FrameReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let requested = buf.len(); - let bytes_read = if self.current + requested < self.frame_length() { - self.file.read(buf)? - } else { - let rest = self.frame_length() - self.current; - self.file.read(&mut buf[0..rest])? - }; + trace!("pointing file to start of frame: {}", self.offset); + self.file + .seek(SeekFrom::Start(self.offset as u64)) + .context("could not point file to beginning of frame")?; + trace!("reading one frame from memory: {} bytes", buf.len()); + self.file + .read_exact(buf) + .context(format!("could not read {} bytes from memory", buf.len())) + .map(|_| { + trace!("successfully read one frame from memory"); - self.current += bytes_read; - if self.current == self.frame_length() { - self.point_file_to_framebuffer_memory_start()?; - } - Ok(bytes_read) + () + }) } } From bfce555ab4e0835fb91189d2708090a1a8e05763 Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Thu, 15 Jan 2026 21:58:42 +0100 Subject: [PATCH 2/7] go back to using io::copy and Read --- client/src/config.rs | 13 ------ client/src/display/mod.rs | 38 +++++++++++----- server/src/device/connection.rs | 8 ++-- server/src/device/reading.rs | 77 +++++++++++++++++++++++---------- 4 files changed, 87 insertions(+), 49 deletions(-) diff --git a/client/src/config.rs b/client/src/config.rs index caca6d3..fa4e0aa 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -5,19 +5,6 @@ use std::{ use anyhow::{Context, Error}; use clap::Parser; -use gstreamer_video::VideoFormat; - -// TODO: get this info from reMarkable tablet directly -// first parse firmware version (on server) -// second set all settings accordingly -// third send out height, width, pixel format, and bytes per pixel to client -pub const HEIGHT: u32 = 1872; -pub const WIDTH: u32 = 1404; -pub const PIXEL_FORMAT: &str = "bgra"; -pub const VIDEO_FORMAT: VideoFormat = VideoFormat::Bgra; -pub const BYTES_PER_PIXEL: u32 = 4; -pub const FILE: &str = ":mem:"; -pub const SKIP_OFFSET: usize = 2629636; const DEFAULT_IP: &str = "10.11.99.1"; const DEFAULT_SSH_PORT: u16 = 22; diff --git a/client/src/display/mod.rs b/client/src/display/mod.rs index 7d46cbe..2cdc64e 100644 --- a/client/src/display/mod.rs +++ b/client/src/display/mod.rs @@ -5,8 +5,9 @@ use std::{io::Read as _, thread::sleep, time::Duration}; use anyhow::{Context, Error}; use futures::stream::StreamExt; use gstreamer_app::AppSrc; +use gstreamer_video::VideoFormat; use lz4_flex::frame::FrameDecoder; -use review_server::config::CommunicatedConfig; +use review_server::config::{CommunicatedConfig, PixelFormat}; use tokio::net::TcpStream; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{debug, info}; @@ -37,12 +38,18 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { sleep(Duration::from_secs(1)); - let (pipeline, appsrc) = build_pipeline().context("could not build gstreamer pipeline")?; + let (pipeline, appsrc) = + build_pipeline(&communicated_config).context("could not build gstreamer pipeline")?; pipeline .set_state(gstreamer::State::Playing) .context("could not start playing gstreamer pipeline")?; - let mut buffer = vec![0u8; (BYTES_PER_PIXEL * HEIGHT * WIDTH) as usize]; + let mut buffer = vec![ + 0u8; + (communicated_config.video_config.height + * communicated_config.video_config.width + * communicated_config.video_config.bytes_per_pixel) as usize + ]; loop { let n = decoded_video_data .read(&mut buffer) @@ -50,14 +57,12 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { let slice = buffer[..n].to_vec(); - debug!("read {} bytes:\n\n{:?}", n, &slice); + debug!("read {} bytes", n); let buffer = gstreamer::Buffer::from_slice(slice); appsrc .push_buffer(buffer) .context("could not push buffer to app source")?; - - sleep(Duration::from_secs_f64(1.)); } } @@ -78,10 +83,23 @@ async fn get_communicated_config( Ok((framed_stream.into_inner(), config)) } -fn build_pipeline() -> Result<(Pipeline, AppSrc), Error> { - let video_info = gstreamer_video::VideoInfo::builder(VIDEO_FORMAT, WIDTH, HEIGHT) - .build() - .context("could not build video info")?; +fn to_video_format(pixel_format: &PixelFormat) -> VideoFormat { + match pixel_format { + PixelFormat::Rgb565le => todo!("not sure what the video format for RGB 565 LE is"), + PixelFormat::Gray8 => VideoFormat::Gray8, + PixelFormat::Gray16be => VideoFormat::Gray16Be, + PixelFormat::Bgra => VideoFormat::Bgra, + } +} + +fn build_pipeline(communicated_config: &CommunicatedConfig) -> Result<(Pipeline, AppSrc), Error> { + let video_info = gstreamer_video::VideoInfo::builder( + to_video_format(&communicated_config.video_config.pixel_format), + communicated_config.video_config.width as u32, + communicated_config.video_config.height as u32, + ) + .build() + .context("could not build video info")?; let appsrc = gstreamer_app::AppSrc::builder() .caps( diff --git a/server/src/device/connection.rs b/server/src/device/connection.rs index bb6ac6a..7648e14 100644 --- a/server/src/device/connection.rs +++ b/server/src/device/connection.rs @@ -1,4 +1,4 @@ -use std::io::{Read, Write}; +use std::io::{self, Read, Write}; use std::time::Duration; use anyhow::{Context, Error}; @@ -78,12 +78,13 @@ async fn open_connection( debug!("created frame reader, starting loop to send data"); - /* io::copy(&mut frame_reader, &mut encoded_video_data) .context("error while copying frame buffer data to stream")?; - */ + + Ok(()) // TODO: this problably can be a simple io::copy + /* let mut buffer = vec![0u8; frame_reader.frame_length()]; loop { frame_reader @@ -106,4 +107,5 @@ async fn open_connection( sleep(Duration::from_secs_f64(1. / (opts.framerate as f64))).await; } + */ } diff --git a/server/src/device/reading.rs b/server/src/device/reading.rs index e45eb3a..83695ea 100644 --- a/server/src/device/reading.rs +++ b/server/src/device/reading.rs @@ -13,6 +13,7 @@ use crate::config::{self, VideoConfig}; pub struct FrameReader { file: File, offset: usize, + current: usize, width: usize, height: usize, bytes_per_pixel: usize, @@ -35,41 +36,71 @@ impl FrameReader { ); let offset = offset + video_config.internal.skip; - Ok(Self { + let mut frame_reader = Self { file, offset, + current: 0, width: video_config.shared.width, height: video_config.shared.height, bytes_per_pixel: video_config.shared.bytes_per_pixel, - }) + }; + + frame_reader + .point_file_to_beginning_of_frame() + .context("could not point file to beginning of frame")?; + + Ok(frame_reader) } pub fn frame_length(&self) -> usize { self.width * self.height * self.bytes_per_pixel } - pub fn read_one_frame(&mut self, buf: &mut [u8]) -> Result<(), Error> { - trace!("attempting to read one frame"); - if buf.len() != self.frame_length() { - return Err(anyhow!( - "frame is {} bytes long, but buffer is only {} bytes long", + fn point_file_to_beginning_of_frame(&mut self) -> std::io::Result<()> { + self.file.seek(SeekFrom::Start(self.offset as u64))?; + self.current = 0; + + Ok(()) + } +} + +impl Read for FrameReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let n = buf.len(); + + trace!("requested to read {} bytes from process", n); + + let n = if self.current + n < self.frame_length() { + trace!( + "reading next {} bytes from process (current: {}, end: {})", + n, + self.current, self.frame_length(), - buf.len(), - )); - } + ); - trace!("pointing file to start of frame: {}", self.offset); - self.file - .seek(SeekFrom::Start(self.offset as u64)) - .context("could not point file to beginning of frame")?; - trace!("reading one frame from memory: {} bytes", buf.len()); - self.file - .read_exact(buf) - .context(format!("could not read {} bytes from memory", buf.len())) - .map(|_| { - trace!("successfully read one frame from memory"); - - () - }) + self.file.read_exact(buf)?; + self.current += n; + + n + } else { + let n = self.frame_length() - self.current; + + trace!( + "reading last {} bytes from process (current: {}, end: {})", + n, + self.current, + self.frame_length(), + ); + + self.file.read_exact(&mut buf[..n])?; + + self.point_file_to_beginning_of_frame()?; + + n + }; + + trace!("read exactly {} bytes from process", n); + + Ok(n) } } From 5d59815837955838371907ca6111d521630d2b0f Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Fri, 16 Jan 2026 20:17:29 +0100 Subject: [PATCH 3/7] speed up dagger builds by caching and not uploading target directory --- dagger/main.go | 21 ++++++++++++++------- server/src/device/reading.rs | 2 +- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/dagger/main.go b/dagger/main.go index e2d94e4..43d4265 100644 --- a/dagger/main.go +++ b/dagger/main.go @@ -34,7 +34,10 @@ func (m *ReView) CheckAndTestAll(ctx context.Context, source *dagger.Directory) return "ok", nil } -func (m *ReView) BuildClient(source *dagger.Directory) *dagger.File { +func (m *ReView) BuildClient( + // +ignore=["target"] + source *dagger.Directory, +) *dagger.File { return linuxContainer(source). WithExec([]string{ "cargo", "build", "--release", @@ -59,17 +62,19 @@ func linuxContainer(source *dagger.Directory) *dagger.Container { WithWorkdir("/source"). // Cache - WithMountedCache("/cache/cargo", dag.CacheVolume("rust-packages")). - WithEnvVariable("CARGO_HOME", "/cache/cargo"). + WithMountedCache("/root/.cargo/registry", dag.CacheVolume("rust-packages")). WithMountedCache("target", dag.CacheVolume("rust-target")) } -func (m *ReView) BuildServer(source *dagger.Directory) *dagger.File { +func (m *ReView) BuildServer( + // +ignore=["target"] + source *dagger.Directory, +) *dagger.File { return toltecContainer(source). WithExec([]string{ "bash", "-c", "source /opt/x-tools/switch-arm.sh; " + - "cargo build --release --bin review-server --target " + RemarkableTarget, + "cargo build --release --bin review-server", }). WithExec( []string{"cp", "target/" + RemarkableTarget + "/release/review-server", "review-server"}, @@ -83,7 +88,9 @@ func toltecContainer(source *dagger.Directory) *dagger.Container { // Sources WithDirectory("/source", source). - WithWorkdir("/source") + WithWorkdir("/source"). - // Sadly caching breaks compile :( + // Cache + WithMountedCache("/root/.cargo/registry", dag.CacheVolume("rust-packages-toltec")). + WithMountedCache("target", dag.CacheVolume("rust-target-toltec")) } diff --git a/server/src/device/reading.rs b/server/src/device/reading.rs index 83695ea..8b30086 100644 --- a/server/src/device/reading.rs +++ b/server/src/device/reading.rs @@ -3,7 +3,7 @@ use std::{ io::{Read, Seek, SeekFrom}, }; -use anyhow::{Context, Error, anyhow}; +use anyhow::{Context, Error}; use tracing::trace; use super::process::get_xochitl_memory_file; From 2d18cd63860a4e6777058caf9a9e50e2fde8d55e Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Sat, 17 Jan 2026 21:05:35 +0100 Subject: [PATCH 4/7] send frames out individually in parts --- client/src/display/mod.rs | 47 ++++++++++++------------ server/src/device/connection.rs | 63 +++++++++++++++++++-------------- server/src/main.rs | 1 + 3 files changed, 63 insertions(+), 48 deletions(-) diff --git a/client/src/display/mod.rs b/client/src/display/mod.rs index 2cdc64e..f14b41e 100644 --- a/client/src/display/mod.rs +++ b/client/src/display/mod.rs @@ -10,7 +10,7 @@ use lz4_flex::frame::FrameDecoder; use review_server::config::{CommunicatedConfig, PixelFormat}; use tokio::net::TcpStream; use tokio_util::codec::{Framed, LengthDelimitedCodec}; -use tracing::{debug, info}; +use tracing::{debug, info, trace}; use gstreamer::{Pipeline, prelude::*}; @@ -24,19 +24,21 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { .await .context("could not connect to TCP stream")?; - let (stream, communicated_config) = get_communicated_config(stream) + let mut framed_stream = Framed::new(stream, LengthDelimitedCodec::new()); + let communicated_config = get_communicated_config(&mut framed_stream) .await .context("could not get communicated config from TCP stream")?; debug!("received communicated config: {:?}", &communicated_config); - let stream = stream + /* + let mut stream = stream .into_std() .context("could not convert stream into std")?; - let mut decoded_video_data = FrameDecoder::new(stream); + // let mut decoded_video_data = FrameDecoder::new(stream); - sleep(Duration::from_secs(1)); + */ let (pipeline, appsrc) = build_pipeline(&communicated_config).context("could not build gstreamer pipeline")?; @@ -44,22 +46,25 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { .set_state(gstreamer::State::Playing) .context("could not start playing gstreamer pipeline")?; - let mut buffer = vec![ - 0u8; - (communicated_config.video_config.height - * communicated_config.video_config.width - * communicated_config.video_config.bytes_per_pixel) as usize - ]; + let n = 4; + loop { - let n = decoded_video_data - .read(&mut buffer) - .context("could not read from TCP stream")?; + let mut full_frame = vec![]; + for _ in 0..n { + debug!("attempting to read data from TCP stream"); + + let frame = framed_stream + .next() + .await + .context("TCP stream was closed")? + .context("could not read from TCP stream")?; - let slice = buffer[..n].to_vec(); + debug!("received {} bytes from TCP stream", frame.len()); - debug!("read {} bytes", n); + full_frame.append(&mut frame.iter().copied().collect()); + } - let buffer = gstreamer::Buffer::from_slice(slice); + let buffer = gstreamer::Buffer::from_slice(full_frame); appsrc .push_buffer(buffer) .context("could not push buffer to app source")?; @@ -67,10 +72,8 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { } async fn get_communicated_config( - tcp_stream: TcpStream, -) -> Result<(TcpStream, CommunicatedConfig), Error> { - let mut framed_stream = Framed::new(tcp_stream, LengthDelimitedCodec::new()); - + framed_stream: &mut Framed, +) -> Result { let config_bytes = framed_stream .next() .await @@ -80,7 +83,7 @@ async fn get_communicated_config( let config = bson::deserialize_from_slice(&config_bytes) .context("could not deserialize config from bytes")?; - Ok((framed_stream.into_inner(), config)) + Ok(config) } fn to_video_format(pixel_format: &PixelFormat) -> VideoFormat { diff --git a/server/src/device/connection.rs b/server/src/device/connection.rs index 7648e14..f92f364 100644 --- a/server/src/device/connection.rs +++ b/server/src/device/connection.rs @@ -5,9 +5,10 @@ use anyhow::{Context, Error}; use futures::sink::SinkExt; use lz4_flex::frame::FrameEncoder; use tokio::net::{TcpListener, TcpStream}; -use tokio::time::sleep; +use tokio::time::{Interval, MissedTickBehavior, interval, sleep}; +use tokio_util::bytes::Bytes; use tokio_util::codec::{Framed, LengthDelimitedCodec}; -use tracing::{debug, info}; +use tracing::{debug, info, trace}; use crate::device::reading::FrameReader; use crate::{ @@ -37,12 +38,14 @@ pub async fn listen_for_clients( let (stream, addr) = listener.accept().await?; info!("new connection from {}", addr); - tokio::spawn(open_connection( + open_connection( stream, opts.clone(), video_config.clone(), communicated_config.clone(), - )); + ) + .await + .context("error while handling TCP connections")?; } } @@ -54,16 +57,23 @@ async fn open_connection( ) -> Result<(), Error> { let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); - let bytes = bson::serialize_to_vec(&communicated_config) + let bytes: Bytes = bson::serialize_to_vec(&communicated_config) .context("could not serialize communicated config")? .into(); + framed + .send(bytes.iter().copied().collect()) + .await + .context("could not send out config")?; + + debug!("sending second frame"); framed .send(bytes) .await .context("could not send out config")?; - let stream = framed + /* + let mut stream = framed .into_inner() .into_std() .context("could not turn stream into std")?; @@ -71,41 +81,42 @@ async fn open_connection( stream .set_write_timeout(Some(Duration::from_secs(1))) .context("could not set write timeout")?; + */ - let mut encoded_video_data = FrameEncoder::new(stream); + //let mut video_data_encoder = FrameEncoder::new(stream); let mut frame_reader = FrameReader::new(video_config).context("could not create frame reader")?; debug!("created frame reader, starting loop to send data"); - io::copy(&mut frame_reader, &mut encoded_video_data) - .context("error while copying frame buffer data to stream")?; + /* + io::copy(&mut frame_reader, &mut stream) + .context("error while copying from frame reader to connection")?; Ok(()) + */ - // TODO: this problably can be a simple io::copy - /* - let mut buffer = vec![0u8; frame_reader.frame_length()]; + let n = 4; + + let mut interval = interval(Duration::from_secs_f64(1. / 50.)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut buffer = vec![0u8; frame_reader.frame_length() / n]; loop { + interval.tick().await; + frame_reader - .read_one_frame(&mut buffer) + .read_exact(&mut buffer) .context("error reading frame from file")?; - debug!("read exactly {} bytes from frame reader", buffer.len()); + debug!("read {} bytes from frame reader", buffer.len()); + trace!("writing to output stream"); - encoded_video_data - .write_all(&buffer) + framed + .send(buffer.iter().cloned().collect()) + .await .context("could not write frame to encoder")?; - debug!("wrote the data to the encoder"); - - encoded_video_data - .flush() - .context("failed to flush encoder")?; - - debug!("flushed the encoded video data"); - - sleep(Duration::from_secs_f64(1. / (opts.framerate as f64))).await; + debug!("wrote the data to the output stream"); } - */ } diff --git a/server/src/main.rs b/server/src/main.rs index 1d581e4..ffc2e5d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -35,6 +35,7 @@ fn initialize_logging() -> Result<(), Error> { let log_file = OpenOptions::new() .write(true) .create(true) + .truncate(true) .open("./review.log") .context("could not open log file")?; From 22b6a4f1e1a09b505e2134a1e239cb30752509b1 Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Sat, 17 Jan 2026 21:30:30 +0100 Subject: [PATCH 5/7] compress each frame individually --- client/src/display/mod.rs | 47 ++++++++------------- client/src/start/mod.rs | 2 +- server/src/device/connection.rs | 73 +++++++++++---------------------- server/src/device/reading.rs | 72 ++++++++------------------------ 4 files changed, 62 insertions(+), 132 deletions(-) diff --git a/client/src/display/mod.rs b/client/src/display/mod.rs index f14b41e..5fdbd5a 100644 --- a/client/src/display/mod.rs +++ b/client/src/display/mod.rs @@ -6,11 +6,11 @@ use anyhow::{Context, Error}; use futures::stream::StreamExt; use gstreamer_app::AppSrc; use gstreamer_video::VideoFormat; -use lz4_flex::frame::FrameDecoder; +use lz4_flex::decompress_size_prepended; use review_server::config::{CommunicatedConfig, PixelFormat}; use tokio::net::TcpStream; use tokio_util::codec::{Framed, LengthDelimitedCodec}; -use tracing::{debug, info, trace}; +use tracing::{debug, info}; use gstreamer::{Pipeline, prelude::*}; @@ -31,40 +31,32 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { debug!("received communicated config: {:?}", &communicated_config); - /* - let mut stream = stream - .into_std() - .context("could not convert stream into std")?; - - // let mut decoded_video_data = FrameDecoder::new(stream); - - */ - let (pipeline, appsrc) = build_pipeline(&communicated_config).context("could not build gstreamer pipeline")?; pipeline .set_state(gstreamer::State::Playing) .context("could not start playing gstreamer pipeline")?; - let n = 4; - loop { - let mut full_frame = vec![]; - for _ in 0..n { - debug!("attempting to read data from TCP stream"); + debug!("attempting to read data from TCP stream"); + + let compressed_frame = framed_stream + .next() + .await + .context("TCP stream was closed")? + .context("could not read from TCP stream")?; - let frame = framed_stream - .next() - .await - .context("TCP stream was closed")? - .context("could not read from TCP stream")?; + debug!( + "read one compressed frame from TCP stream ({} bytes)", + compressed_frame.len(), + ); - debug!("received {} bytes from TCP stream", frame.len()); + let frame = decompress_size_prepended(&compressed_frame) + .context("could not decompress received frame")?; - full_frame.append(&mut frame.iter().copied().collect()); - } + debug!("decompressed: {} bytes", frame.len()); - let buffer = gstreamer::Buffer::from_slice(full_frame); + let buffer = gstreamer::Buffer::from_mut_slice(frame); appsrc .push_buffer(buffer) .context("could not push buffer to app source")?; @@ -80,10 +72,7 @@ async fn get_communicated_config( .context("received None as config bytes")? .context("could not receive config bytes")?; - let config = bson::deserialize_from_slice(&config_bytes) - .context("could not deserialize config from bytes")?; - - Ok(config) + bson::deserialize_from_slice(&config_bytes).context("could not deserialize config from bytes") } fn to_video_format(pixel_format: &PixelFormat) -> VideoFormat { diff --git a/client/src/start/mod.rs b/client/src/start/mod.rs index 24c7dee..0bccd39 100644 --- a/client/src/start/mod.rs +++ b/client/src/start/mod.rs @@ -36,7 +36,7 @@ pub async fn start_server( let server_options = ServerOptions { port: 6680, show_cursor: false, - framerate: 10, + framerate: 120, }; let restream_command = Box::leak(Box::new(format!( diff --git a/server/src/device/connection.rs b/server/src/device/connection.rs index f92f364..d20d9fd 100644 --- a/server/src/device/connection.rs +++ b/server/src/device/connection.rs @@ -1,11 +1,10 @@ -use std::io::{self, Read, Write}; use std::time::Duration; use anyhow::{Context, Error}; use futures::sink::SinkExt; -use lz4_flex::frame::FrameEncoder; +use lz4_flex::compress_prepend_size; use tokio::net::{TcpListener, TcpStream}; -use tokio::time::{Interval, MissedTickBehavior, interval, sleep}; +use tokio::time::{MissedTickBehavior, interval}; use tokio_util::bytes::Bytes; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{debug, info, trace}; @@ -34,19 +33,19 @@ pub async fn listen_for_clients( .await .context(format!("could not bind to port {}", opts.port))?; - loop { - let (stream, addr) = listener.accept().await?; - info!("new connection from {}", addr); - - open_connection( - stream, - opts.clone(), - video_config.clone(), - communicated_config.clone(), - ) - .await - .context("error while handling TCP connections")?; - } + let (stream, addr) = listener.accept().await?; + info!("new connection from {}", addr); + + open_connection( + stream, + opts.clone(), + video_config.clone(), + communicated_config.clone(), + ) + .await + .context("error while handling TCP connections")?; + + Ok(()) } async fn open_connection( @@ -66,54 +65,32 @@ async fn open_connection( .await .context("could not send out config")?; - debug!("sending second frame"); - framed - .send(bytes) - .await - .context("could not send out config")?; - - /* - let mut stream = framed - .into_inner() - .into_std() - .context("could not turn stream into std")?; - - stream - .set_write_timeout(Some(Duration::from_secs(1))) - .context("could not set write timeout")?; - */ - - //let mut video_data_encoder = FrameEncoder::new(stream); let mut frame_reader = FrameReader::new(video_config).context("could not create frame reader")?; debug!("created frame reader, starting loop to send data"); - /* - io::copy(&mut frame_reader, &mut stream) - .context("error while copying from frame reader to connection")?; - - Ok(()) - */ - - let n = 4; - - let mut interval = interval(Duration::from_secs_f64(1. / 50.)); + let mut interval = interval(Duration::from_secs_f64(1. / (opts.framerate as f64))); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut buffer = vec![0u8; frame_reader.frame_length() / n]; + let mut buffer = vec![0u8; frame_reader.frame_length()]; loop { interval.tick().await; frame_reader - .read_exact(&mut buffer) + .read_frame(&mut buffer) .context("error reading frame from file")?; debug!("read {} bytes from frame reader", buffer.len()); - trace!("writing to output stream"); + + let encoded_buffer = compress_prepend_size(&buffer); + trace!( + "writing encoded bytes to stream (length {})", + encoded_buffer.len(), + ); framed - .send(buffer.iter().cloned().collect()) + .send(encoded_buffer.into()) .await .context("could not write frame to encoder")?; diff --git a/server/src/device/reading.rs b/server/src/device/reading.rs index 8b30086..3ced49c 100644 --- a/server/src/device/reading.rs +++ b/server/src/device/reading.rs @@ -3,7 +3,7 @@ use std::{ io::{Read, Seek, SeekFrom}, }; -use anyhow::{Context, Error}; +use anyhow::{Context, Error, anyhow}; use tracing::trace; use super::process::get_xochitl_memory_file; @@ -13,7 +13,6 @@ use crate::config::{self, VideoConfig}; pub struct FrameReader { file: File, offset: usize, - current: usize, width: usize, height: usize, bytes_per_pixel: usize, @@ -36,71 +35,36 @@ impl FrameReader { ); let offset = offset + video_config.internal.skip; - let mut frame_reader = Self { + Ok(Self { file, offset, - current: 0, width: video_config.shared.width, height: video_config.shared.height, bytes_per_pixel: video_config.shared.bytes_per_pixel, - }; - - frame_reader - .point_file_to_beginning_of_frame() - .context("could not point file to beginning of frame")?; - - Ok(frame_reader) + }) } + #[inline] pub fn frame_length(&self) -> usize { self.width * self.height * self.bytes_per_pixel } - fn point_file_to_beginning_of_frame(&mut self) -> std::io::Result<()> { - self.file.seek(SeekFrom::Start(self.offset as u64))?; - self.current = 0; - - Ok(()) - } -} - -impl Read for FrameReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let n = buf.len(); - - trace!("requested to read {} bytes from process", n); - - let n = if self.current + n < self.frame_length() { - trace!( - "reading next {} bytes from process (current: {}, end: {})", - n, - self.current, + pub fn read_frame(&mut self, buf: &mut [u8]) -> Result<(), Error> { + if buf.len() != self.frame_length() { + return Err(anyhow!( + "called read_frame with buffer of length {}, expected length {}", + buf.len(), self.frame_length(), - ); + )); + } - self.file.read_exact(buf)?; - self.current += n; - - n - } else { - let n = self.frame_length() - self.current; - - trace!( - "reading last {} bytes from process (current: {}, end: {})", - n, - self.current, - self.frame_length(), - ); - - self.file.read_exact(&mut buf[..n])?; - - self.point_file_to_beginning_of_frame()?; - - n - }; - - trace!("read exactly {} bytes from process", n); + self.file + .seek(SeekFrom::Start(self.offset as u64)) + .context("could not point file to beginning of frame")?; + self.file + .read_exact(buf) + .context("could not read frame from file")?; - Ok(n) + Ok(()) } } From 6a01a0e89d6a0eda6e9b51904957185903e7a634 Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Sat, 17 Jan 2026 21:58:11 +0100 Subject: [PATCH 6/7] add more cli options --- client/src/config.rs | 46 +++++++++++++++++++++++++++++++++++++--- client/src/start/mod.rs | 7 +----- server/src/config/mod.rs | 5 +++-- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/client/src/config.rs b/client/src/config.rs index fa4e0aa..f4afd76 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -5,6 +5,7 @@ use std::{ use anyhow::{Context, Error}; use clap::Parser; +use review_server::config::ServerOptions; const DEFAULT_IP: &str = "10.11.99.1"; const DEFAULT_SSH_PORT: u16 = 22; @@ -29,9 +30,17 @@ pub struct CliOptions { #[arg(long, name = "tcp-port")] tcp_port: Option, + /// Framerate (default: 120) + #[arg(long, name = "framerate")] + framerate: Option, + /// Dark mode - invert colors (default: false) #[arg(long, name = "dark-mode")] dark_mode: bool, + + /// Show cursor (default: false) + #[arg(long, name = "show-cursor")] + show_cursor: bool, } #[derive(Debug, Clone)] @@ -39,8 +48,10 @@ pub struct ClientOptions { pub remarkable_ip: String, pub ssh_port: u16, pub ssh_key: PathBuf, - pub tcp_port: u16, pub dark_mode: bool, + pub tcp_port: u16, + pub show_cursor: bool, + pub framerate: f32, } impl From for ClientOptions { @@ -54,17 +65,46 @@ impl From for ClientOptions { ssh_port: resolve_with( value.ssh_port, "REMARKABLE_SSH_PORT", - |string| string.parse().context("could not parse"), + |string| { + string + .parse() + .context("could not parse SSH port from environment") + }, DEFAULT_SSH_PORT, ), ssh_key: must_resolve_option(value.ssh_key, "REMARKABLE_SSH_KEY_PATH"), tcp_port: resolve_with( value.tcp_port, "REMARKABLE_TCP_PORT", - |string| string.parse().context("could not parse"), + |string| { + string + .parse() + .context("could not parse TCP port from environment") + }, DEFAULT_TCP_PORT, ), + framerate: resolve_with( + value.framerate, + "REMARKABLE_FRAMERATE", + |string| { + string + .parse() + .context("could not parse framerate from environment") + }, + 120., + ), dark_mode: resolve_boolean_option(value.dark_mode, "REMARKABLE_DARK_MODE", false), + show_cursor: resolve_boolean_option(value.show_cursor, "REMARKABLE_SHOW_CURSOR", false), + } + } +} + +impl Into for ClientOptions { + fn into(self) -> ServerOptions { + ServerOptions { + port: self.tcp_port, + show_cursor: self.show_cursor, + framerate: self.framerate, } } } diff --git a/client/src/start/mod.rs b/client/src/start/mod.rs index 0bccd39..d7d8944 100644 --- a/client/src/start/mod.rs +++ b/client/src/start/mod.rs @@ -33,12 +33,7 @@ pub async fn start_server( > { let (stdout_tx, stdout_rx) = mpsc::channel(10); - let server_options = ServerOptions { - port: 6680, - show_cursor: false, - framerate: 120, - }; - + let server_options: ServerOptions = opts.into(); let restream_command = Box::leak(Box::new(format!( "RUST_LOG=trace ./review-server '{}'", serde_json::to_string(&server_options) diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index 5b36335..8483b10 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -1,10 +1,11 @@ use std::path::PathBuf; -use crate::version::{ConfigVersion, FirmwareVersion, HardwareVersion, get_config_version}; use anyhow::{Context, Error, anyhow}; use clap::Parser; use serde::{Deserialize, Serialize}; +use crate::version::{ConfigVersion, FirmwareVersion, HardwareVersion, get_config_version}; + #[derive(Parser, Debug)] #[command(author, version)] pub struct CliOptions { @@ -16,7 +17,7 @@ pub struct CliOptions { pub struct ServerOptions { pub port: u16, pub show_cursor: bool, - pub framerate: u8, + pub framerate: f32, } impl TryFrom for ServerOptions { From d2663fba8f0fa2d515e58eaa730fbd4fe85d66ad Mon Sep 17 00:00:00 2001 From: Fabian Weik Date: Sat, 17 Jan 2026 21:59:50 +0100 Subject: [PATCH 7/7] add options to readme --- README.org | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.org b/README.org index d1feb61..a44af79 100644 --- a/README.org +++ b/README.org @@ -41,7 +41,9 @@ Options: --ssh-port SSH Port used by the reMarkable tablet (default: 22) --ssh-key Private SSH key file path --tcp-port TCP port for video stream (default: 6680) + --framerate Framerate (default: 120) --dark-mode Dark mode - invert colors (default: false) + --show-cursor Show cursor (default: false) -h, --help Print help -V, --version Print version #+end_src @@ -53,7 +55,9 @@ Corresponding keys: - =REMARKABLE_SSH_PORT= - =REMARKABLE_SSH_KEY_PATH= - =REMARKABLE_TCP_PORT= +- =REMARKABLE_FRAMERATE= - =REMARKABLE_DARK_MODE= +- =REMARKABLE_SHOW_CURSOR= * Building