diff --git a/containers/piccolo-player.yaml b/containers/piccolo-player.yaml index 0199d690d..2db18ff0b 100644 --- a/containers/piccolo-player.yaml +++ b/containers/piccolo-player.yaml @@ -18,6 +18,15 @@ spec: - name: actioncontroller image: localhost/pullpiri-player:latest command: ["/piccolo/actioncontroller"] + volumeMounts: + - name: piccolo-yaml + mountPath: /root/piccolo_yaml + - name: dbus + mountPath: /run/dbus + - name: systemd + mountPath: /etc/containers/systemd + - name: config-path + mountPath: /piccolo/settings.yaml - name: statemanager image: localhost/pullpiri-player:latest command: ["/piccolo/statemanager"] diff --git a/containers/piccolo-server.yaml b/containers/piccolo-server.yaml index cc4966a81..d29455fa8 100644 --- a/containers/piccolo-server.yaml +++ b/containers/piccolo-server.yaml @@ -17,7 +17,7 @@ spec: command: ["/piccolo/apiserver"] volumeMounts: - name: piccolo-yaml - mountPath: /root/piccolo_yaml + mountPath: /etc/piccolo/yaml - name: config-path mountPath: /piccolo/settings.yaml - name: policymanager diff --git a/src/player/actioncontroller/Cargo.toml b/src/player/actioncontroller/Cargo.toml index 6dd4c25c8..08a28292a 100644 --- a/src/player/actioncontroller/Cargo.toml +++ b/src/player/actioncontroller/Cargo.toml @@ -7,7 +7,7 @@ license = "Apache-2.0" description = "Action Controller component for Pullpiri" [dependencies] -tokio = { version = "1.45.0", features = ["full"] } +tokio = { version = "1.36.0", features = ["full"] } tonic = "0.12.3" prost = "0.13.3" dbus = "0.9.7" diff --git a/src/player/actioncontroller/src/grpc/mod.rs b/src/player/actioncontroller/src/grpc/mod.rs index ed619ca7f..13b40c1b0 100644 --- a/src/player/actioncontroller/src/grpc/mod.rs +++ b/src/player/actioncontroller/src/grpc/mod.rs @@ -23,21 +23,22 @@ use tonic::transport::Server; pub async fn init(manager: crate::manager::ActionControllerManager) -> common::Result<()> { let arc_manager = Arc::new(manager); let grpc_server = receiver::ActionControllerReceiver::new(arc_manager.clone()); - + let addr = common::actioncontroller::open_server().parse()?; println!("Starting gRPC server on {}", addr); - + tokio::spawn(async move { if let Err(e) = Server::builder() .add_service(grpc_server.into_service()) .serve(addr) - .await { + .await + { eprintln!("gRPC server error: {}", e); } }); - + println!("gRPC server started and listening"); - + Ok(()) } diff --git a/src/player/actioncontroller/src/grpc/receiver.rs b/src/player/actioncontroller/src/grpc/receiver.rs index bd4811729..4357e7c21 100644 --- a/src/player/actioncontroller/src/grpc/receiver.rs +++ b/src/player/actioncontroller/src/grpc/receiver.rs @@ -62,6 +62,7 @@ impl ActionControllerConnection for ActionControllerReceiver { request: Request, ) -> Result, Status> { // TODO: Implementation + println!("trigger_action in gprc receiver"); let scenario_name = request.into_inner().scenario_name; match self.manager.trigger_manager_action(&scenario_name).await { @@ -290,13 +291,11 @@ mod tests { scenario_name: "antipinch-enable".to_string(), }); - receiver.trigger_action(request).await.unwrap(); + let response = receiver.trigger_action(request).await.unwrap(); + assert_eq!(response.get_ref().status, 0); let _ = common::etcd::delete("scenario/antipinch-enable").await; let _ = common::etcd::delete("package/antipinch-enable").await; - - let response = receiver.trigger_action(request).await.unwrap(); - assert_eq!(response.get_ref().status, 0); } #[tokio::test] diff --git a/src/player/actioncontroller/src/manager.rs b/src/player/actioncontroller/src/manager.rs index 2672ffec7..948b9b4e0 100644 --- a/src/player/actioncontroller/src/manager.rs +++ b/src/player/actioncontroller/src/manager.rs @@ -1,9 +1,14 @@ +use std::{thread, time::Duration}; + use crate::runtime::bluechi; use common::{ actioncontroller::Status, spec::artifact::{Package, Scenario}, Result, }; + +const SYSTEMD_PATH: &str = "/etc/containers/systemd/"; + /// Manager for coordinating scenario actions and workload operations /// /// Responsible for: @@ -76,10 +81,11 @@ impl ActionControllerManager { /// - The scenario is not allowed by policy /// - The runtime operation fails pub async fn trigger_manager_action(&self, scenario_name: &str) -> Result<()> { + println!("trigger_manager_action in manager {:?}", scenario_name); if scenario_name.trim().is_empty() { return Err("Invalid scenario name: cannot be empty".into()); } - let etcd_scenario_key = format!("scenario/{}", scenario_name); + let etcd_scenario_key = format!("Scenario/{}", scenario_name); let scenario_str: String = match common::etcd::get(&etcd_scenario_key).await { Ok(value) => value, Err(e) => { @@ -90,7 +96,7 @@ impl ActionControllerManager { let action: String = scenario.get_actions(); - let etcd_package_key: String = format!("package/{}", scenario.get_targets()); + let etcd_package_key: String = format!("Package/{}", scenario.get_targets()); let package_str = common::etcd::get(&etcd_package_key).await?; let package: Package = serde_yaml::from_str(&package_str)?; @@ -117,6 +123,17 @@ impl ActionControllerManager { "update" | "rollback" => { self.stop_workload(&model_name, &model_node, &node_type) .await?; + + self.delete_symlink_and_reload(&mi.get_name(), &model_node) + .await?; + + self.make_symlink_and_reload( + &model_node, + &mi.get_name(), + &scenario.get_targets(), + ) + .await?; + self.start_workload(&model_name, &model_node, &node_type) .await?; } @@ -382,6 +399,48 @@ impl ActionControllerManager { } Ok(()) } + + pub async fn make_symlink_and_reload( + &self, + node_name: &str, + model_name: &str, + target_name: &str, + ) -> Result<()> { + println!( + "make_symlink_and_reload'{:?}' on host node '{:?}'", + model_name, node_name + ); + let original: String = format!( + "{0}/{1}.kube", + common::setting::get_config().yaml_storage, + target_name, + ); + let link = format!("{}{}.kube", SYSTEMD_PATH, model_name); + + if node_name == common::setting::get_config().host.name { + std::os::unix::fs::symlink(original, link)?; + } + self.reload_all_node(model_name, node_name).await?; + Ok(()) + } + + pub async fn delete_symlink_and_reload(&self, model_name: &str, node_name: &str) -> Result<()> { + // host node + let kube_symlink_path = format!("{}{}.kube", SYSTEMD_PATH, model_name); + let _ = std::fs::remove_file(&kube_symlink_path); + + self.reload_all_node(model_name, node_name).await?; + Ok(()) + } + + pub async fn reload_all_node(&self, model_name: &str, model_node: &str) -> Result<()> { + let cmd = bluechi::BluechiCmd { + command: bluechi::Command::ControllerReloadAllNodes, + }; + bluechi::handle_bluechi_cmd(model_name, model_node, cmd).await?; + thread::sleep(Duration::from_millis(100)); + Ok(()) + } } //UNIT TEST SKELTON @@ -389,7 +448,6 @@ impl ActionControllerManager { #[cfg(test)] mod tests { use super::*; - use crate::runtime::bluechi::handle_bluechi_cmd; use common::actioncontroller::Status; use std::error::Error; @@ -521,7 +579,7 @@ mod tests { assert!(result.is_err()); } - + #[test] fn test_manager_initializes_nodes() { // Ensures new() returns manager with non-empty nodes @@ -543,5 +601,3 @@ mod tests { assert!(manager.pause_workload("test".into()).await.is_ok()); } } - - diff --git a/src/player/actioncontroller/src/runtime/bluechi/mod.rs b/src/player/actioncontroller/src/runtime/bluechi/mod.rs index 267472266..1fc0cdf92 100644 --- a/src/player/actioncontroller/src/runtime/bluechi/mod.rs +++ b/src/player/actioncontroller/src/runtime/bluechi/mod.rs @@ -43,6 +43,7 @@ impl Command { Command::UnitStop => "StopUnit", Command::UnitRestart => "RestartUnit", Command::UnitReload => "ReloadUnit", + Command::ControllerReloadAllNodes => "ReloadAllNodes", _ => "Unknown", } } @@ -66,7 +67,7 @@ pub async fn handle_bluechi_cmd( ) -> Result<()> { let conn = Connection::new_system().unwrap(); let bluechi = conn.with_proxy(DEST, PATH, Duration::from_millis(5000)); - + print!("handle_bluechi_cmd ...\n"); match bluechi_cmd.command { Command::ControllerReloadAllNodes => { let _ = reload_all_nodes(&bluechi); @@ -99,13 +100,14 @@ pub async fn handle_bluechi_cmd( /// /// * `Ok(String)` - A successful result message including the job path /// * `Err(...)` - If the D-Bus call fails -pub async fn workload_run( +pub fn workload_run( conn: &Connection, method: &str, node_name: &str, proxy: &Proxy<'_, &Connection>, unit_name: &str, ) -> Result { + print!("workload_run ...\n"); let (node,): (Path,) = proxy.method_call(DEST_CONTROLLER, "GetNode", (&node_name,))?; let node_proxy = conn.with_proxy(DEST, node, Duration::from_millis(5000)); @@ -131,7 +133,8 @@ pub async fn workload_run( /// /// * `Ok(String)` - A successful result message listing all reloaded nodes /// * `Err(...)` - If any of the D-Bus calls fail -pub async fn reload_all_nodes(proxy: &Proxy<'_, &Connection>) -> Result { +pub fn reload_all_nodes(proxy: &Proxy<'_, &Connection>) -> Result { + print!("Reloading all nodes...\n"); let (nodes,): (Vec<(String, dbus::Path, String)>,) = proxy.method_call(DEST_CONTROLLER, "ListNodes", ())?; @@ -219,7 +222,7 @@ mod tests { let bluechi_proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000)); - let result = workload_run(&conn, "StartUnit", node, &bluechi_proxy, unit_name).await; + let result = workload_run(&conn, "StartUnit", node, &bluechi_proxy, unit_name); assert!(result.is_ok()); let output = result.unwrap(); @@ -238,7 +241,7 @@ mod tests { } let proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000)); - let result = reload_all_nodes(&proxy).await; + let result = reload_all_nodes(&proxy); assert!(result.is_ok()); } @@ -271,8 +274,7 @@ mod tests { let bluechi_proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000)); - let result = - workload_run(&conn, "StartUnit", invalid_node, &bluechi_proxy, unit_name).await; + let result = workload_run(&conn, "StartUnit", invalid_node, &bluechi_proxy, unit_name); assert!(result.is_err()); } @@ -292,7 +294,7 @@ mod tests { let bluechi_proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000)); - let result = workload_run(&conn, invalid_method, node, &bluechi_proxy, unit_name).await; + let result = workload_run(&conn, invalid_method, node, &bluechi_proxy, unit_name); assert!(result.is_err()); } @@ -305,7 +307,7 @@ mod tests { let fake_dest = "org.eclipse.fakebluechi"; let proxy = conn.with_proxy(fake_dest, PATH, Duration::from_millis(5000)); - let result = reload_all_nodes(&proxy).await; + let result = reload_all_nodes(&proxy); assert!(result.is_err()); } }