diff --git a/README.org b/README.org index a44af79..5099a80 100644 --- a/README.org +++ b/README.org @@ -24,6 +24,24 @@ sudo zypper install gstreamer gstreamer-plugins-base gstreamer-plugins-good * Usage +** Server + +The must be running. + +To install it, first build (see later in this document) and then copy it to your reMarkable. + +#+begin_src shell +scp review-server.arm.static root@${REMARKABLE_IP}:review-server +#+end_src + +And then start it manually. + +#+begin_src shell +ssh root@${REMARKABLE_IP} 'RUST_LOG=trace ./review-server --port 6680' +#+end_src + +TODO: Install it as service + ** Client Options for the client can be set as command line options or environment variables. @@ -38,10 +56,8 @@ Usage: review-client [OPTIONS] Options: --remarkable-ip IP of the reMarkable tablet (default: 10.11.99.1) - --ssh-port SSH Port used by the reMarkable tablet (default: 22) - --ssh-key Private SSH key file path --tcp-port TCP port for video stream (default: 6680) - --framerate Framerate (default: 120) + --framerate Framerate (default: 50) --dark-mode Dark mode - invert colors (default: false) --show-cursor Show cursor (default: false) -h, --help Print help @@ -52,9 +68,8 @@ Options: Corresponding keys: - =REMARKABLE_IP= -- =REMARKABLE_SSH_PORT= -- =REMARKABLE_SSH_KEY_PATH= - =REMARKABLE_TCP_PORT= +# - =REMARKABLE_SSH_KEY_PATH= - =REMARKABLE_FRAMERATE= - =REMARKABLE_DARK_MODE= - =REMARKABLE_SHOW_CURSOR= diff --git a/client/src/config.rs b/client/src/config.rs index f4afd76..9365a5f 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -1,16 +1,14 @@ -use std::{ - env::{self, VarError}, - path::PathBuf, -}; +use std::env::{self, VarError}; use anyhow::{Context, Error}; use clap::Parser; -use review_server::config::ServerOptions; const DEFAULT_IP: &str = "10.11.99.1"; const DEFAULT_SSH_PORT: u16 = 22; const DEFAULT_TCP_PORT: u16 = 6680; +const DEFAULT_FRAMERATE: f32 = 50.; + #[derive(Parser, Debug)] #[command(author, version)] pub struct CliOptions { @@ -18,19 +16,16 @@ pub struct CliOptions { #[arg(long, name = "remarkable-ip")] remarkable_ip: Option, - /// SSH Port used by the reMarkable tablet (default: 22) - #[arg(long, name = "ssh-port")] - ssh_port: Option, - - /// Private SSH key file path - #[arg(long, name = "ssh-key")] - ssh_key: Option, - /// TCP port for video stream (default: 6680) #[arg(long, name = "tcp-port")] tcp_port: Option, - /// Framerate (default: 120) + /* + /// Private SSH key file path + #[arg(long, name = "ssh-key")] + ssh_key: Option, + */ + /// Framerate (default: 50) #[arg(long, name = "framerate")] framerate: Option, @@ -46,8 +41,7 @@ pub struct CliOptions { #[derive(Debug, Clone)] pub struct ClientOptions { pub remarkable_ip: String, - pub ssh_port: u16, - pub ssh_key: PathBuf, + //pub ssh_key: PathBuf, pub dark_mode: bool, pub tcp_port: u16, pub show_cursor: bool, @@ -62,17 +56,7 @@ impl From for ClientOptions { "REMARKABLE_IP", DEFAULT_IP.to_string(), ), - ssh_port: resolve_with( - value.ssh_port, - "REMARKABLE_SSH_PORT", - |string| { - string - .parse() - .context("could not parse SSH port from environment") - }, - DEFAULT_SSH_PORT, - ), - ssh_key: must_resolve_option(value.ssh_key, "REMARKABLE_SSH_KEY_PATH"), + //ssh_key: must_resolve_option(value.ssh_key, "REMARKABLE_SSH_KEY_PATH"), tcp_port: resolve_with( value.tcp_port, "REMARKABLE_TCP_PORT", @@ -91,7 +75,7 @@ impl From for ClientOptions { .parse() .context("could not parse framerate from environment") }, - 120., + DEFAULT_FRAMERATE, ), dark_mode: resolve_boolean_option(value.dark_mode, "REMARKABLE_DARK_MODE", false), show_cursor: resolve_boolean_option(value.show_cursor, "REMARKABLE_SHOW_CURSOR", false), @@ -99,16 +83,6 @@ impl From for ClientOptions { } } -impl Into for ClientOptions { - fn into(self) -> ServerOptions { - ServerOptions { - port: self.tcp_port, - show_cursor: self.show_cursor, - framerate: self.framerate, - } - } -} - fn resolve_option>(cli_value: Option, variable_name: &str, default: T) -> T { resolve_with( cli_value, diff --git a/client/src/connection/mod.rs b/client/src/connection/mod.rs new file mode 100644 index 0000000..a30afb2 --- /dev/null +++ b/client/src/connection/mod.rs @@ -0,0 +1,59 @@ +pub mod video; + +use anyhow::{Context, Error}; +use futures::{SinkExt, StreamExt}; +use review_server::{ + config::{StreamConfig, device::DeviceConfig}, + version::VersionInfo, +}; +use tokio::net::TcpStream; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use tracing::info; + +use crate::config::ClientOptions; + +#[derive(Debug)] +pub struct Connection { + framed: Framed, +} + +impl Connection { + pub async fn new(client_options: ClientOptions) -> Result { + info!("setting up TCP connection"); + let stream = TcpStream::connect(format!( + "{}:{}", + client_options.remarkable_ip, client_options.tcp_port + )) + .await + .context("could not connect to TCP stream")?; + + let framed = Framed::new(stream, LengthDelimitedCodec::new()); + + Ok(Connection { framed }) + } + + pub async fn receive_version_info(&mut self) -> Result { + let msg = self + .framed + .next() + .await + .context("connection was dropped before version info was communicated")? + .context("could not receive version info message")?; + + let version_info = + bson::deserialize_from_slice(&msg).context("could not deserialize version info")?; + + Ok(version_info) + } + + pub async fn send_stream_config(&mut self, stream_config: StreamConfig) -> Result<(), Error> { + let msg = + bson::serialize_to_vec(&stream_config).context("could not serialize stream config")?; + + self.framed + .send(msg.into()) + .await + .context("could not send stream config") + .map(|_| ()) + } +} diff --git a/client/src/connection/video.rs b/client/src/connection/video.rs new file mode 100644 index 0000000..f16cf57 --- /dev/null +++ b/client/src/connection/video.rs @@ -0,0 +1,51 @@ +use anyhow::{Context, Error}; +use futures::stream::StreamExt; +use lz4_flex::decompress_size_prepended; +use tracing::debug; + +use crate::display::Display; + +use super::Connection; +use review_server::config::device::VideoConfig; + +#[derive(Debug)] +pub struct VideoConnection { + conn: Connection, + display: Display, +} + +impl VideoConnection { + pub fn new(conn: Connection, video_config: VideoConfig) -> Result { + let display = Display::new(video_config).context("could not initialize display")?; + + Ok(Self { conn, display }) + } + + pub async fn run(&mut self) -> Result<(), Error> { + loop { + debug!("attempting to read data from TCP stream"); + + let compressed_frame = self + .conn + .framed + .next() + .await + .context("TCP stream was closed")? + .context("could not read from TCP stream")?; + + debug!( + "read one compressed frame from TCP stream ({} bytes)", + compressed_frame.len(), + ); + + let frame = decompress_size_prepended(&compressed_frame) + .context("could not decompress received frame")?; + + debug!("decompressed: {} bytes", frame.len()); + + self.display + .push_frame(frame) + .context("could not push frame to display")?; + } + } +} diff --git a/client/src/display/mod.rs b/client/src/display/mod.rs index 5fdbd5a..67e06b7 100644 --- a/client/src/display/mod.rs +++ b/client/src/display/mod.rs @@ -3,92 +3,46 @@ use super::config::*; use std::{io::Read as _, thread::sleep, time::Duration}; use anyhow::{Context, Error}; -use futures::stream::StreamExt; use gstreamer_app::AppSrc; use gstreamer_video::VideoFormat; -use lz4_flex::decompress_size_prepended; -use review_server::config::{CommunicatedConfig, PixelFormat}; -use tokio::net::TcpStream; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use review_server::config::device::{PixelFormat, VideoConfig}; use tracing::{debug, info}; use gstreamer::{Pipeline, prelude::*}; -pub async fn gstreamer_thread(opts: ClientOptions) -> Result<(), Error> { - gstreamer::init().context("could not init gstreamer")?; - - sleep(Duration::from_millis(100)); - - info!("setting up TCP connection"); - let stream = TcpStream::connect(format!("{}:{}", opts.remarkable_ip, opts.tcp_port)) - .await - .context("could not connect to TCP stream")?; - - let mut framed_stream = Framed::new(stream, LengthDelimitedCodec::new()); - let communicated_config = get_communicated_config(&mut framed_stream) - .await - .context("could not get communicated config from TCP stream")?; - - debug!("received communicated config: {:?}", &communicated_config); - - let (pipeline, appsrc) = - build_pipeline(&communicated_config).context("could not build gstreamer pipeline")?; - pipeline - .set_state(gstreamer::State::Playing) - .context("could not start playing gstreamer pipeline")?; - - loop { - debug!("attempting to read data from TCP stream"); - - let compressed_frame = framed_stream - .next() - .await - .context("TCP stream was closed")? - .context("could not read from TCP stream")?; +#[derive(Debug)] +pub struct Display { + pipeline: Pipeline, + appsrc: AppSrc, +} - debug!( - "read one compressed frame from TCP stream ({} bytes)", - compressed_frame.len(), - ); +impl Display { + pub fn new(video_config: VideoConfig) -> Result { + gstreamer::init().context("could not init gstreamer")?; - let frame = decompress_size_prepended(&compressed_frame) - .context("could not decompress received frame")?; + let (pipeline, appsrc) = + build_pipeline(&video_config).context("could not build gstreamer pipeline")?; + pipeline + .set_state(gstreamer::State::Playing) + .context("could not start playing gstreamer pipeline")?; - debug!("decompressed: {} bytes", frame.len()); + Ok(Self { pipeline, appsrc }) + } + pub fn push_frame(&mut self, frame: Vec) -> Result<(), Error> { let buffer = gstreamer::Buffer::from_mut_slice(frame); - appsrc + self.appsrc .push_buffer(buffer) - .context("could not push buffer to app source")?; + .context("could not push buffer to app source") + .map(|_| ()) } } -async fn get_communicated_config( - framed_stream: &mut Framed, -) -> Result { - let config_bytes = framed_stream - .next() - .await - .context("received None as config bytes")? - .context("could not receive config bytes")?; - - bson::deserialize_from_slice(&config_bytes).context("could not deserialize config from bytes") -} - -fn to_video_format(pixel_format: &PixelFormat) -> VideoFormat { - match pixel_format { - PixelFormat::Rgb565le => todo!("not sure what the video format for RGB 565 LE is"), - PixelFormat::Gray8 => VideoFormat::Gray8, - PixelFormat::Gray16be => VideoFormat::Gray16Be, - PixelFormat::Bgra => VideoFormat::Bgra, - } -} - -fn build_pipeline(communicated_config: &CommunicatedConfig) -> Result<(Pipeline, AppSrc), Error> { +fn build_pipeline(video_config: &VideoConfig) -> Result<(Pipeline, AppSrc), Error> { let video_info = gstreamer_video::VideoInfo::builder( - to_video_format(&communicated_config.video_config.pixel_format), - communicated_config.video_config.width as u32, - communicated_config.video_config.height as u32, + to_video_format(&video_config.pixel_format), + video_config.width as u32, + video_config.height as u32, ) .build() .context("could not build video info")?; @@ -115,3 +69,12 @@ fn build_pipeline(communicated_config: &CommunicatedConfig) -> Result<(Pipeline, Ok((pipeline, appsrc)) } + +fn to_video_format(pixel_format: &PixelFormat) -> VideoFormat { + match pixel_format { + PixelFormat::Rgb565le => VideoFormat::Rgb16, // TODO: not sure + PixelFormat::Gray8 => VideoFormat::Gray8, + PixelFormat::Gray16be => VideoFormat::Gray16Be, + PixelFormat::Bgra => VideoFormat::Bgra, + } +} diff --git a/client/src/main.rs b/client/src/main.rs index 6db725d..3a646d2 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -1,44 +1,62 @@ mod config; +mod connection; mod display; -mod start; use anyhow::{Context, Error}; use clap::Parser; use config::{CliOptions, ClientOptions}; -use start::{connect_ssh, receive_output, start_server}; -use tracing::{debug, error}; - -use display::gstreamer_thread; +use connection::{Connection, video::VideoConnection}; +use review_server::config::{StreamConfig, device::DeviceConfig}; +use tracing::{debug, info}; #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt::init(); - let opts = CliOptions::parse(); - debug!("cli options: {:?}", opts); - let opts = ClientOptions::from(opts); - debug!("resolved options: {:?}", opts); + let cli_options = CliOptions::parse(); + debug!("cli options: {:?}", cli_options); + let client_options = ClientOptions::from(cli_options); + debug!("resolved options: {:?}", &client_options); + + info!( + "connecting to reMarkable tablet at {}:{}", + client_options.remarkable_ip, client_options.tcp_port, + ); + + let mut conn = Connection::new(client_options.clone()) + .await + .context("could not initialize TCP connection")?; - let client = connect_ssh(opts.clone()) + let version_info = conn + .receive_version_info() .await - .context("could not connect to reMarkable")?; - let (restream_command_future, mut restream_command_stdout) = - start_server(&client, opts.clone()).await?; - - let mut tcp_task = tokio::spawn(gstreamer_thread(opts)); - - tokio::pin!(restream_command_future); - loop { - tokio::select! { - restream_exit_code = &mut restream_command_future => { - error!("restream command exited with code {}", restream_exit_code.context("could not execute restream command")?); - - let restream_output = receive_output(&mut restream_command_stdout).await?; - error!("stdout+stderr: (next line)\n\n{}\n", restream_output); - }, - gstreamer_result = &mut tcp_task => { - error!("gstreamer exited with result: {:?}", gstreamer_result); - } - } - } + .context("could not receive version info")?; + + info!("received version information: {}", version_info); + + let device_config = DeviceConfig::new(version_info).context(format!( + "could not get device configuration for version {}", + version_info, + ))?; + + let stream_config = StreamConfig { + device_config: device_config.clone(), + framerate: client_options.framerate, + show_cursor: client_options.show_cursor, + }; + + info!("sending out stream config {:?}", &stream_config); + + conn.send_stream_config(stream_config) + .await + .context("could not send device config")?; + + let mut video_connection = VideoConnection::new(conn, device_config.video_config) + .context("could not initialize video connection")?; + video_connection + .run() + .await + .context("error while streaming")?; + + Ok(()) } diff --git a/client/src/start/mod.rs b/client/src/start/mod.rs deleted file mode 100644 index d7d8944..0000000 --- a/client/src/start/mod.rs +++ /dev/null @@ -1,55 +0,0 @@ -use super::config::*; - -use anyhow::{Context, Error}; -use async_ssh2_tokio::{AuthMethod, Client, ServerCheckMethod}; -use review_server::config::ServerOptions; -use tokio::sync::mpsc::{self, Receiver}; -use tracing::{debug, info}; - -pub async fn connect_ssh(opts: ClientOptions) -> Result { - info!("connecting to reMarkable"); - Client::connect( - (opts.remarkable_ip.clone(), opts.ssh_port), - "root", - AuthMethod::PrivateKeyFile { - key_file_path: opts.ssh_key.clone(), - key_pass: None, - }, - ServerCheckMethod::NoCheck, - ) - .await - .context("could not connect to reMarkable tablet") -} - -pub async fn start_server( - client: &Client, - opts: ClientOptions, -) -> Result< - ( - impl Future>, - Receiver>, - ), - Error, -> { - let (stdout_tx, stdout_rx) = mpsc::channel(10); - - let server_options: ServerOptions = opts.into(); - let restream_command = Box::leak(Box::new(format!( - "RUST_LOG=trace ./review-server '{}'", - serde_json::to_string(&server_options) - .context("could not convert server options to json")?, - ))); - - debug!("spawning restream"); - let exec_future = client.execute_io(restream_command, stdout_tx, None, None, false, None); - - Ok((exec_future, stdout_rx)) -} - -pub async fn receive_output(stdout: &mut Receiver>) -> Result { - let mut buf = vec![]; - while let Some(mut data) = stdout.recv().await { - buf.append(&mut data); - } - return Ok(String::from_utf8_lossy(&buf).to_string()); -} diff --git a/server/src/config/device.rs b/server/src/config/device.rs new file mode 100644 index 0000000..3259678 --- /dev/null +++ b/server/src/config/device.rs @@ -0,0 +1,167 @@ +use std::path::PathBuf; + +use anyhow::{Error, anyhow}; +use serde::{Deserialize, Serialize}; + +use crate::version::{FirmwareVersion, HardwareVersion, VersionInfo}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum FramebufferDataSource { + File { path: PathBuf }, + ProcessMemory, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FramebufferConfig { + pub source: FramebufferDataSource, + pub skip: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PixelFormat { + Rgb565le, + Gray8, + Gray16be, + Bgra, +} + +impl PixelFormat { + pub fn bytes_per_pixel(&self) -> usize { + match self { + PixelFormat::Rgb565le => 2, + PixelFormat::Gray8 => 1, + PixelFormat::Gray16be => 2, + PixelFormat::Bgra => 4, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VideoConfig { + pub height: usize, + pub width: usize, + // TODO: redundant, use function on PixelFormat struct to determine number of bytes used + pub bytes_per_pixel: usize, + pub pixel_format: PixelFormat, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeviceConfig { + pub framebuffer_config: FramebufferConfig, + pub video_config: VideoConfig, +} + +impl DeviceConfig { + pub fn new(version_info: VersionInfo) -> Result { + let height = 1872; + let width = 1404; + + match version_info.hardware { + HardwareVersion::Rm1 => Ok(DeviceConfig { + framebuffer_config: FramebufferConfig { + source: FramebufferDataSource::File { + path: PathBuf::from("/dev/fb0"), + }, + skip: 8, + }, + + video_config: VideoConfig { + height, + width, + bytes_per_pixel: 2, + pixel_format: PixelFormat::Rgb565le, + }, + }), + HardwareVersion::Rm2 => { + let rm2_config_version = DeviceConfigVersion::from(version_info.firmware); + + match rm2_config_version { + DeviceConfigVersion::Ancient => Err(anyhow!( + "no known configuration values for reMarkable 2 with firmware version {}", + version_info.firmware, + )), + DeviceConfigVersion::V3 => Ok(DeviceConfig { + framebuffer_config: FramebufferConfig { + source: FramebufferDataSource::ProcessMemory, + skip: 8, + }, + video_config: VideoConfig { + height, + width, + bytes_per_pixel: 1, + pixel_format: PixelFormat::Gray8, + }, + }), + DeviceConfigVersion::V3P7 => Ok(DeviceConfig { + framebuffer_config: FramebufferConfig { + source: FramebufferDataSource::ProcessMemory, + skip: 8, + }, + video_config: VideoConfig { + height, + width, + bytes_per_pixel: 2, + pixel_format: PixelFormat::Gray16be, + }, + }), + DeviceConfigVersion::V3P24 => Ok(DeviceConfig { + framebuffer_config: FramebufferConfig { + source: FramebufferDataSource::ProcessMemory, + skip: 2629636, + }, + video_config: VideoConfig { + height, + width, + bytes_per_pixel: 4, + pixel_format: PixelFormat::Bgra, + }, + }), + } + } + HardwareVersion::Ferrari => Err(anyhow!( + "no known configuration values known for reMarkable Paper Pro" + )), + } + } +} + +const VERSION_3_0: FirmwareVersion = FirmwareVersion { + version: 3, + major: 0, + minor: 0, + patch: 0, +}; +const VERSION_3_7: FirmwareVersion = FirmwareVersion { + version: 3, + major: 7, + minor: 0, + patch: 1930, +}; +const VERSION_3_24: FirmwareVersion = FirmwareVersion { + version: 3, + major: 24, + minor: 0, + patch: 0, +}; + +#[derive(Debug, PartialEq, Eq)] +pub enum DeviceConfigVersion { + Ancient, + V3, + V3P7, + V3P24, +} + +impl From for DeviceConfigVersion { + fn from(value: FirmwareVersion) -> Self { + if value >= VERSION_3_24 { + DeviceConfigVersion::V3P24 + } else if value >= VERSION_3_7 { + DeviceConfigVersion::V3P7 + } else if value >= VERSION_3_0 { + DeviceConfigVersion::V3 + } else { + DeviceConfigVersion::Ancient + } + } +} diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index 8483b10..ec0b961 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -1,148 +1,20 @@ -use std::path::PathBuf; +pub mod device; -use anyhow::{Context, Error, anyhow}; use clap::Parser; +use device::DeviceConfig; use serde::{Deserialize, Serialize}; -use crate::version::{ConfigVersion, FirmwareVersion, HardwareVersion, get_config_version}; - #[derive(Parser, Debug)] #[command(author, version)] pub struct CliOptions { - /// JSON object containing the server configuration - payload: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ServerOptions { + /// Port to listen for the TCP connections + #[arg(long, name = "port")] pub port: u16, - pub show_cursor: bool, - pub framerate: f32, } -impl TryFrom for ServerOptions { - type Error = Error; - - fn try_from(value: CliOptions) -> Result { - serde_json::from_str(&value.payload) - .context("could not parse JSON payload for server options") - } -} - -#[derive(Debug, Clone)] -pub enum VideoDataSource { - File { path: PathBuf }, - ProcessMemory, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum PixelFormat { - Rgb565le, - Gray8, - Gray16be, - Bgra, -} - -#[derive(Debug, Clone)] -pub struct InternalVideoConfig { - pub source: VideoDataSource, - pub skip: usize, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SharedVideoConfig { - pub height: usize, - pub width: usize, - pub bytes_per_pixel: usize, - pub pixel_format: PixelFormat, -} - -#[derive(Debug, Clone)] -pub struct VideoConfig { - pub internal: InternalVideoConfig, - pub shared: SharedVideoConfig, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct VersionInfo { - pub hardware: HardwareVersion, - pub firmware: FirmwareVersion, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CommunicatedConfig { - pub version: VersionInfo, - pub video_config: SharedVideoConfig, -} - -pub fn get_video_config( - hardware_version: &HardwareVersion, - firmware_version: &FirmwareVersion, -) -> Result { - match hardware_version { - HardwareVersion::Rm1 => Ok(VideoConfig { - internal: InternalVideoConfig { - source: VideoDataSource::File { - path: PathBuf::from("/dev/fb0"), - }, - skip: 8, - }, - shared: SharedVideoConfig { - height: 1408, - width: 1872, - bytes_per_pixel: 2, - pixel_format: PixelFormat::Rgb565le, - }, - }), - HardwareVersion::Rm2 => { - let height = 1872; - let width = 1404; - - match get_config_version(&firmware_version).context("could not get config version")? { - ConfigVersion::Ancient => Err(anyhow!( - "no known configuration values for reMarkable 2 with firmware version {}", - firmware_version, - )), - ConfigVersion::V3 => Ok(VideoConfig { - internal: InternalVideoConfig { - source: VideoDataSource::ProcessMemory, - skip: 8, - }, - shared: SharedVideoConfig { - height, - width, - bytes_per_pixel: 4, - pixel_format: PixelFormat::Gray8, - }, - }), - ConfigVersion::V3P7 => Ok(VideoConfig { - internal: InternalVideoConfig { - source: VideoDataSource::ProcessMemory, - skip: 8, - }, - shared: SharedVideoConfig { - height, - width, - bytes_per_pixel: 2, - pixel_format: PixelFormat::Gray16be, - }, - }), - ConfigVersion::V3P24 => Ok(VideoConfig { - internal: InternalVideoConfig { - source: VideoDataSource::ProcessMemory, - skip: 2629636, - }, - shared: SharedVideoConfig { - height, - width, - bytes_per_pixel: 4, - pixel_format: PixelFormat::Bgra, - }, - }), - } - } - HardwareVersion::Ferrari => Err(anyhow!( - "no known configuration values known for reMarkable Paper Pro" - )), - } +#[derive(Debug, Serialize, Deserialize)] +pub struct StreamConfig { + pub device_config: DeviceConfig, + pub framerate: f32, + pub show_cursor: bool, } diff --git a/server/src/connection/mod.rs b/server/src/connection/mod.rs new file mode 100644 index 0000000..65fd6ce --- /dev/null +++ b/server/src/connection/mod.rs @@ -0,0 +1,58 @@ +pub mod video; + +use anyhow::{Context, Error}; +use futures::{StreamExt, sink::SinkExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; +use tracing::info; + +use crate::config::{CliOptions, StreamConfig}; +use crate::version::VersionInfo; + +#[derive(Debug)] +pub struct Connection { + framed: Framed, +} + +impl Connection { + pub async fn new(cli_options: CliOptions) -> Result { + let listener = TcpListener::bind(&format!("0.0.0.0:{}", cli_options.port)) + .await + .context(format!("could not bind to port {}", cli_options.port))?; + + let (stream, addr) = listener + .accept() + .await + .context("error while waiting for a TCP connection")?; + info!("new connection from {}", addr); + + let framed = Framed::new(stream, LengthDelimitedCodec::new()); + + Ok(Connection { framed }) + } + + pub async fn send_version_info(&mut self, version_info: VersionInfo) -> Result<(), Error> { + let msg = bson::serialize_to_vec(&version_info) + .context("could not serialize version information")?; + + self.framed + .send(msg.into()) + .await + .context("could not send serialized version information") + .map(|_| ()) + } + + pub async fn receive_stream_config(&mut self) -> Result { + let msg = self + .framed + .next() + .await + .context("connection closed before stream configuration was sent")? + .context("could not message with stream configuration")?; + + let stream_config = + bson::deserialize_from_slice(&msg).context("could not deserialize stream config")?; + + Ok(stream_config) + } +} diff --git a/server/src/connection/video.rs b/server/src/connection/video.rs new file mode 100644 index 0000000..dbe63d1 --- /dev/null +++ b/server/src/connection/video.rs @@ -0,0 +1,60 @@ +use std::time::Duration; + +use anyhow::{Context, Error}; +use futures::SinkExt; +use lz4_flex::compress_prepend_size; +use tokio::time::{MissedTickBehavior, interval}; +use tracing::{debug, trace}; + +use super::Connection; +use crate::{config::StreamConfig, framebuffer::reading::FrameReader}; + +#[derive(Debug)] +pub struct VideoConnection { + conn: Connection, + stream_config: StreamConfig, + frame_reader: FrameReader, +} + +impl VideoConnection { + pub fn new(conn: Connection, stream_config: StreamConfig) -> Result { + let frame_reader = FrameReader::new(stream_config.device_config.clone()) + .context("could not create frame reader")?; + + Ok(Self { + conn, + stream_config, + frame_reader, + }) + } + + pub async fn run(&mut self) -> Result<(), Error> { + let mut interval = interval(Duration::from_secs_f32(1. / self.stream_config.framerate)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let mut buffer = vec![0u8; self.frame_reader.frame_length()]; + loop { + interval.tick().await; + + self.frame_reader + .read_frame(&mut buffer) + .context("error reading frame from file")?; + + debug!("read {} bytes from frame reader", buffer.len()); + + let encoded_buffer = compress_prepend_size(&buffer); + trace!( + "writing encoded bytes to stream (length {})", + encoded_buffer.len(), + ); + + self.conn + .framed + .send(encoded_buffer.into()) + .await + .context("could not write frame to the stream")?; + + debug!("wrote the data to the output stream"); + } + } +} diff --git a/server/src/device/connection.rs b/server/src/device/connection.rs deleted file mode 100644 index d20d9fd..0000000 --- a/server/src/device/connection.rs +++ /dev/null @@ -1,99 +0,0 @@ -use std::time::Duration; - -use anyhow::{Context, Error}; -use futures::sink::SinkExt; -use lz4_flex::compress_prepend_size; -use tokio::net::{TcpListener, TcpStream}; -use tokio::time::{MissedTickBehavior, interval}; -use tokio_util::bytes::Bytes; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; -use tracing::{debug, info, trace}; - -use crate::device::reading::FrameReader; -use crate::{ - config::{CommunicatedConfig, ServerOptions, VersionInfo, VideoConfig}, - version::{FirmwareVersion, HardwareVersion}, -}; - -pub async fn listen_for_clients( - hardware_version: HardwareVersion, - firmware_version: FirmwareVersion, - video_config: VideoConfig, - opts: ServerOptions, -) -> Result<(), Error> { - let communicated_config = CommunicatedConfig { - version: VersionInfo { - hardware: hardware_version, - firmware: firmware_version, - }, - video_config: video_config.shared.clone(), - }; - - let listener = TcpListener::bind(&format!("0.0.0.0:{}", opts.port)) - .await - .context(format!("could not bind to port {}", opts.port))?; - - let (stream, addr) = listener.accept().await?; - info!("new connection from {}", addr); - - open_connection( - stream, - opts.clone(), - video_config.clone(), - communicated_config.clone(), - ) - .await - .context("error while handling TCP connections")?; - - Ok(()) -} - -async fn open_connection( - stream: TcpStream, - opts: ServerOptions, - video_config: VideoConfig, - communicated_config: CommunicatedConfig, -) -> Result<(), Error> { - let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); - - let bytes: Bytes = bson::serialize_to_vec(&communicated_config) - .context("could not serialize communicated config")? - .into(); - - framed - .send(bytes.iter().copied().collect()) - .await - .context("could not send out config")?; - - let mut frame_reader = - FrameReader::new(video_config).context("could not create frame reader")?; - - debug!("created frame reader, starting loop to send data"); - - let mut interval = interval(Duration::from_secs_f64(1. / (opts.framerate as f64))); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - let mut buffer = vec![0u8; frame_reader.frame_length()]; - loop { - interval.tick().await; - - frame_reader - .read_frame(&mut buffer) - .context("error reading frame from file")?; - - debug!("read {} bytes from frame reader", buffer.len()); - - let encoded_buffer = compress_prepend_size(&buffer); - trace!( - "writing encoded bytes to stream (length {})", - encoded_buffer.len(), - ); - - framed - .send(encoded_buffer.into()) - .await - .context("could not write frame to encoder")?; - - debug!("wrote the data to the output stream"); - } -} diff --git a/server/src/device/mod.rs b/server/src/framebuffer/mod.rs similarity index 62% rename from server/src/device/mod.rs rename to server/src/framebuffer/mod.rs index 4e862f8..2a359f8 100644 --- a/server/src/device/mod.rs +++ b/server/src/framebuffer/mod.rs @@ -1,3 +1,2 @@ -pub mod connection; pub mod process; pub mod reading; diff --git a/server/src/device/process.rs b/server/src/framebuffer/process.rs similarity index 100% rename from server/src/device/process.rs rename to server/src/framebuffer/process.rs diff --git a/server/src/device/reading.rs b/server/src/framebuffer/reading.rs similarity index 67% rename from server/src/device/reading.rs rename to server/src/framebuffer/reading.rs index 3ced49c..5710e2e 100644 --- a/server/src/device/reading.rs +++ b/server/src/framebuffer/reading.rs @@ -7,7 +7,7 @@ use anyhow::{Context, Error, anyhow}; use tracing::trace; use super::process::get_xochitl_memory_file; -use crate::config::{self, VideoConfig}; +use crate::config::device::{DeviceConfig, FramebufferDataSource}; #[derive(Debug)] pub struct FrameReader { @@ -19,28 +19,28 @@ pub struct FrameReader { } impl FrameReader { - pub fn new(video_config: VideoConfig) -> Result { - let (file, offset) = match video_config.internal.source { - config::VideoDataSource::File { path } => ( + pub fn new(device_config: DeviceConfig) -> Result { + let (file, offset) = match device_config.framebuffer_config.source { + FramebufferDataSource::File { path } => ( File::open(path).context("could not open framebuffer file")?, 0, ), - config::VideoDataSource::ProcessMemory => get_xochitl_memory_file() + FramebufferDataSource::ProcessMemory => get_xochitl_memory_file() .context("could not get file and offset for xochitl process")?, }; trace!( "file offset: {}, extra skip: {}", - offset, video_config.internal.skip + offset, device_config.framebuffer_config.skip, ); - let offset = offset + video_config.internal.skip; + let offset = offset + device_config.framebuffer_config.skip; Ok(Self { file, offset, - width: video_config.shared.width, - height: video_config.shared.height, - bytes_per_pixel: video_config.shared.bytes_per_pixel, + width: device_config.video_config.width, + height: device_config.video_config.height, + bytes_per_pixel: device_config.video_config.pixel_format.bytes_per_pixel(), }) } diff --git a/server/src/main.rs b/server/src/main.rs index ffc2e5d..39d7c3f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,36 +1,59 @@ mod config; -mod device; +mod connection; +mod framebuffer; mod version; use std::fs::OpenOptions; use anyhow::{Context, Error}; use clap::Parser; -use config::{CliOptions, ServerOptions, VideoConfig, get_video_config}; -use tracing::{debug, info}; +use config::CliOptions; +use tracing::info; use tracing_subscriber::{Registry, fmt, layer::SubscriberExt}; -use version::{get_firmware_version, get_hardware_version}; -use crate::{ - device::connection::listen_for_clients, - version::{FirmwareVersion, HardwareVersion}, -}; +use connection::{Connection, video::VideoConnection}; +use version::VersionInfo; #[tokio::main] async fn main() -> Result<(), Error> { initialize_logging().context("could not initialize logging")?; - let opts = get_command_line_options().context("could not read command line options")?; - let (hardware_version, firmware_version, video_config) = - get_versions_and_config().context("could not gather version infos and video config")?; + let cli_options = CliOptions::parse(); - listen_for_clients(hardware_version, firmware_version, video_config, opts) + let version_info = + VersionInfo::get_from_device().context("could not get version information")?; + + info!( + "got version information: hardware {:?}, firmware {}", + version_info.hardware, version_info.firmware, + ); + info!("initializing TCP connection"); + + let mut conn = Connection::new(cli_options) + .await + .context("error while handling TCP connection")?; + + info!("sending out version information"); + + conn.send_version_info(version_info) .await - .context("problem while listening for connections")?; + .context("could not send out version information")?; + + let stream_config = conn + .receive_stream_config() + .await + .context("could not receive stream config")?; + + info!("received stream config {:?}", &stream_config); + + let mut video_conn = VideoConnection::new(conn, stream_config) + .context("could not initialize video connection")?; + video_conn.run().await.context("error while streaming")?; Ok(()) } +// TODO: since now running as service, needed? fn initialize_logging() -> Result<(), Error> { let log_file = OpenOptions::new() .write(true) @@ -48,26 +71,3 @@ fn initialize_logging() -> Result<(), Error> { Ok(()) } - -fn get_command_line_options() -> Result { - let opts = CliOptions::parse(); - debug!("cli options: {:?}", opts); - let opts = ServerOptions::try_from(opts).context("could not get server options")?; - debug!("resolved options: {:?}", opts); - - Ok(opts) -} - -fn get_versions_and_config() -> Result<(HardwareVersion, FirmwareVersion, VideoConfig), Error> { - let hardware_version = get_hardware_version().context("could not get hardware version")?; - info!("Detected hardware version {:?}", hardware_version); - - let firmware_version = get_firmware_version().context("could not get version")?; - info!("Detected firmware version {:?}", firmware_version); - - let video_config = - get_video_config(&hardware_version, &firmware_version).context("could not get config")?; - info!("using video config: {:?}", video_config); - - Ok((hardware_version, firmware_version, video_config)) -} diff --git a/server/src/version/mod.rs b/server/src/version/mod.rs index 032e3db..12be871 100644 --- a/server/src/version/mod.rs +++ b/server/src/version/mod.rs @@ -2,26 +2,54 @@ mod tests; use core::fmt; -use std::fs; +use std::{fmt::Display, fs, str::FromStr}; use anyhow::{Context, Error, Result, anyhow}; use serde::{Deserialize, Serialize}; use tracing::trace; -#[derive(Debug, PartialEq, Eq)] -pub enum ConfigVersion { - Ancient, - V3, - V3P7, - V3P24, +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum HardwareVersion { + Rm1, + Rm2, + Ferrari, +} + +const HARDWARE_VERSION_FILE: &str = "/sys/devices/soc0/machine"; + +impl HardwareVersion { + pub fn get_from_device() -> Result { + let content = fs::read_to_string(HARDWARE_VERSION_FILE) + .context("could not read framebuffer version file")?; + let content = content.trim(); + + content.parse().context("could not parse hardware version") + } } -#[derive(Debug, Clone, PartialEq, Eq, Ord, Serialize, Deserialize)] +impl FromStr for HardwareVersion { + type Err = Error; + + fn from_str(s: &str) -> std::result::Result { + let (_, version_string) = s + .split_once(' ') + .context(format!("could not split input '{}'", s))?; + + match version_string { + "1.0" => Ok(HardwareVersion::Rm1), + "2.0" => Ok(HardwareVersion::Rm2), + "Ferrari" => Ok(HardwareVersion::Ferrari), + version_string => Err(anyhow!("unknown version string '{}'", version_string)), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, Serialize, Deserialize)] pub struct FirmwareVersion { - version: usize, - major: usize, - minor: usize, - patch: usize, + pub version: usize, + pub major: usize, + pub minor: usize, + pub patch: usize, } impl PartialOrd for FirmwareVersion { @@ -51,98 +79,74 @@ impl fmt::Display for FirmwareVersion { } } -pub fn get_firmware_version() -> Result { - let content = fs::read_to_string("/usr/share/remarkable/update.conf") - .context("could not read framebuffer version file")?; - let content = content.trim(); +const FIRMWARE_VERSION_FILE: &str = "/usr/share/remarkable/update.conf"; - parse_version(&content).context("could not parse version") -} +impl FirmwareVersion { + pub fn get_from_device() -> Result { + let content = fs::read_to_string(FIRMWARE_VERSION_FILE) + .context("could not read framebuffer version file")?; + let content = content.trim(); -const VERSION_3_0: FirmwareVersion = FirmwareVersion { - version: 3, - major: 0, - minor: 0, - patch: 0, -}; -const VERSION_3_7: FirmwareVersion = FirmwareVersion { - version: 3, - major: 7, - minor: 0, - patch: 1930, -}; -const VERSION_3_24: FirmwareVersion = FirmwareVersion { - version: 3, - major: 24, - minor: 0, - patch: 0, -}; - -pub fn get_config_version(version: &FirmwareVersion) -> Result { - Ok(if version >= &VERSION_3_24 { - ConfigVersion::V3P24 - } else if version >= &VERSION_3_7 { - ConfigVersion::V3P7 - } else if version >= &VERSION_3_0 { - ConfigVersion::V3 - } else { - ConfigVersion::Ancient - }) + content + .parse() + .context(format!("could not parse firmware version {}", content)) + } } -fn parse_version(input: &str) -> Result { - let (_, version_string) = input - .split_once('=') - .context(format!("could not split input '{}'", input))?; +impl FromStr for FirmwareVersion { + type Err = Error; - trace!("version string: {}", version_string); + fn from_str(s: &str) -> std::result::Result { + let (_, version_string) = s + .split_once('=') + .context(format!("could not split input '{}'", s))?; - let version_parts = version_string - .split('.') - .map(|s| { - s.parse() - .context(format!("could not parse '{}' as usize", s)) - }) - .collect::, Error>>()?; + trace!("version string: {}", version_string); - trace!("got parts: {:?}", version_parts); + let version_parts = version_string + .split('.') + .map(|s| { + s.parse() + .context(format!("could not parse '{}' as usize", s)) + }) + .collect::, Error>>()?; - if version_parts.len() > 4 { - return Err(anyhow!("too many version parts: {:?}", version_parts)); - } + trace!("got parts: {:?}", version_parts); - Ok(FirmwareVersion { - version: version_parts[0], - major: version_parts[1], - minor: version_parts[2], - patch: version_parts[3], - }) -} + if version_parts.len() > 4 { + return Err(anyhow!("too many version parts: {:?}", version_parts)); + } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum HardwareVersion { - Rm1, - Rm2, - Ferrari, + Ok(FirmwareVersion { + version: version_parts[0], + major: version_parts[1], + minor: version_parts[2], + patch: version_parts[3], + }) + } } -pub fn get_hardware_version() -> Result { - let content = fs::read_to_string("/sys/devices/soc0/machine") - .context("could not read framebuffer version file")?; - let content = content.trim(); - - parse_hardware_version(&content).context("could not parse hardware version") +// TODO: include server version? +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct VersionInfo { + pub hardware: HardwareVersion, + pub firmware: FirmwareVersion, } -fn parse_hardware_version(input: &str) -> Result { - let (_, version_string) = input - .split_once(' ') - .context(format!("could not split input '{}'", input))?; +impl VersionInfo { + pub fn get_from_device() -> Result { + Ok(Self { + hardware: HardwareVersion::get_from_device()?, + firmware: FirmwareVersion::get_from_device()?, + }) + } +} - match version_string { - "1.0" => Ok(HardwareVersion::Rm1), - "2.0" => Ok(HardwareVersion::Rm2), - "Ferrari" => Ok(HardwareVersion::Ferrari), - version_string => Err(anyhow!("unknown version string '{}'", version_string)), +impl Display for VersionInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!( + "[ hardware: {:?}, firmware: {} ]", + self.hardware, self.firmware, + )) } } diff --git a/server/src/version/tests.rs b/server/src/version/tests.rs index a2b0e01..bb6a910 100644 --- a/server/src/version/tests.rs +++ b/server/src/version/tests.rs @@ -1,12 +1,55 @@ +use crate::config::device::DeviceConfigVersion; + use super::*; use test_log::test; +#[test] +fn parse_framebuffer_version_from_string() { + struct TestCase<'a> { + input: &'a str, + expected: FirmwareVersion, + } + + for TestCase { input, expected } in [ + TestCase { + input: "REMARKABLE_RELEASE_VERSION=2.0.0.0", + expected: FirmwareVersion { + version: 2, + major: 0, + minor: 0, + patch: 0, + }, + }, + TestCase { + input: "REMARKABLE_RELEASE_VERSION=3.7.0.1930", + expected: FirmwareVersion { + version: 3, + major: 7, + minor: 0, + patch: 1930, + }, + }, + TestCase { + input: "REMARKABLE_RELEASE_VERSION=3.25.0.119", + expected: FirmwareVersion { + version: 3, + major: 25, + minor: 0, + patch: 119, + }, + }, + ] { + let result = input.parse().expect("could not parse version"); + assert_eq!(expected, result, "version input '{}'", input); + } +} + #[test] fn parse_config_version_from_string() { struct TestCase { input: FirmwareVersion, - expected: ConfigVersion, + expected: DeviceConfigVersion, } for TestCase { input, expected } in [ @@ -17,7 +60,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 0, }, - expected: ConfigVersion::Ancient, + expected: DeviceConfigVersion::Ancient, }, TestCase { input: FirmwareVersion { @@ -26,7 +69,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 0, }, - expected: ConfigVersion::Ancient, + expected: DeviceConfigVersion::Ancient, }, TestCase { input: FirmwareVersion { @@ -35,7 +78,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 0, }, - expected: ConfigVersion::V3, + expected: DeviceConfigVersion::V3, }, TestCase { input: FirmwareVersion { @@ -44,7 +87,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 0, }, - expected: ConfigVersion::V3, + expected: DeviceConfigVersion::V3, }, TestCase { input: FirmwareVersion { @@ -53,7 +96,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 1929, }, - expected: ConfigVersion::V3, + expected: DeviceConfigVersion::V3, }, TestCase { input: FirmwareVersion { @@ -62,7 +105,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 1930, }, - expected: ConfigVersion::V3P7, + expected: DeviceConfigVersion::V3P7, }, TestCase { input: FirmwareVersion { @@ -71,7 +114,7 @@ fn parse_config_version_from_string() { minor: 1, patch: 0, }, - expected: ConfigVersion::V3P7, + expected: DeviceConfigVersion::V3P7, }, TestCase { input: FirmwareVersion { @@ -80,7 +123,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 0, }, - expected: ConfigVersion::V3P7, + expected: DeviceConfigVersion::V3P7, }, TestCase { input: FirmwareVersion { @@ -89,7 +132,7 @@ fn parse_config_version_from_string() { minor: 0, patch: 0, }, - expected: ConfigVersion::V3P24, + expected: DeviceConfigVersion::V3P24, }, TestCase { input: FirmwareVersion { @@ -98,7 +141,7 @@ fn parse_config_version_from_string() { minor: 1, patch: 0, }, - expected: ConfigVersion::V3P24, + expected: DeviceConfigVersion::V3P24, }, TestCase { input: FirmwareVersion { @@ -107,51 +150,10 @@ fn parse_config_version_from_string() { minor: 0, patch: 119, }, - expected: ConfigVersion::V3P24, + expected: DeviceConfigVersion::V3P24, }, ] { - let result = get_config_version(&input).expect("could not parse version"); + let result = DeviceConfigVersion::from(input); assert_eq!(expected, result, "version input: {:?}", input); } } - -#[test] -fn parse_framebuffer_version_from_string() { - struct TestCase<'a> { - input: &'a str, - expected: FirmwareVersion, - } - - for TestCase { input, expected } in [ - TestCase { - input: "REMARKABLE_RELEASE_VERSION=2.0.0.0", - expected: FirmwareVersion { - version: 2, - major: 0, - minor: 0, - patch: 0, - }, - }, - TestCase { - input: "REMARKABLE_RELEASE_VERSION=3.7.0.1930", - expected: FirmwareVersion { - version: 3, - major: 7, - minor: 0, - patch: 1930, - }, - }, - TestCase { - input: "REMARKABLE_RELEASE_VERSION=3.25.0.119", - expected: FirmwareVersion { - version: 3, - major: 25, - minor: 0, - patch: 119, - }, - }, - ] { - let result = parse_version(input).expect("could not parse version"); - assert_eq!(expected, result, "version input '{}'", input); - } -}