-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Add monitor server #102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,4 +4,5 @@ pub mod arch; | |
| pub mod debug; | ||
| pub mod device; | ||
| pub mod error; | ||
| pub mod monitor; | ||
| pub mod virt; | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,167 @@ | ||||||||||||||||||||||||||||||||
| use std::collections::HashMap; | ||||||||||||||||||||||||||||||||
| use std::io; | ||||||||||||||||||||||||||||||||
| use std::sync::Arc; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| use async_trait::async_trait; | ||||||||||||||||||||||||||||||||
| use tokio::io::AsyncWriteExt; | ||||||||||||||||||||||||||||||||
| use tokio::net::UnixListener; | ||||||||||||||||||||||||||||||||
| use tokio::net::UnixStream; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| const PATH: &str = "/tmp/vm.sock"; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[derive(Debug, thiserror::Error)] | ||||||||||||||||||||||||||||||||
| pub enum Error { | ||||||||||||||||||||||||||||||||
| #[error("{0}")] | ||||||||||||||||||||||||||||||||
| Stream(#[from] std::io::Error), | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[error("{0}")] | ||||||||||||||||||||||||||||||||
| CommandHandlerConflicat(String), | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[error("{0}")] | ||||||||||||||||||||||||||||||||
| Serde(#[from] serde_json::Error), | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[error("Unknown cmd {0}")] | ||||||||||||||||||||||||||||||||
| UnknownCmd(String), | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[error("unknown subcommand {0:?}")] | ||||||||||||||||||||||||||||||||
| UnknownSubcommand(Vec<String>), | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[error("{0}")] | ||||||||||||||||||||||||||||||||
| Error(String), | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[async_trait] | ||||||||||||||||||||||||||||||||
| pub trait MonitorCommand: Send + Sync { | ||||||||||||||||||||||||||||||||
| async fn handle_command(&self, subcommands: &[&str]) -> Result<String, Error>; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| struct MonitorConnection { | ||||||||||||||||||||||||||||||||
| components: Arc<HashMap<String, Box<dyn MonitorCommand>>>, | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| impl MonitorConnection { | ||||||||||||||||||||||||||||||||
| fn start(&self, mut stream: UnixStream) { | ||||||||||||||||||||||||||||||||
| tokio::spawn({ | ||||||||||||||||||||||||||||||||
| let components = self.components.clone(); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| async move { | ||||||||||||||||||||||||||||||||
| loop { | ||||||||||||||||||||||||||||||||
| stream.readable().await?; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| let mut buf = vec![0u8; 1024]; | ||||||||||||||||||||||||||||||||
| match stream.try_read(&mut buf) { | ||||||||||||||||||||||||||||||||
| Ok(0) => break, | ||||||||||||||||||||||||||||||||
| Ok(n) => { | ||||||||||||||||||||||||||||||||
| let line = match str::from_utf8(&buf[..n]) { | ||||||||||||||||||||||||||||||||
| Ok(line) => line.trim(), | ||||||||||||||||||||||||||||||||
| Err(err) => { | ||||||||||||||||||||||||||||||||
| stream.write_all(format!("ERR {err}\n").as_bytes()).await?; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
|
Comment on lines
+48
to
+62
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix command parsing to handle stream framing correctly. A Unix stream is byte-oriented; one 🛠️ Proposed fix (line-based reader)+use tokio::io::AsyncBufReadExt;
+use tokio::io::BufReader;
@@
- fn start(&self, mut stream: UnixStream) {
+ fn start(&self, stream: UnixStream) {
tokio::spawn({
let components = self.components.clone();
async move {
+ let (read_half, mut write_half) = stream.into_split();
+ let mut reader = BufReader::new(read_half);
+ let mut line = String::new();
+
loop {
- stream.readable().await?;
-
- let mut buf = vec![0u8; 1024];
- match stream.try_read(&mut buf) {
- Ok(0) => break,
- Ok(n) => {
- let line = match str::from_utf8(&buf[..n]) {
- Ok(line) => line.trim(),
- Err(err) => {
- stream.write_all(format!("ERR {err}\n").as_bytes()).await?;
-
- continue;
- }
- };
- if line.is_empty() {
- continue;
- }
+ line.clear();
+ let n = reader.read_line(&mut line).await?;
+ if n == 0 {
+ break;
+ }
+ let line = line.trim();
+ if line.is_empty() {
+ continue;
+ }
@@
- stream.writable().await?;
-
- stream.write_all(resp.as_bytes()).await?;
+ write_half.write_all(resp.as_bytes()).await?;
}
Err(e) => {
- stream.write_all(format!("ERR {e}\n").as_bytes()).await?;
+ write_half
+ .write_all(format!("ERR {e}\n").as_bytes())
+ .await?;
}
},
None => {
- stream
+ write_half
.write_all(
format!("ERR unknown command {command}\n").as_bytes(),
)
.await?;
}
}
- }
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
- continue;
- }
- Err(e) => {
- return Err(e.into());
- }
- }
}
Ok::<(), Error>(())
}
});Also applies to: 59-65 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
| if line.is_empty() { | ||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| let mut tokens = line.split_whitespace(); | ||||||||||||||||||||||||||||||||
| let command = match tokens.next() { | ||||||||||||||||||||||||||||||||
| Some(f) => f, | ||||||||||||||||||||||||||||||||
| None => continue, | ||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
| let subcommands: Vec<&str> = tokens.collect(); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| match components.get(command) { | ||||||||||||||||||||||||||||||||
| Some(handler) => match handler.handle_command(&subcommands).await { | ||||||||||||||||||||||||||||||||
| Ok(resp) => { | ||||||||||||||||||||||||||||||||
| stream.writable().await?; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| stream.write_all(resp.as_bytes()).await?; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| Err(e) => { | ||||||||||||||||||||||||||||||||
| stream.write_all(format!("ERR {e}\n").as_bytes()).await?; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||
| None => { | ||||||||||||||||||||||||||||||||
| stream | ||||||||||||||||||||||||||||||||
| .write_all( | ||||||||||||||||||||||||||||||||
| format!("ERR unknown command {command}\n").as_bytes(), | ||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||
| .await?; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { | ||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| Err(e) => { | ||||||||||||||||||||||||||||||||
| return Err(e.into()); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Ok::<(), Error>(()) | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| pub struct MonitorServer { | ||||||||||||||||||||||||||||||||
| components: Arc<HashMap<String, Box<dyn MonitorCommand>>>, | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| impl MonitorServer { | ||||||||||||||||||||||||||||||||
| pub fn start(&self) { | ||||||||||||||||||||||||||||||||
| let components = self.components.clone(); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||||||||||||||||
| let Ok(listener) = UnixListener::bind(PATH) else { | ||||||||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
|
Comment on lines
+117
to
+120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Silent failure when socket bind fails. If a stale socket file exists from a previous crashed run, Consider removing the stale socket before binding and logging/returning the error. 🛠️ Proposed fix tokio::spawn(async move {
+ // Remove stale socket file if it exists
+ let _ = std::fs::remove_file(PATH);
+
- let Ok(listener) = UnixListener::bind(PATH) else {
- return;
- };
+ let listener = match UnixListener::bind(PATH) {
+ Ok(l) => l,
+ Err(e) => {
+ tracing::error!("Failed to bind monitor socket at {PATH}: {e}");
+ return;
+ }
+ };📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| loop { | ||||||||||||||||||||||||||||||||
| let stream = match listener.accept().await { | ||||||||||||||||||||||||||||||||
| Ok((stream, _)) => stream, | ||||||||||||||||||||||||||||||||
| Err(_err) => { | ||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
Comment on lines
+123
to
+127
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle repeated accept failures with observability/backoff. Blind 🛠️ Proposed fix Ok((stream, _)) => stream,
- Err(_err) => {
+ Err(err) => {
+ eprintln!("monitor accept failed: {err}");
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
};📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| let monitor_connection = MonitorConnection { | ||||||||||||||||||||||||||||||||
| components: components.clone(), | ||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| monitor_connection.start(stream); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| #[derive(Default)] | ||||||||||||||||||||||||||||||||
| pub struct MonitorServerBuilder { | ||||||||||||||||||||||||||||||||
| components: HashMap<String, Box<dyn MonitorCommand>>, | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| impl MonitorServerBuilder { | ||||||||||||||||||||||||||||||||
| pub fn register_command_handler( | ||||||||||||||||||||||||||||||||
| &mut self, | ||||||||||||||||||||||||||||||||
| name: &str, | ||||||||||||||||||||||||||||||||
| handler: Box<dyn MonitorCommand>, | ||||||||||||||||||||||||||||||||
| ) -> Result<(), Error> { | ||||||||||||||||||||||||||||||||
| let name = name.to_string(); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if self.components.contains_key(&name) { | ||||||||||||||||||||||||||||||||
| return Err(Error::CommandHandlerConflicat(name)); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| self.components.insert(name, handler); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| pub fn build(self) -> MonitorServer { | ||||||||||||||||||||||||||||||||
| MonitorServer { | ||||||||||||||||||||||||||||||||
| components: self.components.into(), | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in error variant name.
CommandHandlerConflicatshould beCommandHandlerConflict.✏️ Proposed fix
#[error("{0}")] - CommandHandlerConflicat(String), + CommandHandlerConflict(String),Also update the usage on line 147:
📝 Committable suggestion
🤖 Prompt for AI Agents