Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions containers/piccolo-player.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ spec:
- name: actioncontroller
image: localhost/pullpiri-player:latest
command: ["/piccolo/actioncontroller"]
volumeMounts:
- name: piccolo-yaml
mountPath: /root/piccolo_yaml
- name: dbus
mountPath: /run/dbus
- name: systemd
mountPath: /etc/containers/systemd
- name: config-path
mountPath: /piccolo/settings.yaml
- name: statemanager
image: localhost/pullpiri-player:latest
command: ["/piccolo/statemanager"]
Expand Down
2 changes: 1 addition & 1 deletion containers/piccolo-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
command: ["/piccolo/apiserver"]
volumeMounts:
- name: piccolo-yaml
mountPath: /root/piccolo_yaml
mountPath: /etc/piccolo/yaml
- name: config-path
mountPath: /piccolo/settings.yaml
- name: policymanager
Expand Down
2 changes: 1 addition & 1 deletion src/player/actioncontroller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ license = "Apache-2.0"
description = "Action Controller component for Pullpiri"

[dependencies]
tokio = { version = "1.45.0", features = ["full"] }
tokio = { version = "1.36.0", features = ["full"] }
tonic = "0.12.3"
prost = "0.13.3"
dbus = "0.9.7"
Expand Down
11 changes: 6 additions & 5 deletions src/player/actioncontroller/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@ use tonic::transport::Server;
pub async fn init(manager: crate::manager::ActionControllerManager) -> common::Result<()> {
let arc_manager = Arc::new(manager);
let grpc_server = receiver::ActionControllerReceiver::new(arc_manager.clone());

let addr = common::actioncontroller::open_server().parse()?;
println!("Starting gRPC server on {}", addr);

tokio::spawn(async move {
if let Err(e) = Server::builder()
.add_service(grpc_server.into_service())
.serve(addr)
.await {
.await
{
eprintln!("gRPC server error: {}", e);
}
});

println!("gRPC server started and listening");

Ok(())
}

Expand Down
7 changes: 3 additions & 4 deletions src/player/actioncontroller/src/grpc/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl ActionControllerConnection for ActionControllerReceiver {
request: Request<TriggerActionRequest>,
) -> Result<Response<TriggerActionResponse>, Status> {
// TODO: Implementation
println!("trigger_action in gprc receiver");
let scenario_name = request.into_inner().scenario_name;

match self.manager.trigger_manager_action(&scenario_name).await {
Expand Down Expand Up @@ -290,13 +291,11 @@ mod tests {
scenario_name: "antipinch-enable".to_string(),
});

receiver.trigger_action(request).await.unwrap();
let response = receiver.trigger_action(request).await.unwrap();
assert_eq!(response.get_ref().status, 0);

let _ = common::etcd::delete("scenario/antipinch-enable").await;
let _ = common::etcd::delete("package/antipinch-enable").await;

let response = receiver.trigger_action(request).await.unwrap();
assert_eq!(response.get_ref().status, 0);
}

#[tokio::test]
Expand Down
68 changes: 62 additions & 6 deletions src/player/actioncontroller/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
use std::{thread, time::Duration};

use crate::runtime::bluechi;
use common::{
actioncontroller::Status,
spec::artifact::{Package, Scenario},
Result,
};

const SYSTEMD_PATH: &str = "/etc/containers/systemd/";

/// Manager for coordinating scenario actions and workload operations
///
/// Responsible for:
Expand Down Expand Up @@ -76,10 +81,11 @@ impl ActionControllerManager {
/// - The scenario is not allowed by policy
/// - The runtime operation fails
pub async fn trigger_manager_action(&self, scenario_name: &str) -> Result<()> {
println!("trigger_manager_action in manager {:?}", scenario_name);
if scenario_name.trim().is_empty() {
return Err("Invalid scenario name: cannot be empty".into());
}
let etcd_scenario_key = format!("scenario/{}", scenario_name);
let etcd_scenario_key = format!("Scenario/{}", scenario_name);
let scenario_str: String = match common::etcd::get(&etcd_scenario_key).await {
Ok(value) => value,
Err(e) => {
Expand All @@ -90,7 +96,7 @@ impl ActionControllerManager {

let action: String = scenario.get_actions();

let etcd_package_key: String = format!("package/{}", scenario.get_targets());
let etcd_package_key: String = format!("Package/{}", scenario.get_targets());
let package_str = common::etcd::get(&etcd_package_key).await?;
let package: Package = serde_yaml::from_str(&package_str)?;

Expand All @@ -117,6 +123,17 @@ impl ActionControllerManager {
"update" | "rollback" => {
self.stop_workload(&model_name, &model_node, &node_type)
.await?;

self.delete_symlink_and_reload(&mi.get_name(), &model_node)
.await?;

self.make_symlink_and_reload(
&model_node,
&mi.get_name(),
&scenario.get_targets(),
)
.await?;

self.start_workload(&model_name, &model_node, &node_type)
.await?;
}
Expand Down Expand Up @@ -382,14 +399,55 @@ impl ActionControllerManager {
}
Ok(())
}

pub async fn make_symlink_and_reload(
&self,
node_name: &str,
model_name: &str,
target_name: &str,
) -> Result<()> {
println!(
"make_symlink_and_reload'{:?}' on host node '{:?}'",
model_name, node_name
);
let original: String = format!(
"{0}/{1}.kube",
common::setting::get_config().yaml_storage,
target_name,
);
let link = format!("{}{}.kube", SYSTEMD_PATH, model_name);

if node_name == common::setting::get_config().host.name {
std::os::unix::fs::symlink(original, link)?;
}
self.reload_all_node(model_name, node_name).await?;
Ok(())
}

pub async fn delete_symlink_and_reload(&self, model_name: &str, node_name: &str) -> Result<()> {
// host node
let kube_symlink_path = format!("{}{}.kube", SYSTEMD_PATH, model_name);
let _ = std::fs::remove_file(&kube_symlink_path);

self.reload_all_node(model_name, node_name).await?;
Ok(())
}

pub async fn reload_all_node(&self, model_name: &str, model_node: &str) -> Result<()> {
let cmd = bluechi::BluechiCmd {
command: bluechi::Command::ControllerReloadAllNodes,
};
bluechi::handle_bluechi_cmd(model_name, model_node, cmd).await?;
thread::sleep(Duration::from_millis(100));
Ok(())
}
}

//UNIT TEST SKELTON

#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::bluechi::handle_bluechi_cmd;
use common::actioncontroller::Status;
use std::error::Error;

Expand Down Expand Up @@ -521,7 +579,7 @@ mod tests {

assert!(result.is_err());
}

#[test]
fn test_manager_initializes_nodes() {
// Ensures new() returns manager with non-empty nodes
Expand All @@ -543,5 +601,3 @@ mod tests {
assert!(manager.pause_workload("test".into()).await.is_ok());
}
}


20 changes: 11 additions & 9 deletions src/player/actioncontroller/src/runtime/bluechi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl Command {
Command::UnitStop => "StopUnit",
Command::UnitRestart => "RestartUnit",
Command::UnitReload => "ReloadUnit",
Command::ControllerReloadAllNodes => "ReloadAllNodes",
_ => "Unknown",
}
}
Expand All @@ -66,7 +67,7 @@ pub async fn handle_bluechi_cmd(
) -> Result<()> {
let conn = Connection::new_system().unwrap();
let bluechi = conn.with_proxy(DEST, PATH, Duration::from_millis(5000));

print!("handle_bluechi_cmd ...\n");
match bluechi_cmd.command {
Command::ControllerReloadAllNodes => {
let _ = reload_all_nodes(&bluechi);
Expand Down Expand Up @@ -99,13 +100,14 @@ pub async fn handle_bluechi_cmd(
///
/// * `Ok(String)` - A successful result message including the job path
/// * `Err(...)` - If the D-Bus call fails
pub async fn workload_run(
pub fn workload_run(
conn: &Connection,
method: &str,
node_name: &str,
proxy: &Proxy<'_, &Connection>,
unit_name: &str,
) -> Result<String> {
print!("workload_run ...\n");
let (node,): (Path,) = proxy.method_call(DEST_CONTROLLER, "GetNode", (&node_name,))?;

let node_proxy = conn.with_proxy(DEST, node, Duration::from_millis(5000));
Expand All @@ -131,7 +133,8 @@ pub async fn workload_run(
///
/// * `Ok(String)` - A successful result message listing all reloaded nodes
/// * `Err(...)` - If any of the D-Bus calls fail
pub async fn reload_all_nodes(proxy: &Proxy<'_, &Connection>) -> Result<String> {
pub fn reload_all_nodes(proxy: &Proxy<'_, &Connection>) -> Result<String> {
print!("Reloading all nodes...\n");
let (nodes,): (Vec<(String, dbus::Path, String)>,) =
proxy.method_call(DEST_CONTROLLER, "ListNodes", ())?;

Expand Down Expand Up @@ -219,7 +222,7 @@ mod tests {

let bluechi_proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000));

let result = workload_run(&conn, "StartUnit", node, &bluechi_proxy, unit_name).await;
let result = workload_run(&conn, "StartUnit", node, &bluechi_proxy, unit_name);
assert!(result.is_ok());

let output = result.unwrap();
Expand All @@ -238,7 +241,7 @@ mod tests {
}

let proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000));
let result = reload_all_nodes(&proxy).await;
let result = reload_all_nodes(&proxy);
assert!(result.is_ok());
}

Expand Down Expand Up @@ -271,8 +274,7 @@ mod tests {

let bluechi_proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000));

let result =
workload_run(&conn, "StartUnit", invalid_node, &bluechi_proxy, unit_name).await;
let result = workload_run(&conn, "StartUnit", invalid_node, &bluechi_proxy, unit_name);
assert!(result.is_err());
}

Expand All @@ -292,7 +294,7 @@ mod tests {

let bluechi_proxy = conn.with_proxy(DEST, PATH, Duration::from_millis(5000));

let result = workload_run(&conn, invalid_method, node, &bluechi_proxy, unit_name).await;
let result = workload_run(&conn, invalid_method, node, &bluechi_proxy, unit_name);
assert!(result.is_err());
}

Expand All @@ -305,7 +307,7 @@ mod tests {
let fake_dest = "org.eclipse.fakebluechi";
let proxy = conn.with_proxy(fake_dest, PATH, Duration::from_millis(5000));

let result = reload_all_nodes(&proxy).await;
let result = reload_all_nodes(&proxy);
assert!(result.is_err());
}
}