Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ lazy_static = "1.5.0"
maplit = "1.0.2"
memmap2 = "0.9.9"
rustyline = "17.0.2"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
static_assertions = "1.1.0"
strum_macros = "0.27.2"
termios = "0.3.3"
Expand Down
3 changes: 3 additions & 0 deletions crates/vm-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ edition = "2024"
applevisor = { workspace = true, optional = true }
applevisor-sys = { workspace = true, optional = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
bitflags = { workspace = true }
kvm-bindings = { workspace = true, optional = true }
kvm-ioctls = { workspace = true, optional = true }
memmap2 = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
vm-fdt = { workspace = true }
vm-mm = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/vm-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod arch;
pub mod debug;
pub mod device;
pub mod error;
pub mod monitor;
pub mod virt;
167 changes: 167 additions & 0 deletions crates/vm-core/src/monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use std::collections::HashMap;
use std::io;
use std::sync::Arc;

use async_trait::async_trait;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixListener;
use tokio::net::UnixStream;

const PATH: &str = "/tmp/vm.sock";

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("{0}")]
Stream(#[from] std::io::Error),

#[error("{0}")]
CommandHandlerConflicat(String),
Comment on lines +17 to +18
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Typo in error variant name.

CommandHandlerConflicat should be CommandHandlerConflict.

✏️ Proposed fix
     #[error("{0}")]
-    CommandHandlerConflicat(String),
+    CommandHandlerConflict(String),

Also update the usage on line 147:

-            return Err(Error::CommandHandlerConflicat(name));
+            return Err(Error::CommandHandlerConflict(name));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[error("{0}")]
CommandHandlerConflicat(String),
#[error("{0}")]
CommandHandlerConflict(String),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/vm-core/src/monitor.rs` around lines 17 - 18, Rename the typo'd error
enum variant CommandHandlerConflicat to CommandHandlerConflict in the monitor.rs
definition and update every usage/constructor and match arm that references
CommandHandlerConflicat (e.g., places creating the error or matching on it) to
use CommandHandlerConflict instead so symbol names remain consistent across the
codebase.


#[error("{0}")]
Serde(#[from] serde_json::Error),

#[error("Unknown cmd {0}")]
UnknownCmd(String),

#[error("unknown subcommand {0:?}")]
UnknownSubcommand(Vec<String>),

#[error("{0}")]
Error(String),
}

#[async_trait]
pub trait MonitorCommand: Send + Sync {
async fn handle_command(&self, subcommands: &[&str]) -> Result<String, Error>;
}

struct MonitorConnection {
components: Arc<HashMap<String, Box<dyn MonitorCommand>>>,
}

impl MonitorConnection {
fn start(&self, mut stream: UnixStream) {
tokio::spawn({
let components = self.components.clone();

async move {
loop {
stream.readable().await?;

let mut buf = vec![0u8; 1024];
match stream.try_read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let line = match str::from_utf8(&buf[..n]) {
Ok(line) => line.trim(),
Err(err) => {
stream.write_all(format!("ERR {err}\n").as_bytes()).await?;

continue;
}
};
Comment on lines +48 to +62
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix command parsing to handle stream framing correctly.

A Unix stream is byte-oriented; one try_read chunk is not guaranteed to be exactly one command. Fragmented/coalesced reads will mis-parse commands.

🛠️ Proposed fix (line-based reader)
+use tokio::io::AsyncBufReadExt;
+use tokio::io::BufReader;
@@
-    fn start(&self, mut stream: UnixStream) {
+    fn start(&self, stream: UnixStream) {
         tokio::spawn({
             let components = self.components.clone();
 
             async move {
+                let (read_half, mut write_half) = stream.into_split();
+                let mut reader = BufReader::new(read_half);
+                let mut line = String::new();
+
                 loop {
-                    stream.readable().await?;
-
-                    let mut buf = vec![0u8; 1024];
-                    match stream.try_read(&mut buf) {
-                        Ok(0) => break,
-                        Ok(n) => {
-                            let line = match str::from_utf8(&buf[..n]) {
-                                Ok(line) => line.trim(),
-                                Err(err) => {
-                                    stream.write_all(format!("ERR {err}\n").as_bytes()).await?;
-
-                                    continue;
-                                }
-                            };
-                            if line.is_empty() {
-                                continue;
-                            }
+                    line.clear();
+                    let n = reader.read_line(&mut line).await?;
+                    if n == 0 {
+                        break;
+                    }
+                    let line = line.trim();
+                    if line.is_empty() {
+                        continue;
+                    }
@@
-                                        stream.writable().await?;
-
-                                        stream.write_all(resp.as_bytes()).await?;
+                                        write_half.write_all(resp.as_bytes()).await?;
                                     }
                                     Err(e) => {
-                                        stream.write_all(format!("ERR {e}\n").as_bytes()).await?;
+                                        write_half
+                                            .write_all(format!("ERR {e}\n").as_bytes())
+                                            .await?;
                                     }
                                 },
                                 None => {
-                                    stream
+                                    write_half
                                         .write_all(
                                             format!("ERR unknown command {command}\n").as_bytes(),
                                         )
                                         .await?;
                                 }
                             }
-                        }
-                        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
-                            continue;
-                        }
-                        Err(e) => {
-                            return Err(e.into());
-                        }
-                    }
                 }
 
                 Ok::<(), Error>(())
             }
         });

Also applies to: 59-65

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/vm-core/src/monitor.rs` around lines 39 - 53, The current loop using
stream.try_read and treating each chunk as a single command mis-parses when
reads are fragmented or coalesced; replace chunk-based parsing with a
line-oriented reader: buffer and accumulate bytes until newline(s) and then
split into complete commands (or wrap the stream with an AsyncBufRead + lines()
iterator) before calling the existing command handling and respond via
stream.write_all; apply the same change to the second occurrence around the
59-65 block so both places use line-based framing instead of
one-read-per-command parsing (refer to the variables/functions stream, try_read,
buf, and the utf8 conversion/stream.write_all calls to locate the code).

if line.is_empty() {
continue;
}

let mut tokens = line.split_whitespace();
let command = match tokens.next() {
Some(f) => f,
None => continue,
};
let subcommands: Vec<&str> = tokens.collect();

match components.get(command) {
Some(handler) => match handler.handle_command(&subcommands).await {
Ok(resp) => {
stream.writable().await?;

stream.write_all(resp.as_bytes()).await?;
}
Err(e) => {
stream.write_all(format!("ERR {e}\n").as_bytes()).await?;
}
},
None => {
stream
.write_all(
format!("ERR unknown command {command}\n").as_bytes(),
)
.await?;
}
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}

Ok::<(), Error>(())
}
});
}
}

pub struct MonitorServer {
components: Arc<HashMap<String, Box<dyn MonitorCommand>>>,
}

impl MonitorServer {
pub fn start(&self) {
let components = self.components.clone();

tokio::spawn(async move {
let Ok(listener) = UnixListener::bind(PATH) else {
return;
};
Comment on lines +117 to +120
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Silent failure when socket bind fails.

If a stale socket file exists from a previous crashed run, UnixListener::bind will fail and the server silently exits without any indication to the caller or logs. This makes debugging difficult.

Consider removing the stale socket before binding and logging/returning the error.

🛠️ Proposed fix
         tokio::spawn(async move {
+            // Remove stale socket file if it exists
+            let _ = std::fs::remove_file(PATH);
+
-            let Ok(listener) = UnixListener::bind(PATH) else {
-                return;
-            };
+            let listener = match UnixListener::bind(PATH) {
+                Ok(l) => l,
+                Err(e) => {
+                    tracing::error!("Failed to bind monitor socket at {PATH}: {e}");
+                    return;
+                }
+            };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
tokio::spawn(async move {
let Ok(listener) = UnixListener::bind(PATH) else {
return;
};
tokio::spawn(async move {
// Remove stale socket file if it exists
let _ = std::fs::remove_file(PATH);
let listener = match UnixListener::bind(PATH) {
Ok(l) => l,
Err(e) => {
tracing::error!("Failed to bind monitor socket at {PATH}: {e}");
return;
}
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/vm-core/src/monitor.rs` around lines 110 - 113, The UnixListener::bind
call inside the tokio::spawn currently silently returns on error; update the
block around UnixListener::bind(PATH) so that on Err you log the bind error
(including the error value) and, if the error looks like a stale socket (e.g.,
file exists / AddrInUse), attempt to remove PATH via std::fs::remove_file and
then retry UnixListener::bind once, logging any removal or retry failures;
ensure you still return/exit the task if the retry fails. Refer to
UnixListener::bind, PATH, and the spawned tokio::spawn listener block when
making the changes.


loop {
let stream = match listener.accept().await {
Ok((stream, _)) => stream,
Err(_err) => {
continue;
}
Comment on lines +123 to +127
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle repeated accept failures with observability/backoff.

Blind continue on accept errors can create a noisy tight loop and hides operational issues.

🛠️ Proposed fix
                     Ok((stream, _)) => stream,
-                    Err(_err) => {
+                    Err(err) => {
+                        eprintln!("monitor accept failed: {err}");
+                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                         continue;
                     }
                 };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let stream = match listener.accept().await {
Ok((stream, _)) => stream,
Err(_err) => {
continue;
}
let stream = match listener.accept().await {
Ok((stream, _)) => stream,
Err(err) => {
eprintln!("monitor accept failed: {err}");
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@crates/vm-core/src/monitor.rs` around lines 116 - 120, The accept branch
currently discards errors; change the Err(_err) arm that handles
listener.accept().await to capture the error (e.g., Err(err)), emit an
observable log/metric (using your project's logger/tracing) including err and
context, and implement a backoff before retrying (use tokio::time::sleep with an
exponential/backoff delay capped at a max and reset the delay on successful
accept). Ensure you reference listener.accept().await and the stream handling so
the code resets backoff on Ok((stream, _)) and sleeps on Err(err) rather than
immediately continuing.

};

let monitor_connection = MonitorConnection {
components: components.clone(),
};

monitor_connection.start(stream);
}
});
}
}

#[derive(Default)]
pub struct MonitorServerBuilder {
components: HashMap<String, Box<dyn MonitorCommand>>,
}

impl MonitorServerBuilder {
pub fn register_command_handler(
&mut self,
name: &str,
handler: Box<dyn MonitorCommand>,
) -> Result<(), Error> {
let name = name.to_string();

if self.components.contains_key(&name) {
return Err(Error::CommandHandlerConflicat(name));
}

self.components.insert(name, handler);

Ok(())
}

pub fn build(self) -> MonitorServer {
MonitorServer {
components: self.components.into(),
}
}
}
3 changes: 3 additions & 0 deletions crates/vm-device/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ edition = "2024"

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
bitflags.workspace = true
lazy_static.workspace = true
maplit.workspace = true
serde.workspace = true
serde_json.workspace = true
strum_macros.workspace = true
thiserror.workspace = true
tokio.workspace = true
Expand Down
Loading
Loading