diff --git a/machine-config.toml b/machine-config.toml
index 274b904..9154492 100644
--- a/machine-config.toml
+++ b/machine-config.toml
@@ -1,7 +1,7 @@
 [cluster_manager]
-id52 = "qkm0c2a05f6ke2qa31h85cto32hupvnnqlou0515pf2m0qgmkotg"
+id52 = "b53nq9ob3o556etiioi55jkkn85n39g1h3q4kvfsbrifvpkr47j0"
 cluster_name = "test"
 
 [machine.server1]
-id52 = "dm3l1t9cuskovaumoqioe246k4fk6c65e4sllagt2uhsv0e2geag"
+id52 = "u8e1n91bcqsf77397sj50bb6gm7hcfs5bm6jnct2la6453gajkgg"
 allow_from = "*"
diff --git a/malai/src/daemon.rs b/malai/src/daemon.rs
index adf9709..1f78d0f 100644
--- a/malai/src/daemon.rs
+++ b/malai/src/daemon.rs
@@ -5,6 +5,28 @@
 use eyre::Result;
 use futures_util::stream::StreamExt;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+/// Global daemon state for managing P2P listeners
+#[derive(Debug)]
+struct DaemonState {
+    cluster_listeners: HashMap<String, tokio::task::JoinHandle<()>>,
+    malai_home: std::path::PathBuf,
+}
+
+impl DaemonState {
+    fn new(malai_home: std::path::PathBuf) -> Self {
+        Self {
+            cluster_listeners: HashMap::new(),
+            malai_home,
+        }
+    }
+}
+
+/// Global daemon state - accessible for rescan operations
+static DAEMON_STATE: tokio::sync::OnceCell<Arc<RwLock<DaemonState>>> = tokio::sync::OnceCell::const_new();
 
 /// Start the real malai daemon - MVP implementation
 pub async fn start_real_daemon(foreground: bool) -> Result<()> {
@@ -12,6 +34,10 @@ pub async fn start_real_daemon(foreground: bool) -> Result<()> {
     println!("🔥 Starting malai daemon (MVP)");
     println!("📁 MALAI_HOME: {}", malai_home.display());
 
+    // Initialize global daemon state
+    let daemon_state = Arc::new(RwLock::new(DaemonState::new(malai_home.clone())));
+    DAEMON_STATE.set(daemon_state.clone()).map_err(|_| eyre::eyre!("Daemon state already initialized"))?;
+
     // File locking (proven working pattern)
     let lock_path = malai_home.join("malai.lock");
     let lock_file = std::fs::OpenOptions::new()
@@ -37,7 +63,24 @@ pub async fn start_real_daemon(foreground: bool) -> Result<()> {
         println!("📋 Running in foreground mode (daemonization TODO)");
     }
 
-    // Scan all cluster identities and roles
+    // Start Unix socket listener for daemon-CLI communication (wait for it to be ready)
+    let _socket_handle = crate::daemon_socket::start_daemon_socket_listener(malai_home.clone()).await?;
+
+    // Initial cluster scan and startup
+    start_all_cluster_listeners().await?;
+
+    println!("✅ malai daemon started - all cluster listeners active");
+    println!("📨 Press Ctrl+C to stop gracefully");
+
+    // Wait for graceful shutdown
+    fastn_p2p::cancelled().await;
+    println!("👋 malai daemon stopped gracefully");
+
+    Ok(())
+}
+
+/// Start all cluster listeners (called at startup and during rescan)
+async fn start_all_cluster_listeners() -> Result<()> {
     let cluster_roles = crate::config_manager::scan_cluster_roles().await?;
 
     if cluster_roles.is_empty() {
@@ -48,27 +91,103 @@ pub async fn start_real_daemon(foreground: bool) -> Result<()> {
 
     println!("✅ Found {} cluster identities", cluster_roles.len());
 
-    // Start Unix socket listener for daemon-CLI communication (wait for it to be ready)
-    let _socket_handle = crate::daemon_socket::start_daemon_socket_listener(malai_home.clone()).await?;
+    let daemon_state = DAEMON_STATE.get().ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
+    let mut state = daemon_state.write().await;
 
     // Start one P2P listener per identity
     for (cluster_alias, identity, role) in cluster_roles {
         println!("🚀 Starting P2P listener for: {} ({:?})", cluster_alias, role);
 
         let cluster_alias_clone = cluster_alias.clone();
-        fastn_p2p::spawn(async move {
-            if let Err(e) = run_cluster_listener(cluster_alias_clone, identity, role).await {
-                println!("❌ Cluster listener failed for {}: {}", cluster_alias, e);
+        let identity_clone = identity.clone();
+        let handle = tokio::spawn(async move {
+            if let Err(e) = run_cluster_listener(cluster_alias_clone.clone(), identity_clone, role).await {
+                println!("❌ Cluster listener failed for {}: {}", cluster_alias_clone, e);
             }
         });
+
+        // Track the handle so we can stop it later
+        state.cluster_listeners.insert(cluster_alias, handle);
     }
 
-    println!("✅ malai daemon started - all cluster listeners active");
-    println!("📨 Press Ctrl+C to stop gracefully");
+    Ok(())
+}
+
+/// Stop specific cluster listeners (used during selective rescan)
+async fn stop_cluster_listeners(cluster_names: &[String]) -> Result<()> {
+    let daemon_state = DAEMON_STATE.get().ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
+    let mut state = daemon_state.write().await;
 
-    // Wait for graceful shutdown
-    fastn_p2p::cancelled().await;
-    println!("👋 malai daemon stopped gracefully");
+    for cluster_name in cluster_names {
+        if let Some(handle) = state.cluster_listeners.remove(cluster_name) {
+            println!("🛑 Stopping P2P listener for: {}", cluster_name);
+            handle.abort();
+        }
+    }
+
+    Ok(())
+}
+
+/// Stop all cluster listeners
+async fn stop_all_cluster_listeners() -> Result<()> {
+    let daemon_state = DAEMON_STATE.get().ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
+    let mut state = daemon_state.write().await;
+
+    println!("🛑 Stopping all P2P listeners...");
+    for (cluster_name, handle) in state.cluster_listeners.drain() {
+        println!("🛑 Stopping P2P listener for: {}", cluster_name);
+        handle.abort();
+    }
+
+    Ok(())
+}
+
+/// Restart specific cluster listeners (selective rescan)
+async fn restart_cluster_listeners(cluster_names: &[String]) -> Result<()> {
+    // Stop the specific listeners
+    stop_cluster_listeners(cluster_names).await?;
+
+    // Re-scan configs and start new listeners for these clusters
+    let cluster_roles = crate::config_manager::scan_cluster_roles().await?;
+    let daemon_state = DAEMON_STATE.get().ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
+    let mut state = daemon_state.write().await;
+
+    for (cluster_alias, identity, role) in cluster_roles {
+        if cluster_names.contains(&cluster_alias) {
+            println!("🔄 Restarting P2P listener for: {} ({:?})", cluster_alias, role);
+
+            let cluster_alias_clone = cluster_alias.clone();
+            let identity_clone = identity.clone();
+            let handle = tokio::spawn(async move {
+                if let Err(e) = run_cluster_listener(cluster_alias_clone.clone(), identity_clone, role).await {
+                    println!("❌ Cluster listener failed for {}: {}", cluster_alias_clone, e);
+                }
+            });
+
+            state.cluster_listeners.insert(cluster_alias, handle);
+        }
+    }
+
+    Ok(())
+}
+
+/// Perform actual daemon rescan (REAL IMPLEMENTATION)
+pub async fn perform_real_daemon_rescan(cluster_name: Option<String>) -> Result<()> {
+    println!("🔄 REAL DAEMON RESCAN: Starting rescan operation...");
+
+    match cluster_name {
+        Some(cluster) => {
+            println!("🔄 Selective rescan for cluster: {}", cluster);
+            restart_cluster_listeners(&[cluster.clone()]).await?;
+            println!("✅ Selective rescan completed for: {}", cluster);
+        }
+        None => {
+            println!("🔄 Full rescan - restarting all cluster listeners");
+            stop_all_cluster_listeners().await?;
+            start_all_cluster_listeners().await?;
+            println!("✅ Full rescan completed - all clusters rescanned");
+        }
+    }
 
     Ok(())
 }
diff --git a/malai/src/daemon_socket.rs b/malai/src/daemon_socket.rs
index 91657c0..268fd3d 100644
--- a/malai/src/daemon_socket.rs
+++ b/malai/src/daemon_socket.rs
@@ -162,28 +162,8 @@ pub async fn send_daemon_rescan_command(malai_home: PathBuf, cluster_name: Option<String>) -> Result<()> {
     }
 }
 
-/// Perform actual rescan in daemon (placeholder implementation)
+/// Perform actual rescan in daemon (REAL IMPLEMENTATION)
 async fn perform_daemon_rescan(cluster_name: Option<String>) -> Result<()> {
-    // TODO: This is where we'd implement the actual daemon rescan logic:
-    // 1. Stop old P2P listeners for affected clusters
-    // 2. Re-read cluster configurations
-    // 3. Start new P2P listeners with updated configs
-    // 4. Handle errors gracefully (continue with working clusters)
-
-    match cluster_name {
-        Some(cluster) => {
-            println!("🔄 [PLACEHOLDER] Rescanning cluster: {}", cluster);
-            // Simulate selective rescan work
-            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-            println!("✅ [PLACEHOLDER] Cluster {} rescan complete", cluster);
-        }
-        None => {
-            println!("🔄 [PLACEHOLDER] Rescanning all clusters");
-            // Simulate full rescan work
-            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
-            println!("✅ [PLACEHOLDER] Full rescan complete");
-        }
-    }
-
-    Ok(())
+    // ✅ REAL IMPLEMENTATION: Now calls the actual daemon rescan functionality
+    crate::daemon::perform_real_daemon_rescan(cluster_name).await
 }
\ No newline at end of file
diff --git a/malai/src/main.rs b/malai/src/main.rs
index d4454fa..65fcd7c 100644
--- a/malai/src/main.rs
+++ b/malai/src/main.rs
@@ -201,8 +201,7 @@ allow_from = "*"
 "#, cm_id52, machine_id52);
 
     if let Err(e) = malai::send_config(cluster_manager_key.clone(), &machine_id52, &sample_config).await {
-        println!("❌ Config distribution failed: {}", e);
-        return Ok(());
+        panic!("❌ REAL P2P CONFIG DISTRIBUTION FAILED: {}\n\nThis test was silently returning Ok(()) on failure, making tests pass when P2P was broken!", e);
     }
 
     println!("✅ Config distribution successful");
@@ -213,8 +212,7 @@ allow_from = "*"
     // Test 2: Command execution (should work after config)
     println!("📤 Step 2: Testing command execution...");
     if let Err(e) = malai::send_command(cluster_manager_key, &machine_id52, "echo", vec!["Complete malai infrastructure working!".to_string()]).await {
-        println!("❌ Command execution failed: {}", e);
-        return Ok(());
+        panic!("❌ REAL P2P COMMAND EXECUTION FAILED: {}\n\nThis test was silently returning Ok(()) on failure, making tests pass when P2P was broken!", e);
     }
 
     println!("🎉 Complete malai infrastructure test successful!");