Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions machine-config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[cluster_manager]
id52 = "b53nq9ob3o556etiioi55jkkn85n39g1h3q4kvfsbrifvpkr47j0"
cluster_name = "test"

[machine.server1]
id52 = "u8e1n91bcqsf77397sj50bb6gm7hcfs5bm6jnct2la6453gajkgg"
allow_from = "*"
141 changes: 130 additions & 11 deletions malai/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,39 @@

use eyre::Result;
use futures_util::stream::StreamExt;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Global daemon state for managing P2P listeners.
#[derive(Debug)]
struct DaemonState {
    /// One background listener task per active cluster, keyed by cluster alias.
    cluster_listeners: HashMap<String, tokio::task::JoinHandle<()>>,
    /// Root directory of the daemon's configuration (MALAI_HOME).
    malai_home: std::path::PathBuf,
}

impl DaemonState {
    /// Create an empty daemon state rooted at the given MALAI_HOME directory.
    ///
    /// Accepts anything convertible into a `PathBuf` (`PathBuf`, `&Path`,
    /// `String`, …), which is backward compatible with the existing
    /// `PathBuf`-passing callers while avoiding forced allocations.
    fn new(malai_home: impl Into<std::path::PathBuf>) -> Self {
        Self {
            cluster_listeners: HashMap::new(),
            malai_home: malai_home.into(),
        }
    }
}

/// Global daemon state - accessible for rescan operations
static DAEMON_STATE: tokio::sync::OnceCell<Arc<RwLock<DaemonState>>> = tokio::sync::OnceCell::const_new();

/// Start the real malai daemon - MVP implementation
pub async fn start_real_daemon(foreground: bool) -> Result<()> {
let malai_home = crate::core_utils::get_malai_home();
println!("πŸ”₯ Starting malai daemon (MVP)");
println!("πŸ“ MALAI_HOME: {}", malai_home.display());

// Initialize global daemon state
let daemon_state = Arc::new(RwLock::new(DaemonState::new(malai_home.clone())));
DAEMON_STATE.set(daemon_state.clone()).map_err(|_| eyre::eyre!("Daemon state already initialized"))?;

// File locking (proven working pattern)
let lock_path = malai_home.join("malai.lock");
let lock_file = std::fs::OpenOptions::new()
Expand All @@ -37,7 +63,24 @@ pub async fn start_real_daemon(foreground: bool) -> Result<()> {
println!("πŸ“‹ Running in foreground mode (daemonization TODO)");
}

// Scan all cluster identities and roles
// Start Unix socket listener for daemon-CLI communication (wait for it to be ready)
let _socket_handle = crate::daemon_socket::start_daemon_socket_listener(malai_home.clone()).await?;

// Initial cluster scan and startup
start_all_cluster_listeners().await?;

println!("βœ… malai daemon started - all cluster listeners active");
println!("πŸ“¨ Press Ctrl+C to stop gracefully");

// Wait for graceful shutdown
fastn_p2p::cancelled().await;
println!("πŸ‘‹ malai daemon stopped gracefully");

Ok(())
}

/// Start all cluster listeners (called at startup and during rescan)
async fn start_all_cluster_listeners() -> Result<()> {
let cluster_roles = crate::config_manager::scan_cluster_roles().await?;

if cluster_roles.is_empty() {
Expand All @@ -48,27 +91,103 @@ pub async fn start_real_daemon(foreground: bool) -> Result<()> {

println!("βœ… Found {} cluster identities", cluster_roles.len());

// Start Unix socket listener for daemon-CLI communication (wait for it to be ready)
let _socket_handle = crate::daemon_socket::start_daemon_socket_listener(malai_home.clone()).await?;
let daemon_state = DAEMON_STATE.get().ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
let mut state = daemon_state.write().await;

// Start one P2P listener per identity
for (cluster_alias, identity, role) in cluster_roles {
println!("πŸš€ Starting P2P listener for: {} ({:?})", cluster_alias, role);

let cluster_alias_clone = cluster_alias.clone();
fastn_p2p::spawn(async move {
if let Err(e) = run_cluster_listener(cluster_alias_clone, identity, role).await {
println!("❌ Cluster listener failed for {}: {}", cluster_alias, e);
let identity_clone = identity.clone();
let handle = tokio::spawn(async move {
if let Err(e) = run_cluster_listener(cluster_alias_clone.clone(), identity_clone, role).await {
println!("❌ Cluster listener failed for {}: {}", cluster_alias_clone, e);
}
});

// Track the handle so we can stop it later
state.cluster_listeners.insert(cluster_alias, handle);
}

println!("βœ… malai daemon started - all cluster listeners active");
println!("πŸ“¨ Press Ctrl+C to stop gracefully");
Ok(())
}

/// Stop specific cluster listeners (used during selective rescan).
///
/// Aborts the background task for each named cluster and removes it from the
/// daemon's listener registry. Cluster names with no registered listener are
/// silently ignored.
///
/// # Errors
/// Returns an error if the global daemon state has not been initialized.
async fn stop_cluster_listeners(cluster_names: &[String]) -> Result<()> {
    let daemon_state = DAEMON_STATE
        .get()
        .ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
    let mut state = daemon_state.write().await;

    for cluster_name in cluster_names {
        if let Some(handle) = state.cluster_listeners.remove(cluster_name) {
            println!("πŸ›‘ Stopping P2P listener for: {}", cluster_name);
            // abort() requests cancellation; the task stops at its next await point.
            handle.abort();
        }
    }

    Ok(())
}

/// Stop all cluster listeners.
///
/// Aborts every registered listener task and leaves the registry empty.
///
/// # Errors
/// Returns an error if the global daemon state has not been initialized.
async fn stop_all_cluster_listeners() -> Result<()> {
    let daemon_state = DAEMON_STATE
        .get()
        .ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
    let mut state = daemon_state.write().await;

    println!("πŸ›‘ Stopping all P2P listeners...");
    // Take ownership of the whole registry at once, leaving an empty map behind.
    let listeners = std::mem::take(&mut state.cluster_listeners);
    for (cluster_name, handle) in listeners {
        println!("πŸ›‘ Stopping P2P listener for: {}", cluster_name);
        handle.abort();
    }

    Ok(())
}

/// Restart specific cluster listeners (selective rescan).
///
/// Stops the listeners for the given clusters, re-reads the cluster
/// configuration from disk, and spawns fresh listeners for any of the
/// requested clusters that are still configured. Requested clusters that no
/// longer appear in the scanned configuration simply stay stopped.
///
/// # Errors
/// Returns an error if stopping fails, the configuration re-scan fails, or
/// the global daemon state has not been initialized.
async fn restart_cluster_listeners(cluster_names: &[String]) -> Result<()> {
    // Stop the old listeners first so the new ones don't race with them.
    stop_cluster_listeners(cluster_names).await?;

    // Re-scan configs and start new listeners for these clusters.
    let cluster_roles = crate::config_manager::scan_cluster_roles().await?;
    let daemon_state = DAEMON_STATE
        .get()
        .ok_or_else(|| eyre::eyre!("Daemon state not initialized"))?;
    let mut state = daemon_state.write().await;

    for (cluster_alias, identity, role) in cluster_roles {
        if !cluster_names.contains(&cluster_alias) {
            continue;
        }
        println!("πŸ”„ Restarting P2P listener for: {} ({:?})", cluster_alias, role);

        // One clone of the alias for the spawned task's error message;
        // `identity` is moved into the task directly (the original cloned it
        // and then dropped the source unused).
        let cluster_alias_clone = cluster_alias.clone();
        let handle = tokio::spawn(async move {
            if let Err(e) =
                run_cluster_listener(cluster_alias_clone.clone(), identity, role).await
            {
                println!("❌ Cluster listener failed for {}: {}", cluster_alias_clone, e);
            }
        });

        state.cluster_listeners.insert(cluster_alias, handle);
    }

    Ok(())
}

/// Perform actual daemon rescan (REAL IMPLEMENTATION)
///
/// With a cluster name, restarts only that cluster's listener; with `None`,
/// tears down every listener and restarts them all from freshly scanned
/// configuration.
///
/// # Errors
/// Propagates any error from stopping, scanning, or restarting listeners.
pub async fn perform_real_daemon_rescan(cluster_name: Option<String>) -> Result<()> {
    println!("πŸ”„ REAL DAEMON RESCAN: Starting rescan operation...");

    if let Some(cluster) = cluster_name {
        println!("πŸ”„ Selective rescan for cluster: {}", cluster);
        restart_cluster_listeners(&[cluster.clone()]).await?;
        println!("βœ… Selective rescan completed for: {}", cluster);
    } else {
        println!("πŸ”„ Full rescan - restarting all cluster listeners");
        stop_all_cluster_listeners().await?;
        start_all_cluster_listeners().await?;
        println!("βœ… Full rescan completed - all clusters rescanned");
    }

    Ok(())
}
Expand Down
26 changes: 3 additions & 23 deletions malai/src/daemon_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,28 +162,8 @@ pub async fn send_daemon_rescan_command(malai_home: PathBuf, cluster_name: Optio
}
}

/// Perform actual rescan in daemon (REAL IMPLEMENTATION)
///
/// Thin wrapper that forwards a socket-delivered rescan request to the
/// daemon's real rescan logic (stop affected listeners, re-read configs,
/// start fresh listeners). `None` means a full rescan of all clusters.
///
/// # Errors
/// Propagates any error from the underlying rescan.
async fn perform_daemon_rescan(cluster_name: Option<String>) -> Result<()> {
    // βœ… REAL IMPLEMENTATION: Now calls the actual daemon rescan functionality
    crate::daemon::perform_real_daemon_rescan(cluster_name).await
}
6 changes: 2 additions & 4 deletions malai/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ allow_from = "*"
"#, cm_id52, machine_id52);

if let Err(e) = malai::send_config(cluster_manager_key.clone(), &machine_id52, &sample_config).await {
println!("❌ Config distribution failed: {}", e);
return Ok(());
panic!("❌ REAL P2P CONFIG DISTRIBUTION FAILED: {}\n\nThis test was silently returning Ok(()) on failure, making tests pass when P2P was broken!", e);
}

println!("βœ… Config distribution successful");
Expand All @@ -213,8 +212,7 @@ allow_from = "*"
// Test 2: Command execution (should work after config)
println!("πŸ“€ Step 2: Testing command execution...");
if let Err(e) = malai::send_command(cluster_manager_key, &machine_id52, "echo", vec!["Complete malai infrastructure working!".to_string()]).await {
println!("❌ Command execution failed: {}", e);
return Ok(());
panic!("❌ REAL P2P COMMAND EXECUTION FAILED: {}\n\nThis test was silently returning Ok(()) on failure, making tests pass when P2P was broken!", e);
}

println!("πŸŽ‰ Complete malai infrastructure test successful!");
Expand Down