diff --git a/README.org b/README.org index d1feb61..a44af79 100644 --- a/README.org +++ b/README.org @@ -41,7 +41,9 @@ Options: --ssh-port SSH Port used by the reMarkable tablet (default: 22) --ssh-key Private SSH key file path --tcp-port TCP port for video stream (default: 6680) + --framerate Framerate (default: 120) --dark-mode Dark mode - invert colors (default: false) + --show-cursor Show cursor (default: false) -h, --help Print help -V, --version Print version #+end_src @@ -53,7 +55,9 @@ Corresponding keys: - =REMARKABLE_SSH_PORT= - =REMARKABLE_SSH_KEY_PATH= - =REMARKABLE_TCP_PORT= +- =REMARKABLE_FRAMERATE= - =REMARKABLE_DARK_MODE= +- =REMARKABLE_SHOW_CURSOR= * Building diff --git a/client/src/config.rs b/client/src/config.rs index caca6d3..f4afd76 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -5,19 +5,7 @@ use std::{ use anyhow::{Context, Error}; use clap::Parser; -use gstreamer_video::VideoFormat; - -// TODO: get this info from reMarkable tablet directly -// first parse firmware version (on server) -// second set all settings accordingly -// third send out height, width, pixel format, and bytes per pixel to client -pub const HEIGHT: u32 = 1872; -pub const WIDTH: u32 = 1404; -pub const PIXEL_FORMAT: &str = "bgra"; -pub const VIDEO_FORMAT: VideoFormat = VideoFormat::Bgra; -pub const BYTES_PER_PIXEL: u32 = 4; -pub const FILE: &str = ":mem:"; -pub const SKIP_OFFSET: usize = 2629636; +use review_server::config::ServerOptions; const DEFAULT_IP: &str = "10.11.99.1"; const DEFAULT_SSH_PORT: u16 = 22; @@ -42,9 +30,17 @@ pub struct CliOptions { #[arg(long, name = "tcp-port")] tcp_port: Option, + /// Framerate (default: 120) + #[arg(long, name = "framerate")] + framerate: Option, + /// Dark mode - invert colors (default: false) #[arg(long, name = "dark-mode")] dark_mode: bool, + + /// Show cursor (default: false) + #[arg(long, name = "show-cursor")] + show_cursor: bool, } #[derive(Debug, Clone)] @@ -52,8 +48,10 @@ pub struct ClientOptions { pub remarkable_ip: String, pub ssh_port: u16, pub ssh_key: PathBuf, - pub tcp_port: u16, pub dark_mode: bool, + pub tcp_port: u16, + pub show_cursor: bool, + pub framerate: f32, } impl From for ClientOptions { @@ -67,17 +65,46 @@ impl From for ClientOptions { ssh_port: resolve_with( value.ssh_port, "REMARKABLE_SSH_PORT", - |string| string.parse().context("could not parse"), + |string| { + string + .parse() + .context("could not parse SSH port from environment") + }, DEFAULT_SSH_PORT, ), ssh_key: must_resolve_option(value.ssh_key, "REMARKABLE_SSH_KEY_PATH"), tcp_port: resolve_with( value.tcp_port, "REMARKABLE_TCP_PORT", - |string| string.parse().context("could not parse"), + |string| { + string + .parse() + .context("could not parse TCP port from environment") + }, DEFAULT_TCP_PORT, ), + framerate: resolve_with( + value.framerate, + "REMARKABLE_FRAMERATE", + |string| { + string + .parse() + .context("could not parse framerate from environment") + }, + 120., + ), dark_mode: resolve_boolean_option(value.dark_mode, "REMARKABLE_DARK_MODE", false), + show_cursor: resolve_boolean_option(value.show_cursor, "REMARKABLE_SHOW_CURSOR", false), + } + } +} + +impl Into for ClientOptions { + fn into(self) -> ServerOptions { + ServerOptions { + port: self.tcp_port, + show_cursor: self.show_cursor, + framerate: self.framerate, } } } diff --git a/client/src/display/mod.rs b/client/src/display/mod.rs index 4f2f06e..5fdbd5a 100644 --- a/client/src/display/mod.rs +++ b/client/src/display/mod.rs @@ -5,8 +5,9 @@ use std::{io::Read as _, thread::sleep, time::Duration}; use anyhow::{Context, Error}; use futures::stream::StreamExt; use gstreamer_app::AppSrc; -use lz4_flex::frame::FrameDecoder; -use review_server::config::CommunicatedConfig; +use gstreamer_video::VideoFormat; +use lz4_flex::decompress_size_prepended; +use review_server::config::{CommunicatedConfig, PixelFormat}; use tokio::net::TcpStream; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{debug, info}; @@ -23,63 +24,74 @@ pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { .await .context("could not connect to TCP stream")?; - let (stream, communicated_config) = get_communicated_config(stream) + let mut framed_stream = Framed::new(stream, LengthDelimitedCodec::new()); + let communicated_config = get_communicated_config(&mut framed_stream) .await .context("could not get communicated config from TCP stream")?; debug!("received communicated config: {:?}", &communicated_config); - let stream = stream - .into_std() - .context("could not convert stream into std")?; - - let mut decoded_video_data = FrameDecoder::new(stream); - - let (pipeline, appsrc) = build_pipeline().context("could not build gstreamer pipeline")?; + let (pipeline, appsrc) = + build_pipeline(&communicated_config).context("could not build gstreamer pipeline")?; pipeline .set_state(gstreamer::State::Playing) .context("could not start playing gstreamer pipeline")?; - let mut buffer = vec![0u8; (BYTES_PER_PIXEL * HEIGHT * WIDTH) as usize]; loop { - let n = decoded_video_data - .read(&mut buffer) + debug!("attempting to read data from TCP stream"); + + let compressed_frame = framed_stream + .next() + .await + .context("TCP stream was closed")? .context("could not read from TCP stream")?; - let slice = buffer[..n].to_vec(); + debug!( + "read one compressed frame from TCP stream ({} bytes)", + compressed_frame.len(), + ); + + let frame = decompress_size_prepended(&compressed_frame) + .context("could not decompress received frame")?; - debug!("read {} bytes:\n\n{:?}", n, &slice); + debug!("decompressed: {} bytes", frame.len()); - let buffer = gstreamer::Buffer::from_slice(slice); + let buffer = gstreamer::Buffer::from_mut_slice(frame); appsrc .push_buffer(buffer) .context("could not push buffer to app source")?; - - sleep(Duration::from_secs_f64(1.)); } } async fn get_communicated_config( - tcp_stream: TcpStream, -) -> Result<(TcpStream, CommunicatedConfig), Error> { - let mut framed_stream = Framed::new(tcp_stream, LengthDelimitedCodec::new()); - + framed_stream: &mut Framed, +) -> Result { let config_bytes = framed_stream .next() .await .context("received None as config bytes")? .context("could not receive config bytes")?; - let config = bson::deserialize_from_slice(&config_bytes) - .context("could not deserialize config from bytes")?; + bson::deserialize_from_slice(&config_bytes).context("could not deserialize config from bytes") +} - Ok((framed_stream.into_inner(), config)) +fn to_video_format(pixel_format: &PixelFormat) -> VideoFormat { + match pixel_format { + PixelFormat::Rgb565le => todo!("not sure what the video format for RGB 565 LE is"), + PixelFormat::Gray8 => VideoFormat::Gray8, + PixelFormat::Gray16be => VideoFormat::Gray16Be, + PixelFormat::Bgra => VideoFormat::Bgra, + } } -fn build_pipeline() -> Result<(Pipeline, AppSrc), Error> { - let video_info = gstreamer_video::VideoInfo::builder(VIDEO_FORMAT, WIDTH, HEIGHT) - .build() - .context("could not build video info")?; +fn build_pipeline(communicated_config: &CommunicatedConfig) -> Result<(Pipeline, AppSrc), Error> { + let video_info = gstreamer_video::VideoInfo::builder( + to_video_format(&communicated_config.video_config.pixel_format), + communicated_config.video_config.width as u32, + communicated_config.video_config.height as u32, + ) + .build() + .context("could not build video info")?; let appsrc = gstreamer_app::AppSrc::builder() .caps( diff --git a/client/src/start/mod.rs b/client/src/start/mod.rs index 24c7dee..d7d8944 100644 --- a/client/src/start/mod.rs +++ b/client/src/start/mod.rs @@ -33,12 +33,7 @@ pub async fn start_server( > { let (stdout_tx, stdout_rx) = mpsc::channel(10); - let server_options = ServerOptions { - port: 6680, - show_cursor: false, - framerate: 10, - }; - + let server_options: ServerOptions = opts.into(); let restream_command = Box::leak(Box::new(format!( "RUST_LOG=trace ./review-server '{}'", serde_json::to_string(&server_options) diff --git a/dagger/main.go b/dagger/main.go index e2d94e4..43d4265 100644 --- a/dagger/main.go +++ b/dagger/main.go @@ -34,7 +34,10 @@ func (m *ReView) CheckAndTestAll(ctx context.Context, source *dagger.Directory) return "ok", nil } -func (m *ReView) BuildClient(source *dagger.Directory) *dagger.File { +func (m *ReView) BuildClient( + // +ignore=["target"] + source *dagger.Directory, +) *dagger.File { return linuxContainer(source). WithExec([]string{ "cargo", "build", "--release", @@ -59,17 +62,19 @@ func linuxContainer(source *dagger.Directory) *dagger.Container { WithWorkdir("/source"). // Cache - WithMountedCache("/cache/cargo", dag.CacheVolume("rust-packages")). - WithEnvVariable("CARGO_HOME", "/cache/cargo"). + WithMountedCache("/root/.cargo/registry", dag.CacheVolume("rust-packages")). WithMountedCache("target", dag.CacheVolume("rust-target")) } -func (m *ReView) BuildServer(source *dagger.Directory) *dagger.File { +func (m *ReView) BuildServer( + // +ignore=["target"] + source *dagger.Directory, +) *dagger.File { return toltecContainer(source). WithExec([]string{ "bash", "-c", "source /opt/x-tools/switch-arm.sh; " + - "cargo build --release --bin review-server --target " + RemarkableTarget, + "cargo build --release --bin review-server", }). WithExec( []string{"cp", "target/" + RemarkableTarget + "/release/review-server", "review-server"}, @@ -83,7 +88,9 @@ func toltecContainer(source *dagger.Directory) *dagger.Container { // Sources WithDirectory("/source", source). - WithWorkdir("/source") + WithWorkdir("/source"). - // Sadly caching breaks compile :( + // Cache + WithMountedCache("/root/.cargo/registry", dag.CacheVolume("rust-packages-toltec")). + WithMountedCache("target", dag.CacheVolume("rust-target-toltec")) } diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index 5b36335..8483b10 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -1,10 +1,11 @@ use std::path::PathBuf; -use crate::version::{ConfigVersion, FirmwareVersion, HardwareVersion, get_config_version}; use anyhow::{Context, Error, anyhow}; use clap::Parser; use serde::{Deserialize, Serialize}; +use crate::version::{ConfigVersion, FirmwareVersion, HardwareVersion, get_config_version}; + #[derive(Parser, Debug)] #[command(author, version)] pub struct CliOptions { @@ -16,7 +17,7 @@ pub struct CliOptions { pub struct ServerOptions { pub port: u16, pub show_cursor: bool, - pub framerate: u8, + pub framerate: f32, } impl TryFrom for ServerOptions { diff --git a/server/src/device/connection.rs b/server/src/device/connection.rs index a3e5c16..d20d9fd 100644 --- a/server/src/device/connection.rs +++ b/server/src/device/connection.rs @@ -1,13 +1,13 @@ -use std::io::{Read, Write}; use std::time::Duration; use anyhow::{Context, Error}; use futures::sink::SinkExt; -use lz4_flex::frame::FrameEncoder; +use lz4_flex::compress_prepend_size; use tokio::net::{TcpListener, TcpStream}; -use tokio::time::sleep; +use tokio::time::{MissedTickBehavior, interval}; +use tokio_util::bytes::Bytes; use tokio_util::codec::{Framed, LengthDelimitedCodec}; -use tracing::{debug, info}; +use tracing::{debug, info, trace}; use crate::device::reading::FrameReader; use crate::{ @@ -33,17 +33,19 @@ pub async fn listen_for_clients( .await .context(format!("could not bind to port {}", opts.port))?; - loop { - let (stream, addr) = listener.accept().await?; - info!("new connection from {}", addr); - - tokio::spawn(open_connection( - stream, - opts.clone(), - video_config.clone(), - communicated_config.clone(), - )); - } + let (stream, addr) = listener.accept().await?; + info!("new connection from {}", addr); + + open_connection( + stream, + opts.clone(), + video_config.clone(), + communicated_config.clone(), + ) + .await + .context("error while handling TCP connections")?; + + Ok(()) } async fn open_connection( @@ -54,56 +56,44 @@ async fn open_connection( ) -> Result<(), Error> { let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); - let bytes = bson::serialize_to_vec(&communicated_config) + let bytes: Bytes = bson::serialize_to_vec(&communicated_config) .context("could not serialize communicated config")? .into(); framed - .send(bytes) + .send(bytes.iter().copied().collect()) .await .context("could not send out config")?; - let stream = framed - .into_inner() - .into_std() - .context("could not turn stream into std")?; - - stream - .set_write_timeout(Some(Duration::from_secs(1))) - .context("could not set write timeout")?; - - let mut encoded_video_data = FrameEncoder::new(stream); let mut frame_reader = FrameReader::new(video_config).context("could not create frame reader")?; debug!("created frame reader, starting loop to send data"); - /* - io::copy(&mut frame_reader, &mut encoded_video_data) - .context("error while copying frame buffer data to stream")?; - */ + let mut interval = interval(Duration::from_secs_f64(1. / (opts.framerate as f64))); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - // TODO: this problably can be a simple io::copy let mut buffer = vec![0u8; frame_reader.frame_length()]; loop { + interval.tick().await; + frame_reader - .read_exact(&mut buffer) + .read_frame(&mut buffer) .context("error reading frame from file")?; - debug!("read exactly {} bytes from frame reader", buffer.len()); - - encoded_video_data - .write_all(&buffer) - .context("could not write frame to encoder")?; + debug!("read {} bytes from frame reader", buffer.len()); - debug!("wrote the data to the encoder"); + let encoded_buffer = compress_prepend_size(&buffer); + trace!( + "writing encoded bytes to stream (length {})", + encoded_buffer.len(), + ); - encoded_video_data - .flush() - .context("failed to flush encoder")?; - - debug!("flushed the encoded video data"); + framed + .send(encoded_buffer.into()) + .await + .context("could not write frame to encoder")?; - sleep(Duration::from_secs_f64(1. / (opts.framerate as f64))).await; + debug!("wrote the data to the output stream"); } } diff --git a/server/src/device/process.rs b/server/src/device/process.rs index 06c9bc0..f4e7882 100644 --- a/server/src/device/process.rs +++ b/server/src/device/process.rs @@ -1,9 +1,11 @@ use std::fs::File; use anyhow::{Context, Error, anyhow}; -use procfs::process::{MMapPath, Process, all_processes}; +use itertools::Itertools; +use procfs::process::{MMapPath, MemoryMap, Process, all_processes}; +use tracing::trace; -pub fn get_memory_file() -> Result<(File, usize), Error> { +pub fn get_xochitl_memory_file() -> Result<(File, usize), Error> { let process = get_process().context("could not get xochitl process")?; let memory_file = process.mem().context("could not get xochitl memory file")?; @@ -30,16 +32,22 @@ fn get_process() -> Result { Ok(process) } +/// Get the offset of the framebuffer in the process memory +/// +/// The framebuffer is the next mapped file after /dev/fb0 fn get_framebuffer_offset_in_process_memory(process: &Process) -> Result { let framebuffer_path_name = MMapPath::from("/dev/fb0").context("could not build framebuffer path name")?; + let matches_path_name = |m: &&MemoryMap| m.pathname == framebuffer_path_name; + let maps = process.maps().context("could not get process maps")?; - let mut maps = maps.iter().filter(|m| m.pathname == framebuffer_path_name); + + let mut maps = maps.iter().skip_while(|m| !matches_path_name(m)).skip(1); let framebuffer_map = maps.next().expect("found no framebuffer map"); - if let Some(_) = maps.next() { + if let Some(_) = maps.filter(matches_path_name).next() { return Err(anyhow!("found more than one framebuffer map")); } diff --git a/server/src/device/reading.rs b/server/src/device/reading.rs index 928af1d..3ced49c 100644 --- a/server/src/device/reading.rs +++ b/server/src/device/reading.rs @@ -3,16 +3,16 @@ use std::{ io::{Read, Seek, SeekFrom}, }; -use anyhow::{Context, Error}; +use anyhow::{Context, Error, anyhow}; +use tracing::trace; -use super::process::get_memory_file; +use super::process::get_xochitl_memory_file; use crate::config::{self, VideoConfig}; #[derive(Debug)] pub struct FrameReader { file: File, offset: usize, - current: usize, width: usize, height: usize, bytes_per_pixel: usize, @@ -25,52 +25,46 @@ impl FrameReader { File::open(path).context("could not open framebuffer file")?, 0, ), - config::VideoDataSource::ProcessMemory => { - get_memory_file().context("could not get file and offset for xochitl process")? - } + config::VideoDataSource::ProcessMemory => get_xochitl_memory_file() + .context("could not get file and offset for xochitl process")?, }; - let mut frame_reader = Self { + trace!( + "file offset: {}, extra skip: {}", + offset, video_config.internal.skip + ); + let offset = offset + video_config.internal.skip; + + Ok(Self { file, offset, - current: 0, width: video_config.shared.width, height: video_config.shared.height, bytes_per_pixel: video_config.shared.bytes_per_pixel, - }; - frame_reader - .point_file_to_framebuffer_memory_start() - .context("could not initialize file to offset")?; - Ok(frame_reader) + }) } + #[inline] pub fn frame_length(&self) -> usize { self.width * self.height * self.bytes_per_pixel } - // TODO: anyhow error handling instead of io Errors - fn point_file_to_framebuffer_memory_start(&mut self) -> std::io::Result<()> { - self.file.seek(SeekFrom::Start(self.offset as u64))?; - self.current = 0; - - Ok(()) - } -} + pub fn read_frame(&mut self, buf: &mut [u8]) -> Result<(), Error> { + if buf.len() != self.frame_length() { + return Err(anyhow!( + "called read_frame with buffer of length {}, expected length {}", + buf.len(), + self.frame_length(), + )); + } -impl Read for FrameReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let requested = buf.len(); - let bytes_read = if self.current + requested < self.frame_length() { - self.file.read(buf)? - } else { - let rest = self.frame_length() - self.current; - self.file.read(&mut buf[0..rest])? - }; + self.file + .seek(SeekFrom::Start(self.offset as u64)) + .context("could not point file to beginning of frame")?; + self.file + .read_exact(buf) + .context("could not read frame from file")?; - self.current += bytes_read; - if self.current == self.frame_length() { - self.point_file_to_framebuffer_memory_start()?; - } - Ok(bytes_read) + Ok(()) } } diff --git a/server/src/main.rs b/server/src/main.rs index 1d581e4..ffc2e5d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -35,6 +35,7 @@ fn initialize_logging() -> Result<(), Error> { let log_file = OpenOptions::new() .write(true) .create(true) + .truncate(true) .open("./review.log") .context("could not open log file")?;