From af9513e5ff074653841f036bacb5dc5e18030034 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Fri, 4 Oct 2024 16:59:52 +0200 Subject: [PATCH 01/10] Create berserker::run function and replace Worker trait with enum Move most logic from the main function into a new run function, this is a bit more idiomatic. Replaced the Worker trait with a Worker enum, this prevents dynamic dispatching and should make the code a bit easier to follow. --- src/lib.rs | 96 +++++++++++++++++++++++++++-- src/main.rs | 87 +-------------------------- src/worker/endpoints.rs | 6 +- src/worker/mod.rs | 90 ++++++++++++++++------------ src/worker/network.rs | 17 ++---- src/worker/processes.rs | 6 +- src/worker/syscalls.rs | 6 +- src/worker/udp.rs | 130 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 286 insertions(+), 152 deletions(-) create mode 100644 src/worker/udp.rs diff --git a/src/lib.rs b/src/lib.rs index 9593b12..bee3d6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,18 @@ use core_affinity::CoreId; +use fork::{fork, Fork}; +use itertools::iproduct; +use log::{info, warn}; +use nix::{ + sys::{ + signal::{kill, Signal}, + wait::waitpid, + }, + unistd::Pid, +}; use serde::Deserialize; -use std::fmt::Display; +use std::{fmt::Display, thread}; + +use crate::worker::Worker; pub mod worker; @@ -133,11 +145,6 @@ impl Display for WorkerError { } } -/// Generic interface for workers of any type -pub trait Worker { - fn run_payload(&self) -> Result<(), WorkerError>; -} - /// General information for each worker, on which CPU is it running /// and what is the process number. #[derive(Debug, Copy, Clone)] @@ -152,6 +159,83 @@ impl Display for BaseConfig { } } +pub fn run(config: WorkloadConfig) { + let duration_timer = std::time::SystemTime::now(); + let mut lower = 1024; + let mut upper = 1024; + + let core_ids: Vec = if config.per_core { + // Retrieve the IDs of all active CPU cores. + core_affinity::get_core_ids().unwrap() + } else { + vec![CoreId { id: 0 }] + }; + + let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers) + .map(|(cpu, process)| { + let worker = + Worker::new(config, cpu, process, &mut lower, &mut upper); + + match fork() { + Ok(Fork::Parent(child)) => { + info!("Child {}", child); + Some(child) + } + Ok(Fork::Child) => { + if config.per_core { + core_affinity::set_for_current(cpu); + } + + loop { + worker.run_payload().unwrap(); + } + } + Err(e) => { + warn!("Failed: {e:?}"); + None + } + } + }) + .collect(); + + info!("In total: {}", upper); + + let processes = &handles.clone(); + + thread::scope(|s| { + if config.duration != 0 { + // Spin a watcher thread + s.spawn(move || loop { + thread::sleep(std::time::Duration::from_secs(1)); + let elapsed = duration_timer.elapsed().unwrap().as_secs(); + + if elapsed > config.duration { + for handle in processes.iter().flatten() { + info!("Terminating: {}", *handle); + match kill(Pid::from_raw(*handle), Signal::SIGTERM) { + Ok(()) => { + continue; + } + Err(_) => { + continue; + } + } + } + + break; + } + }); + } + + s.spawn(move || { + for handle in processes.iter().flatten() { + info!("waitpid: {}", *handle); + waitpid(Pid::from_raw(*handle), None).unwrap(); + } + }); + }); +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/main.rs b/src/main.rs index 8873b96..e05b93f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,22 +18,13 @@ extern crate log; extern crate core_affinity; use config::Config; -use core_affinity::CoreId; -use fork::{fork, Fork}; -use itertools::iproduct; -use nix::sys::signal::{kill, Signal}; -use nix::sys::wait::waitpid; -use nix::unistd::Pid; -use std::time::SystemTime; -use std::{env, thread, time}; -use berserker::{worker::new_worker, WorkloadConfig}; +use berserker::WorkloadConfig; fn main() { - let args: Vec = env::args().collect(); + let args: Vec = std::env::args().collect(); let default_config = String::from("workload.toml"); let config_path = &args.get(1).unwrap_or(&default_config); - let duration_timer = SystemTime::now(); let config = Config::builder() // Add in `./Settings.toml` @@ -54,81 +45,9 @@ fn main() { .try_deserialize::() .unwrap(); - let mut lower = 1024; - let mut upper = 1024; - env_logger::init(); info!("Config: {:?}", config); - let core_ids: Vec = if config.per_core { - // Retrieve the IDs of all active CPU cores. - core_affinity::get_core_ids().unwrap() - } else { - vec![CoreId { id: 0 }] - }; - - let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers) - .map(|(cpu, process)| { - let worker = - new_worker(config, cpu, process, &mut lower, &mut upper); - - match fork() { - Ok(Fork::Parent(child)) => { - info!("Child {}", child); - Some(child) - } - Ok(Fork::Child) => { - if config.per_core { - core_affinity::set_for_current(cpu); - } - - loop { - worker.run_payload().unwrap(); - } - } - Err(e) => { - warn!("Failed: {e:?}"); - None - } - } - }) - .collect(); - - info!("In total: {}", upper); - - let processes = &handles.clone(); - - thread::scope(|s| { - if config.duration != 0 { - // Spin a watcher thread - s.spawn(move || loop { - thread::sleep(time::Duration::from_secs(1)); - let elapsed = duration_timer.elapsed().unwrap().as_secs(); - - if elapsed > config.duration { - for handle in processes.into_iter().flatten() { - info!("Terminating: {}", *handle); - match kill(Pid::from_raw(*handle), Signal::SIGTERM) { - Ok(()) => { - continue; - } - Err(e) => { - continue; - } - } - } - - break; - } - }); - } - - s.spawn(move || { - for handle in processes.into_iter().flatten() { - info!("waitpid: {}", *handle); - waitpid(Pid::from_raw(*handle), None).unwrap(); - } - }); - }); + berserker::run(config); } diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index 35008cf..903e2f9 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -3,7 +3,7 @@ use std::{fmt::Display, net::TcpListener, thread, time}; use core_affinity::CoreId; use log::info; -use crate::{BaseConfig, Worker, WorkerError, WorkloadConfig}; +use crate::{BaseConfig, WorkerError, WorkloadConfig}; struct EndpointWorkload { restart_interval: u64, @@ -41,10 +41,8 @@ impl EndpointWorker { }, } } -} -impl Worker for EndpointWorker { - fn run_payload(&self) -> Result<(), WorkerError> { + pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); let EndpointWorkload { diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 642b84a..f68430d 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -2,7 +2,7 @@ use core_affinity::CoreId; use rand::{thread_rng, Rng}; use rand_distr::{Uniform, Zipf}; -use crate::{Distribution, Worker, Workload, WorkloadConfig}; +use crate::{Distribution, WorkerError, Workload, WorkloadConfig}; use self::{ endpoints::EndpointWorker, network::NetworkWorker, @@ -14,48 +14,60 @@ pub mod network; pub mod processes; pub mod syscalls; -pub fn new_worker( - workload: WorkloadConfig, - cpu: CoreId, - process: usize, - lower_bound: &mut usize, - upper_bound: &mut usize, -) -> Box { - match workload.workload { - Workload::Processes { .. } => { - Box::new(ProcessesWorker::new(workload, cpu, process)) +pub enum Worker { + Endpoint(EndpointWorker), + Process(ProcessesWorker), + Syscalls(SyscallsWorker), + Network(NetworkWorker), +} + +impl Worker { + pub fn run_payload(&self) -> Result<(), WorkerError> { + match self { + Worker::Endpoint(e) => e.run_payload(), + Worker::Process(p) => p.run_payload(), + Worker::Syscalls(s) => s.run_payload(), + Worker::Network(n) => n.run_payload(), } - Workload::Endpoints { distribution } => { - match distribution { - Distribution::Zipfian { n_ports, exponent } => { - let n_ports: f64 = thread_rng() - .sample(Zipf::new(n_ports, exponent).unwrap()); + } - *lower_bound = *upper_bound; - *upper_bound += n_ports as usize; - } - Distribution::Uniform { lower, upper } => { - // TODO: Double check this branch - let n_ports = - thread_rng().sample(Uniform::new(lower, upper)); + pub fn new( + workload: WorkloadConfig, + cpu: CoreId, + process: usize, + lower_bound: &mut usize, + upper_bound: &mut usize, + ) -> Worker { + match workload.workload { + Workload::Processes { .. } => { + Worker::Process(ProcessesWorker::new(workload, cpu, process)) + } + Workload::Endpoints { distribution } => { + let n_ports: usize = match distribution { + Distribution::Zipfian { n_ports, exponent } => thread_rng() + .sample(Zipf::new(n_ports, exponent).unwrap()) + as usize, + Distribution::Uniform { lower, upper } => { + thread_rng().sample(Uniform::new(lower, upper)) as usize + } + }; - *lower_bound = *upper_bound; - *upper_bound += n_ports as usize; - } + *lower_bound = *upper_bound; + *upper_bound += n_ports as usize; + Worker::Endpoint(EndpointWorker::new( + workload, + cpu, + process, + *lower_bound, + *upper_bound, + )) + } + Workload::Syscalls { .. } => { + Worker::Syscalls(SyscallsWorker::new(workload, cpu, process)) + } + Workload::Network { .. } => { + Worker::Network(NetworkWorker::new(workload, cpu, process)) } - Box::new(EndpointWorker::new( - workload, - cpu, - process, - *lower_bound, - *upper_bound, - )) - } - Workload::Syscalls { .. } => { - Box::new(SyscallsWorker::new(workload, cpu, process)) - } - Workload::Network { .. } => { - Box::new(NetworkWorker::new(workload, cpu, process)) } } } diff --git a/src/worker/network.rs b/src/worker/network.rs index a7fa2fd..ea6f34c 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -1,8 +1,5 @@ use core_affinity::CoreId; use log::{debug, info, trace}; -use rand::{thread_rng, Rng}; -use rand_distr::Exp; -use std::collections::HashMap; use std::os::unix::io::AsRawFd; use std::str; use std::time::{SystemTime, UNIX_EPOCH}; @@ -13,7 +10,7 @@ use std::{ thread, }; -use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig}; +use crate::{BaseConfig, WorkerError, Workload, WorkloadConfig}; use smoltcp::iface::{Config, Interface, SocketSet}; use smoltcp::phy::{ @@ -33,7 +30,7 @@ impl NetworkWorker { pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { NetworkWorker { config: BaseConfig { cpu, process }, - workload: workload, + workload, } } @@ -157,7 +154,7 @@ impl NetworkWorker { iface.poll(timestamp, &mut device, &mut sockets); // Iterate through all sockets, update the state for each one - for (i, (h, s)) in sockets.iter_mut().enumerate() { + for (i, (_, s)) in sockets.iter_mut().enumerate() { let socket = tcp::Socket::downcast_mut(s) .ok_or(WorkerError::Internal)?; @@ -218,7 +215,7 @@ impl NetworkWorker { addr: Ipv4Address, ) -> (Interface, FaultInjector>, i32) { let device_name = "tun0"; - let device = TunTapInterface::new(&device_name, Medium::Ip).unwrap(); + let device = TunTapInterface::new(device_name, Medium::Ip).unwrap(); let fd = device.as_raw_fd(); let seed = SystemTime::now() @@ -275,12 +272,10 @@ impl NetworkWorker { (((index / 100) + 2) % 255) as u8, ); - return (local_addr, local_port); + (local_addr, local_port) } -} -impl Worker for NetworkWorker { - fn run_payload(&self) -> Result<(), WorkerError> { + pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); let Workload::Network { diff --git a/src/worker/processes.rs b/src/worker/processes.rs index 3028e87..929f08d 100644 --- a/src/worker/processes.rs +++ b/src/worker/processes.rs @@ -7,7 +7,7 @@ use nix::{sys::wait::waitpid, unistd::Pid}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand_distr::Exp; -use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig}; +use crate::{BaseConfig, WorkerError, Workload, WorkloadConfig}; #[derive(Debug, Clone, Copy)] pub struct ProcessesWorker { @@ -62,10 +62,8 @@ impl ProcessesWorker { } } } -} -impl Worker for ProcessesWorker { - fn run_payload(&self) -> Result<(), WorkerError> { + pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); let Workload::Processes { diff --git a/src/worker/syscalls.rs b/src/worker/syscalls.rs index 9290bc3..25965f9 100644 --- a/src/worker/syscalls.rs +++ b/src/worker/syscalls.rs @@ -6,7 +6,7 @@ use rand::{thread_rng, Rng}; use rand_distr::Exp; use syscalls::{syscall, Sysno}; -use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig}; +use crate::{BaseConfig, WorkerError, Workload, WorkloadConfig}; #[derive(Debug, Copy, Clone)] pub struct SyscallsWorker { @@ -31,10 +31,8 @@ impl SyscallsWorker { } } } -} -impl Worker for SyscallsWorker { - fn run_payload(&self) -> Result<(), WorkerError> { + pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); let Workload::Syscalls { arrival_rate } = self.workload.workload else { diff --git a/src/worker/udp.rs b/src/worker/udp.rs new file mode 100644 index 0000000..9f3d514 --- /dev/null +++ b/src/worker/udp.rs @@ -0,0 +1,130 @@ +use std::{ + ffi::CString, fmt::Display, net::SocketAddr, slice::from_raw_parts, + str::from_utf8, +}; + +use libc::{ + addrinfo, c_int, c_void, getaddrinfo, sendto, socket, strerror, strlen, + AF_INET, AF_INET6, SOCK_DGRAM, +}; + +use crate::{Worker, WorkerError}; + +static LOREM_IPSUM: &[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. \ +Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. \ +Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. \ +Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. \ +Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n"; + +#[derive(Debug)] +struct Errno { + code: i32, + msg: String, +} + +impl Errno { + fn new() -> Self { + let code = unsafe { *libc::__errno_location() }; + let msg = unsafe { + let m = strerror(code); + let len = strlen(m); + let m: &[u8] = from_raw_parts(m as *mut u8, len); + from_utf8(m).unwrap() + } + .to_string(); + + Errno { code, msg } + } +} + +impl Display for Errno { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Errno { code, msg } = self; + write!(f, "({code}) {msg}") + } +} + +impl From for WorkerError { + fn from(val: Errno) -> Self { + let msg = format!("{val}"); + WorkerError { msg } + } +} + +pub struct UdpClient { + fd: c_int, + target: addrinfo, +} + +impl UdpClient { + pub fn new(addr: &str) -> Self { + let addr: SocketAddr = addr.parse().unwrap(); + let ai_family = if addr.is_ipv4() { AF_INET } else { AF_INET6 }; + let hints = addrinfo { + ai_family, + ai_socktype: SOCK_DGRAM, + ai_flags: 0, + ai_protocol: 0, + ai_addrlen: 0, + ai_addr: std::ptr::null_mut(), + ai_canonname: std::ptr::null_mut(), + ai_next: std::ptr::null_mut(), + }; + + let target = unsafe { + let mut servinfo: *mut addrinfo = std::ptr::null_mut(); + let address = CString::new(addr.ip().to_string()).unwrap(); + let port = CString::new(addr.port().to_string()).unwrap(); + let ret = getaddrinfo( + address.as_ptr(), + port.as_ptr(), + &hints, + &mut servinfo, + ); + + if ret != 0 { + panic!("getaddrinfo failed: {ret}"); + } + servinfo.read() + }; + + let fd = create_socket(addr.is_ipv4()).unwrap(); + UdpClient { fd, target } + } + + fn send_msg(&self, msg: &[u8]) -> Result<(), Errno> { + let ret = unsafe { + sendto( + self.fd, + msg.as_ptr() as *const c_void, + msg.len(), + 0, + self.target.ai_addr, + self.target.ai_addrlen, + ) + }; + + if ret < 0 { + Err(Errno::new()) + } else { + Ok(()) + } + } +} + +impl Worker for UdpClient { + fn run_payload(&self) -> Result<(), crate::WorkerError> { + self.send_msg(LOREM_IPSUM)?; + Ok(()) + } +} + +fn create_socket(is_ipv4: bool) -> Result { + let domain = if is_ipv4 { AF_INET } else { AF_INET6 }; + let fd = unsafe { socket(domain, SOCK_DGRAM, 0) }; + if fd < 0 { + Err(Errno::new()) + } else { + Ok(fd) + } +} From 06a6d2f40d5b41231fd899eba4b245fad2dc26b5 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 12:35:46 +0200 Subject: [PATCH 02/10] Simplify configuration of workers --- src/lib.rs | 281 ++----------------------------------- src/main.rs | 2 +- src/worker/endpoints.rs | 26 ++-- src/worker/mod.rs | 19 +-- src/worker/network.rs | 67 +++++---- src/worker/processes.rs | 31 ++--- src/worker/syscalls.rs | 18 +-- src/worker/udp.rs | 130 ------------------ src/workload.rs | 297 ++++++++++++++++++++++++++++++++++++++++ 9 files changed, 379 insertions(+), 492 deletions(-) delete mode 100644 src/worker/udp.rs create mode 100644 src/workload.rs diff --git a/src/lib.rs b/src/lib.rs index bee3d6c..1f08312 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,130 +9,13 @@ use nix::{ }, unistd::Pid, }; -use serde::Deserialize; use std::{fmt::Display, thread}; +use workload::WorkloadConfig; use crate::worker::Worker; pub mod worker; - -/// Main workload configuration, contains general bits for all types of -/// workloads plus workload specific data. -#[derive(Debug, Copy, Clone, Deserialize)] -pub struct WorkloadConfig { - /// An amount of time for workload payload to run before restarting. - pub restart_interval: u64, - - /// Controls per-core mode to handle number of workers. If per-core mode - /// is enabled, `workers` will be treated as a number of workers per CPU - /// core. Otherwise it will be treated as a total number of workers. - #[serde(default = "default_per_core")] - pub per_core: bool, - - /// How many workers to spin, depending on `per_core` in either per-core - /// or total mode. - #[serde(default = "default_workers")] - pub workers: usize, - - /// Custom workload configuration. - pub workload: Workload, - - /// For how long to run the worker. Default value is zero, meaning no limit. - #[serde(default = "default_duration")] - pub duration: u64, -} - -fn default_workers() -> usize { - 1 -} - -fn default_per_core() -> bool { - true -} - -fn default_duration() -> u64 { - 0 -} - -/// Workload specific configuration, contains one enum value for each -/// workload type. -#[derive(Debug, Copy, Clone, Deserialize)] -#[serde(rename_all = "lowercase", tag = "type")] -pub enum Workload { - /// How to listen on ports. - Endpoints { - /// Governing the number of ports open. - #[serde(flatten)] - distribution: Distribution, - }, - - /// How to spawn processes. - Processes { - /// How often a new process will be spawn. - arrival_rate: f64, - - /// How long processes are going to live. - departure_rate: f64, - - /// Spawn a new process with random arguments. - random_process: bool, - }, - - /// How to invoke syscalls - Syscalls { - /// How often to invoke a syscall. - arrival_rate: f64, - }, - - /// How to open network connections - Network { - /// Whether the instance functions as a server or client - server: bool, - - /// Which ip address to use for the server to listen on, - /// or for the client to connect to - address: (u8, u8, u8, u8), - - /// Port for the server to listen on, or for the client - /// to connect to. - target_port: u16, - - /// Rate of opening new connections - arrival_rate: f64, - - /// Rate of closing connections - departure_rate: f64, - - /// Starting number of connections - nconnections: u32, - - /// How often send data via new connections, in milliseconds. - /// The interval is applied for all connections, e.g. an interval - /// of 100 ms for 100 connections means that every 100 ms one out - /// of 100 connections will be allowed to send some data. - /// This parameter allows to control the overhead of sending data, - /// so that it will not impact connections monitoring. - #[serde(default = "default_network_send_interval")] - send_interval: u128, - }, -} - -fn default_network_send_interval() -> u128 { - 100 -} - -/// Distribution for number of ports to listen on -#[derive(Debug, Copy, Clone, Deserialize)] -#[serde(tag = "distribution")] -pub enum Distribution { - /// Few processes are opening large number of ports, the rest are only few. - #[serde(alias = "zipf")] - Zipfian { n_ports: u64, exponent: f64 }, - - /// Every process opens more or less the same number of ports. - #[serde(alias = "uniform")] - Uniform { lower: u64, upper: u64 }, -} +pub mod workload; #[derive(Debug)] pub enum WorkerError { @@ -173,8 +56,13 @@ pub fn run(config: WorkloadConfig) { let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers) .map(|(cpu, process)| { - let worker = - Worker::new(config, cpu, process, &mut lower, &mut upper); + let worker = Worker::new( + config.clone(), + cpu, + process, + &mut lower, + &mut upper, + ); match fork() { Ok(Fork::Parent(child)) => { @@ -235,154 +123,3 @@ pub fn run(config: WorkloadConfig) { }); }); } - -#[cfg(test)] -mod tests { - use super::*; - use config::{Config, File, FileFormat}; - - #[test] - fn test_processes() { - let input = r#" - restart_interval = 10 - - [workload] - type = "processes" - arrival_rate = 10.0 - departure_rate = 200.0 - random_process = true - "#; - - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); - - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); - if let Workload::Processes { - arrival_rate, - departure_rate, - random_process, - } = workload - { - assert_eq!(arrival_rate, 10.0); - assert_eq!(departure_rate, 200.0); - assert!(random_process); - } else { - panic!("wrong workload type found"); - } - } - - #[test] - fn test_endpoints_zipf() { - let input = r#" - restart_interval = 10 - - [workload] - type = "endpoints" - distribution = "zipf" - n_ports = 200 - exponent = 1.4 - "#; - - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); - - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); - - if let Workload::Endpoints { distribution, .. } = workload { - if let Distribution::Zipfian { n_ports, exponent } = distribution { - assert_eq!(n_ports, 200); - assert_eq!(exponent, 1.4); - } else { - panic!("wrong distribution type found"); - } - } else { - panic!("wrong workload type found"); - } - } - - #[test] - fn test_endpoints_uniform() { - let input = r#" - restart_interval = 10 - - [workload] - type = "endpoints" - distribution = "uniform" - upper = 100 - lower = 1 - "#; - - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); - - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); - - if let Workload::Endpoints { distribution } = workload { - if let Distribution::Uniform { lower, upper } = distribution { - assert_eq!(lower, 1); - assert_eq!(upper, 100); - } else { - panic!("wrong distribution type found"); - } - } else { - panic!("wrong workload type found"); - } - } - - #[test] - fn test_syscalls() { - let input = r#" - restart_interval = 10 - - [workload] - type = "syscalls" - arrival_rate = 10.0 - "#; - - let config = Config::builder() - .add_source(File::from_str(input, FileFormat::Toml)) - .build() - .expect("failed to parse configuration") - .try_deserialize::() - .expect("failed to deserialize into WorkloadConfig"); - - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); - if let Workload::Syscalls { arrival_rate } = workload { - assert_eq!(arrival_rate, 10.0); - } else { - panic!("wrong workload type found"); - } - } -} diff --git a/src/main.rs b/src/main.rs index e05b93f..5588a70 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ extern crate core_affinity; use config::Config; -use berserker::WorkloadConfig; +use berserker::workload::WorkloadConfig; fn main() { let args: Vec = std::env::args().collect(); diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index 903e2f9..3a9ad0f 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -5,17 +5,13 @@ use log::info; use crate::{BaseConfig, WorkerError, WorkloadConfig}; -struct EndpointWorkload { +pub struct EndpointWorker { + config: BaseConfig, restart_interval: u64, lower: usize, upper: usize, } -pub struct EndpointWorker { - config: BaseConfig, - workload: EndpointWorkload, -} - impl EndpointWorker { pub fn new( workload: WorkloadConfig, @@ -34,24 +30,18 @@ impl EndpointWorker { EndpointWorker { config: BaseConfig { cpu, process }, - workload: EndpointWorkload { - restart_interval, - lower, - upper, - }, + restart_interval, + lower, + upper, } } pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let EndpointWorkload { - restart_interval, - lower, - upper, - } = self.workload; - - let listeners: Vec<_> = (lower..upper) + // Copy the u64 to prevent moving self into the thread. + let restart_interval = self.restart_interval; + let listeners: Vec<_> = (self.lower..self.upper) .map(|port| thread::spawn(move || listen(port, restart_interval))) .collect(); diff --git a/src/worker/mod.rs b/src/worker/mod.rs index f68430d..5089da7 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -2,7 +2,10 @@ use core_affinity::CoreId; use rand::{thread_rng, Rng}; use rand_distr::{Uniform, Zipf}; -use crate::{Distribution, WorkerError, Workload, WorkloadConfig}; +use crate::{ + workload::{Distribution, Endpoints, Workload}, + WorkerError, WorkloadConfig, +}; use self::{ endpoints::EndpointWorker, network::NetworkWorker, @@ -39,10 +42,10 @@ impl Worker { upper_bound: &mut usize, ) -> Worker { match workload.workload { - Workload::Processes { .. } => { - Worker::Process(ProcessesWorker::new(workload, cpu, process)) + Workload::Processes(processes) => { + Worker::Process(ProcessesWorker::new(processes, cpu, process)) } - Workload::Endpoints { distribution } => { + Workload::Endpoints(Endpoints { distribution }) => { let n_ports: usize = match distribution { Distribution::Zipfian { n_ports, exponent } => thread_rng() .sample(Zipf::new(n_ports, exponent).unwrap()) @@ -62,11 +65,11 @@ impl Worker { *upper_bound, )) } - Workload::Syscalls { .. } => { - Worker::Syscalls(SyscallsWorker::new(workload, cpu, process)) + Workload::Syscalls(syscalls) => { + Worker::Syscalls(SyscallsWorker::new(syscalls, cpu, process)) } - Workload::Network { .. } => { - Worker::Network(NetworkWorker::new(workload, cpu, process)) + Workload::Network(network) => { + Worker::Network(NetworkWorker::new(network, cpu, process)) } } } diff --git a/src/worker/network.rs b/src/worker/network.rs index ea6f34c..1330f40 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -10,7 +10,7 @@ use std::{ thread, }; -use crate::{BaseConfig, WorkerError, Workload, WorkloadConfig}; +use crate::{workload, BaseConfig, WorkerError}; use smoltcp::iface::{Config, Interface, SocketSet}; use smoltcp::phy::{ @@ -23,14 +23,36 @@ use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr, Ipv4Address}; pub struct NetworkWorker { config: BaseConfig, - workload: WorkloadConfig, + server: bool, + address: (u8, u8, u8, u8), + target_port: u16, + nconnections: u32, + send_interval: u128, } impl NetworkWorker { - pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { + pub fn new( + workload: workload::Network, + cpu: CoreId, + process: usize, + ) -> Self { + let workload::Network { + server, + address, + target_port, + arrival_rate: _, + departure_rate: _, + nconnections, + send_interval, + } = workload; + NetworkWorker { config: BaseConfig { cpu, process }, - workload, + server, + address, + target_port, + nconnections, + send_interval, } } @@ -96,19 +118,6 @@ impl NetworkWorker { addr: Ipv4Address, target_port: u16, ) -> Result<(), WorkerError> { - let Workload::Network { - server: _, - address: _, - target_port: _, - arrival_rate: _, - departure_rate: _, - nconnections, - send_interval, - } = self.workload.workload - else { - unreachable!() - }; - debug!("Starting client at {:?}:{:?}", addr, target_port); let (mut iface, mut device, fd) = self.setup_tuntap(addr); @@ -118,7 +127,7 @@ impl NetworkWorker { // the whole run let mut sockets = SocketSet::new(vec![]); - for _i in 0..nconnections { + for _i in 0..self.nconnections { let tcp_rx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); let tcp_tx_buffer = tcp::SocketBuffer::new(vec![0; 1024]); let tcp_socket = tcp::Socket::new(tcp_rx_buffer, tcp_tx_buffer); @@ -179,7 +188,7 @@ impl NetworkWorker { // purpose is to excercise connection monitoring. // Sending data too frequently we risk producing too much // load and making connetion monitoring less reliable. - if elapsed > send_interval { + if elapsed > self.send_interval { // reset the timer send_timer = SystemTime::now(); @@ -278,25 +287,13 @@ impl NetworkWorker { pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let Workload::Network { - server, - address, - target_port, - arrival_rate: _, - departure_rate: _, - nconnections: _, - send_interval: _, - } = self.workload.workload - else { - unreachable!() - }; - + let address = self.address; let ip_addr = Ipv4Address([address.0, address.1, address.2, address.3]); - if server { - let _ = self.start_server(ip_addr, target_port); + if self.server { + let _ = self.start_server(ip_addr, self.target_port); } else { - let _ = self.start_client(ip_addr, target_port); + let _ = self.start_client(ip_addr, self.target_port); } Ok(()) diff --git a/src/worker/processes.rs b/src/worker/processes.rs index 929f08d..1603f7e 100644 --- a/src/worker/processes.rs +++ b/src/worker/processes.rs @@ -7,16 +7,20 @@ use nix::{sys::wait::waitpid, unistd::Pid}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand_distr::Exp; -use crate::{BaseConfig, WorkerError, Workload, WorkloadConfig}; +use crate::{workload, BaseConfig, WorkerError}; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct ProcessesWorker { config: BaseConfig, - workload: WorkloadConfig, + workload: workload::Processes, } impl ProcessesWorker { - pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { + pub fn new( + workload: workload::Processes, + cpu: CoreId, + process: usize, + ) -> Self { ProcessesWorker { config: BaseConfig { cpu, process }, workload, @@ -24,17 +28,9 @@ impl ProcessesWorker { } fn spawn_process(&self, lifetime: u64) -> Result<(), WorkerError> { - let Workload::Processes { - arrival_rate: _, - departure_rate: _, - random_process, - } = self.workload.workload - else { - unreachable!() - }; let BaseConfig { cpu, process } = self.config; - if random_process { + if self.workload.random_process { let uniq_arg: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(7) @@ -66,20 +62,17 @@ impl ProcessesWorker { pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let Workload::Processes { + let workload::Processes { arrival_rate, departure_rate, random_process: _, - } = self.workload.workload - else { - unreachable!() - }; + } = self.workload; loop { let lifetime: f64 = thread_rng().sample(Exp::new(departure_rate).unwrap()); - let worker = *self; + let worker = self.clone(); thread::spawn(move || { worker.spawn_process((lifetime * 1000.0).round() as u64) }); diff --git a/src/worker/syscalls.rs b/src/worker/syscalls.rs index 25965f9..7abe999 100644 --- a/src/worker/syscalls.rs +++ b/src/worker/syscalls.rs @@ -6,19 +6,23 @@ use rand::{thread_rng, Rng}; use rand_distr::Exp; use syscalls::{syscall, Sysno}; -use crate::{BaseConfig, WorkerError, Workload, WorkloadConfig}; +use crate::{workload, BaseConfig, WorkerError}; #[derive(Debug, Copy, Clone)] pub struct SyscallsWorker { config: BaseConfig, - workload: WorkloadConfig, + arrival_rate: f64, } impl SyscallsWorker { - pub fn new(workload: WorkloadConfig, cpu: CoreId, process: usize) -> Self { + pub fn new( + workload: workload::Syscalls, + cpu: CoreId, + process: usize, + ) -> Self { SyscallsWorker { config: BaseConfig { cpu, process }, - workload, + arrival_rate: workload.arrival_rate, } } @@ -35,10 +39,6 @@ impl SyscallsWorker { pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let Workload::Syscalls { arrival_rate } = self.workload.workload else { - unreachable!() - }; - loop { let worker = *self; thread::spawn(move || { @@ -46,7 +46,7 @@ impl SyscallsWorker { }); let interval: f64 = - thread_rng().sample(Exp::new(arrival_rate).unwrap()); + thread_rng().sample(Exp::new(self.arrival_rate).unwrap()); info!( "{}-{}: Interval {}, rounded {}", self.config.cpu.id, diff --git a/src/worker/udp.rs b/src/worker/udp.rs deleted file mode 100644 index 9f3d514..0000000 --- a/src/worker/udp.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::{ - ffi::CString, fmt::Display, net::SocketAddr, slice::from_raw_parts, - str::from_utf8, -}; - -use libc::{ - addrinfo, c_int, c_void, getaddrinfo, sendto, socket, strerror, strlen, - AF_INET, AF_INET6, SOCK_DGRAM, -}; - -use crate::{Worker, WorkerError}; - -static LOREM_IPSUM: &[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. \ -Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. \ -Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. \ -Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. \ -Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n"; - -#[derive(Debug)] -struct Errno { - code: i32, - msg: String, -} - -impl Errno { - fn new() -> Self { - let code = unsafe { *libc::__errno_location() }; - let msg = unsafe { - let m = strerror(code); - let len = strlen(m); - let m: &[u8] = from_raw_parts(m as *mut u8, len); - from_utf8(m).unwrap() - } - .to_string(); - - Errno { code, msg } - } -} - -impl Display for Errno { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Errno { code, msg } = self; - write!(f, "({code}) {msg}") - } -} - -impl From for WorkerError { - fn from(val: Errno) -> Self { - let msg = format!("{val}"); - WorkerError { msg } - } -} - -pub struct UdpClient { - fd: c_int, - target: addrinfo, -} - -impl UdpClient { - pub fn new(addr: &str) -> Self { - let addr: SocketAddr = addr.parse().unwrap(); - let ai_family = if addr.is_ipv4() { AF_INET } else { AF_INET6 }; - let hints = addrinfo { - ai_family, - ai_socktype: SOCK_DGRAM, - ai_flags: 0, - ai_protocol: 0, - ai_addrlen: 0, - ai_addr: std::ptr::null_mut(), - ai_canonname: std::ptr::null_mut(), - ai_next: std::ptr::null_mut(), - }; - - let target = unsafe { - let mut servinfo: *mut addrinfo = std::ptr::null_mut(); - let address = CString::new(addr.ip().to_string()).unwrap(); - let port = CString::new(addr.port().to_string()).unwrap(); - let ret = getaddrinfo( - address.as_ptr(), - port.as_ptr(), - &hints, - &mut servinfo, - ); - - if ret != 0 { - panic!("getaddrinfo failed: {ret}"); - } - servinfo.read() - }; - - let fd = create_socket(addr.is_ipv4()).unwrap(); - UdpClient { fd, target } - } - - fn send_msg(&self, msg: &[u8]) -> Result<(), Errno> { - let ret = unsafe { - sendto( - self.fd, - msg.as_ptr() as *const c_void, - msg.len(), - 0, - self.target.ai_addr, - self.target.ai_addrlen, - ) - }; - - if ret < 0 { - Err(Errno::new()) - } else { - Ok(()) - } - } -} - -impl Worker for UdpClient { - fn run_payload(&self) -> Result<(), crate::WorkerError> { - self.send_msg(LOREM_IPSUM)?; - Ok(()) - } -} - -fn create_socket(is_ipv4: bool) -> Result { - let domain = if is_ipv4 { AF_INET } else { AF_INET6 }; - let fd = unsafe { socket(domain, SOCK_DGRAM, 0) }; - if fd < 0 { - Err(Errno::new()) - } else { - Ok(fd) - } -} diff --git a/src/workload.rs b/src/workload.rs new file mode 100644 index 0000000..898089b --- /dev/null +++ b/src/workload.rs @@ -0,0 +1,297 @@ +use std::fmt::Display; + +use serde::Deserialize; + +/// Main workload configuration, contains general bits for all types of +/// workloads plus workload specific data. +#[derive(Debug, Clone, Deserialize)] +pub struct WorkloadConfig { + /// An amount of time for workload payload to run before restarting. + pub restart_interval: u64, + + /// Controls per-core mode to handle number of workers. If per-core mode + /// is enabled, `workers` will be treated as a number of workers per CPU + /// core. Otherwise it will be treated as a total number of workers. + #[serde(default = "default_per_core")] + pub per_core: bool, + + /// How many workers to spin, depending on `per_core` in either per-core + /// or total mode. + #[serde(default = "default_workers")] + pub workers: usize, + + /// Custom workload configuration. + pub workload: Workload, + + /// For how long to run the worker. Default value is zero, meaning no limit. + #[serde(default = "default_duration")] + pub duration: u64, +} + +fn default_workers() -> usize { + 1 +} + +fn default_per_core() -> bool { + true +} + +fn default_duration() -> u64 { + 0 +} + +/// Workload specific configuration, contains one enum value for each +/// workload type. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "lowercase", tag = "type")] +pub enum Workload { + /// How to listen on ports. + Endpoints(Endpoints), + + /// How to spawn processes. + Processes(Processes), + + /// How to invoke syscalls + Syscalls(Syscalls), + + /// How to open network connections + Network(Network), +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Endpoints { + /// Governing the number of ports open. + #[serde(flatten)] + pub distribution: Distribution, +} + +impl Display for Endpoints { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Using {} distribution", self.distribution) + } +} + +/// Distribution for number of ports to listen on +#[derive(Debug, Copy, Clone, Deserialize)] +#[serde(tag = "distribution")] +pub enum Distribution { + /// Few processes are opening large number of ports, the rest are only few. + #[serde(alias = "zipf")] + Zipfian { n_ports: u64, exponent: f64 }, + + /// Every process opens more or less the same number of ports. + #[serde(alias = "uniform")] + Uniform { lower: u64, upper: u64 }, +} + +impl Display for Distribution { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Processes { + /// How often a new process will be spawn. + pub arrival_rate: f64, + + /// How long processes are going to live. + pub departure_rate: f64, + + /// Spawn a new process with random arguments. + pub random_process: bool, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Syscalls { + /// How often to invoke a syscall. + pub arrival_rate: f64, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct Network { + /// Whether the instance functions as a server or client + pub server: bool, + + /// Which ip address to use for the server to listen on, + /// or for the client to connect to + pub address: (u8, u8, u8, u8), + + /// Port for the server to listen on, or for the client + /// to connect to. + pub target_port: u16, + + /// Rate of opening new connections + pub arrival_rate: f64, + + /// Rate of closing connections + pub departure_rate: f64, + + /// Starting number of connections + pub nconnections: u32, + + /// How often send data via new connections, in milliseconds. + /// The interval is applied for all connections, e.g. an interval + /// of 100 ms for 100 connections means that every 100 ms one out + /// of 100 connections will be allowed to send some data. + /// This parameter allows to control the overhead of sending data, + /// so that it will not impact connections monitoring. + #[serde(default = "default_network_send_interval")] + pub send_interval: u128, +} + +fn default_network_send_interval() -> u128 { + 10 +} + +#[cfg(test)] +mod tests { + use super::*; + use config::{Config, File, FileFormat}; + + #[test] + fn test_processes() { + let input = r#" + restart_interval = 10 + + [workload] + type = "processes" + arrival_rate = 10.0 + departure_rate = 200.0 + random_process = true + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + .. + } = config; + assert_eq!(restart_interval, 10); + if let Workload::Processes(Processes { + arrival_rate, + departure_rate, + random_process, + .. + }) = workload + { + assert_eq!(arrival_rate, 10.0); + assert_eq!(departure_rate, 200.0); + assert!(random_process); + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_endpoints_zipf() { + let input = r#" + restart_interval = 10 + + [workload] + type = "endpoints" + distribution = "zipf" + n_ports = 200 + exponent = 1.4 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + .. + } = config; + assert_eq!(restart_interval, 10); + + if let Workload::Endpoints(Endpoints { distribution, .. }) = workload { + if let Distribution::Zipfian { n_ports, exponent } = distribution { + assert_eq!(n_ports, 200); + assert_eq!(exponent, 1.4); + } else { + panic!("wrong distribution type found"); + } + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_endpoints_uniform() { + let input = r#" + restart_interval = 10 + + [workload] + type = "endpoints" + distribution = "uniform" + upper = 100 + lower = 1 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + .. + } = config; + assert_eq!(restart_interval, 10); + + if let Workload::Endpoints(Endpoints { distribution, .. }) = workload { + if let Distribution::Uniform { lower, upper } = distribution { + assert_eq!(lower, 1); + assert_eq!(upper, 100); + } else { + panic!("wrong distribution type found"); + } + } else { + panic!("wrong workload type found"); + } + } + + #[test] + fn test_syscalls() { + let input = r#" + restart_interval = 10 + + [workload] + type = "syscalls" + arrival_rate = 10.0 + "#; + + let config = Config::builder() + .add_source(File::from_str(input, FileFormat::Toml)) + .build() + .expect("failed to parse configuration") + .try_deserialize::() + .expect("failed to deserialize into WorkloadConfig"); + + let WorkloadConfig { + restart_interval, + workload, + .. + } = config; + assert_eq!(restart_interval, 10); + if let Workload::Syscalls(Syscalls { arrival_rate, .. }) = workload { + assert_eq!(arrival_rate, 10.0); + } else { + panic!("wrong workload type found"); + } + } +} From 2d4a67ea428d1b7668c1132624bcdfafee2b5c8b Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 13:06:41 +0200 Subject: [PATCH 03/10] Simplify port range generation for endpoints --- src/lib.rs | 21 +++++++++++---------- src/worker/endpoints.rs | 40 +++++++++++++++++++++++++++++++--------- src/worker/mod.rs | 17 +++++------------ 3 files changed, 47 insertions(+), 31 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 1f08312..c869d9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,8 +44,8 @@ impl Display for BaseConfig { pub fn run(config: WorkloadConfig) { let duration_timer = std::time::SystemTime::now(); - let mut lower = 1024; - let mut upper = 1024; + let mut start_port = 1024; + let mut total_ports = 0; let core_ids: Vec = if config.per_core { // Retrieve the IDs of all active CPU cores. @@ -56,13 +56,12 @@ pub fn run(config: WorkloadConfig) { let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers) .map(|(cpu, process)| { - let worker = Worker::new( - config.clone(), - cpu, - process, - &mut lower, - &mut upper, - ); + let worker = Worker::new(config.clone(), cpu, process, start_port); + + if let Worker::Endpoint(w) = &worker { + start_port += w.size(); + total_ports += w.size(); + } match fork() { Ok(Fork::Parent(child)) => { @@ -86,7 +85,9 @@ pub fn run(config: WorkloadConfig) { }) .collect(); - info!("In total: {}", upper); + if total_ports != 0 { + info!("In total: {total_ports}"); + } let processes = &handles.clone(); diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index 3a9ad0f..bab733d 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -1,15 +1,30 @@ -use std::{fmt::Display, net::TcpListener, thread, time}; +use std::{fmt::Display, net::TcpListener, ops::Range, thread, time}; use core_affinity::CoreId; use log::info; use crate::{BaseConfig, WorkerError, WorkloadConfig}; +struct PortRange { + start: u16, + length: u16, +} + +impl PortRange { + fn new(start: u16, length: u16) -> Self { + PortRange { start, length } + } + + fn get_range(&self) -> Range { + let end = self.start + self.length; + self.start..end + } +} + pub struct EndpointWorker { config: BaseConfig, restart_interval: u64, - lower: usize, - upper: usize, + ports: PortRange, } impl EndpointWorker { @@ -17,8 +32,8 @@ impl EndpointWorker { workload: WorkloadConfig, cpu: CoreId, process: usize, - lower: usize, - upper: usize, + lower: u16, + upper: u16, ) -> Self { let WorkloadConfig { restart_interval, @@ -28,11 +43,12 @@ impl EndpointWorker { duration: _, } = workload; + let ports = PortRange::new(lower, upper - lower); + EndpointWorker { config: BaseConfig { cpu, process }, restart_interval, - lower, - upper, + ports, } } @@ -41,7 +57,9 @@ impl EndpointWorker { // Copy the u64 to prevent moving self into the thread. let restart_interval = self.restart_interval; - let listeners: Vec<_> = (self.lower..self.upper) + let listeners: Vec<_> = self + .ports + .get_range() .map(|port| thread::spawn(move || listen(port, restart_interval))) .collect(); @@ -51,6 +69,10 @@ impl EndpointWorker { Ok(()) } + + pub fn size(&self) -> u16 { + self.ports.length + } } impl Display for EndpointWorker { @@ -59,7 +81,7 @@ impl Display for EndpointWorker { } } -fn listen(port: usize, sleep: u64) -> std::io::Result<()> { +fn listen(port: u16, sleep: u64) -> std::io::Result<()> { let addr = format!("0.0.0.0:{port}"); let listener = TcpListener::bind(addr)?; diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 5089da7..432ab3f 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -38,31 +38,24 @@ impl Worker { workload: WorkloadConfig, cpu: CoreId, process: usize, - lower_bound: &mut usize, - upper_bound: &mut usize, + start_port: u16, ) -> Worker { match workload.workload { Workload::Processes(processes) => { Worker::Process(ProcessesWorker::new(processes, cpu, process)) } Workload::Endpoints(Endpoints { distribution }) => { - let n_ports: usize = match distribution { + let n_ports: u16 = match distribution { Distribution::Zipfian { n_ports, exponent } => thread_rng() .sample(Zipf::new(n_ports, exponent).unwrap()) - as usize, + as u16, Distribution::Uniform { lower, upper } => { - thread_rng().sample(Uniform::new(lower, upper)) as usize + thread_rng().sample(Uniform::new(lower, upper)) as u16 } }; - *lower_bound = *upper_bound; - *upper_bound += n_ports as usize; Worker::Endpoint(EndpointWorker::new( - workload, - cpu, - process, - *lower_bound, - *upper_bound, + workload, cpu, process, start_port, n_ports, )) } Workload::Syscalls(syscalls) => { From 606ff434545386c41c910886a6432b0a50eb20db Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 13:16:37 +0200 Subject: [PATCH 04/10] Move restart_interval to endpoints configuration --- src/worker/endpoints.rs | 12 ++----- src/worker/mod.rs | 11 ++++-- src/workload.rs | 57 ++++++++++++-------------------- workloads/endpoints-uniform.toml | 3 +- workloads/endpoints-zipf.toml | 3 +- workloads/network.toml | 2 -- workloads/processes.toml | 2 -- workloads/syscalls.toml | 2 -- 8 files changed, 34 insertions(+), 58 deletions(-) diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index bab733d..63c49df 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -3,7 +3,7 @@ use std::{fmt::Display, net::TcpListener, ops::Range, thread, time}; use core_affinity::CoreId; use log::info; -use crate::{BaseConfig, WorkerError, WorkloadConfig}; +use crate::{BaseConfig, WorkerError}; struct PortRange { start: u16, @@ -29,20 +29,12 @@ pub struct EndpointWorker { impl EndpointWorker { pub fn new( - workload: WorkloadConfig, cpu: CoreId, process: usize, + restart_interval: u64, lower: u16, upper: u16, ) -> Self { - let WorkloadConfig { - restart_interval, - workload: _, - per_core: _, - workers: _, - duration: _, - } = workload; - let ports = PortRange::new(lower, upper - lower); EndpointWorker { diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 432ab3f..a92e2fc 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -44,7 +44,10 @@ impl Worker { Workload::Processes(processes) => { Worker::Process(ProcessesWorker::new(processes, cpu, process)) } - Workload::Endpoints(Endpoints { distribution }) => { + Workload::Endpoints(Endpoints { + restart_interval, + distribution, + }) => { let n_ports: u16 = match distribution { Distribution::Zipfian { n_ports, exponent } => thread_rng() .sample(Zipf::new(n_ports, exponent).unwrap()) @@ -55,7 +58,11 @@ impl Worker { }; Worker::Endpoint(EndpointWorker::new( - workload, cpu, process, start_port, n_ports, + cpu, + process, + restart_interval, + start_port, + n_ports, )) } Workload::Syscalls(syscalls) => { diff --git a/src/workload.rs b/src/workload.rs index 898089b..123139e 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -6,9 +6,6 @@ use serde::Deserialize; /// workloads plus workload specific data. #[derive(Debug, Clone, Deserialize)] pub struct WorkloadConfig { - /// An amount of time for workload payload to run before restarting. - pub restart_interval: u64, - /// Controls per-core mode to handle number of workers. If per-core mode /// is enabled, `workers` will be treated as a number of workers per CPU /// core. Otherwise it will be treated as a total number of workers. @@ -60,6 +57,9 @@ pub enum Workload { #[derive(Debug, Clone, Deserialize)] pub struct Endpoints { + /// An amount of time for the workload to run before restarting + pub restart_interval: u64, + /// Governing the number of ports open. #[serde(flatten)] pub distribution: Distribution, @@ -152,8 +152,6 @@ mod tests { #[test] fn test_processes() { let input = r#" - restart_interval = 10 - [workload] type = "processes" arrival_rate = 10.0 @@ -168,12 +166,7 @@ mod tests { .try_deserialize::() .expect("failed to deserialize into WorkloadConfig"); - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); + let WorkloadConfig { workload, .. } = config; if let Workload::Processes(Processes { arrival_rate, departure_rate, @@ -192,10 +185,9 @@ mod tests { #[test] fn test_endpoints_zipf() { let input = r#" - restart_interval = 10 - [workload] type = "endpoints" + restart_interval = 10 distribution = "zipf" n_ports = 200 exponent = 1.4 @@ -208,14 +200,15 @@ mod tests { .try_deserialize::() .expect("failed to deserialize into WorkloadConfig"); - let WorkloadConfig { + let WorkloadConfig { workload, .. } = config; + + if let Workload::Endpoints(Endpoints { restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); + distribution, + }) = workload + { + assert_eq!(restart_interval, 10); - if let Workload::Endpoints(Endpoints { distribution, .. }) = workload { if let Distribution::Zipfian { n_ports, exponent } = distribution { assert_eq!(n_ports, 200); assert_eq!(exponent, 1.4); @@ -230,10 +223,9 @@ mod tests { #[test] fn test_endpoints_uniform() { let input = r#" - restart_interval = 10 - [workload] type = "endpoints" + restart_interval = 10 distribution = "uniform" upper = 100 lower = 1 @@ -246,14 +238,14 @@ mod tests { .try_deserialize::() .expect("failed to deserialize into WorkloadConfig"); - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); + let WorkloadConfig { workload, .. } = config; - if let Workload::Endpoints(Endpoints { distribution, .. }) = workload { + if let Workload::Endpoints(Endpoints { + restart_interval, + distribution, + }) = workload + { + assert_eq!(restart_interval, 10); if let Distribution::Uniform { lower, upper } = distribution { assert_eq!(lower, 1); assert_eq!(upper, 100); @@ -268,8 +260,6 @@ mod tests { #[test] fn test_syscalls() { let input = r#" - restart_interval = 10 - [workload] type = "syscalls" arrival_rate = 10.0 @@ -282,12 +272,7 @@ mod tests { .try_deserialize::() .expect("failed to deserialize into WorkloadConfig"); - let WorkloadConfig { - restart_interval, - workload, - .. - } = config; - assert_eq!(restart_interval, 10); + let WorkloadConfig { workload, .. } = config; if let Workload::Syscalls(Syscalls { arrival_rate, .. }) = workload { assert_eq!(arrival_rate, 10.0); } else { diff --git a/workloads/endpoints-uniform.toml b/workloads/endpoints-uniform.toml index 197ba6e..3cf9a47 100644 --- a/workloads/endpoints-uniform.toml +++ b/workloads/endpoints-uniform.toml @@ -1,7 +1,6 @@ -restart_interval = 10 - [workload] type = "endpoints" +restart_interval = 10 distribution = "uniform" upper = 100 lower = 1 diff --git a/workloads/endpoints-zipf.toml b/workloads/endpoints-zipf.toml index 8680572..42d2fa4 100644 --- a/workloads/endpoints-zipf.toml +++ b/workloads/endpoints-zipf.toml @@ -1,7 +1,6 @@ -restart_interval = 10 - [workload] type = "endpoints" +restart_interval = 10 distribution = "zipf" n_ports = 200 exponent = 1.4 diff --git a/workloads/network.toml b/workloads/network.toml index 2f338c6..d01c1ff 100644 --- a/workloads/network.toml +++ b/workloads/network.toml @@ -1,5 +1,3 @@ -restart_interval = 10 - [workload] type = "network" server = false diff --git a/workloads/processes.toml b/workloads/processes.toml index f161df0..b265d47 100644 --- a/workloads/processes.toml +++ b/workloads/processes.toml @@ -1,5 +1,3 @@ -restart_interval = 10 - [workload] type = "processes" arrival_rate = 10.0 diff --git a/workloads/syscalls.toml b/workloads/syscalls.toml index 0cbdcfc..b0c99ac 100644 --- a/workloads/syscalls.toml +++ b/workloads/syscalls.toml @@ -1,5 +1,3 @@ -restart_interval = 10 - [workload] type = "syscalls" arrival_rate = 10.0 From 6225f945ab5133d32282fb55195a9470595340d5 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 13:32:45 +0200 Subject: [PATCH 05/10] Prevent cloning vectors of handles --- src/lib.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c869d9b..fe50318 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ use nix::{ }, unistd::Pid, }; -use std::{fmt::Display, thread}; +use std::{fmt::Display, sync::Arc, thread}; use workload::WorkloadConfig; use crate::worker::Worker; @@ -89,17 +89,20 @@ pub fn run(config: WorkloadConfig) { info!("In total: {total_ports}"); } - let processes = &handles.clone(); + let handles = Arc::new(handles); thread::scope(|s| { if config.duration != 0 { + // Cloning the Arc so we can hand it over to the watcher thread + let handles = handles.clone(); + // Spin a watcher thread s.spawn(move || loop { thread::sleep(std::time::Duration::from_secs(1)); let elapsed = duration_timer.elapsed().unwrap().as_secs(); if elapsed > config.duration { - for handle in processes.iter().flatten() { + for handle in handles.iter().flatten() { info!("Terminating: {}", *handle); match kill(Pid::from_raw(*handle), Signal::SIGTERM) { Ok(()) => { @@ -117,7 +120,7 @@ pub fn run(config: WorkloadConfig) { } s.spawn(move || { - for handle in processes.iter().flatten() { + for handle in handles.iter().flatten() { info!("waitpid: {}", *handle); waitpid(Pid::from_raw(*handle), None).unwrap(); } From 93b628d44651c9f71d46be51df783447a74c78ce Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 13:36:54 +0200 Subject: [PATCH 06/10] Fix EndpointWorker::new --- src/worker/endpoints.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index 63c49df..6cdb2b9 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -32,10 +32,10 @@ impl EndpointWorker { cpu: CoreId, process: usize, restart_interval: u64, - lower: u16, - upper: u16, + start_port: u16, + n_ports: u16, ) -> Self { - let ports = PortRange::new(lower, upper - lower); + let ports = PortRange::new(start_port, n_ports); EndpointWorker { config: BaseConfig { cpu, process }, From 043d98ba4f79c8ce3e401c449c7053164d901c94 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 13:44:41 +0200 Subject: [PATCH 07/10] Minor simplifications of NetworkWorker --- src/worker/network.rs | 46 ++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/src/worker/network.rs b/src/worker/network.rs index 1330f40..017839e 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -24,7 +24,7 @@ use smoltcp::wire::{EthernetAddress, IpAddress, IpCidr, Ipv4Address}; pub struct NetworkWorker { config: BaseConfig, server: bool, - address: (u8, u8, u8, u8), + address: Ipv4Address, target_port: u16, nconnections: u32, send_interval: u128, @@ -46,6 +46,8 @@ impl NetworkWorker { send_interval, } = workload; + let address = Ipv4Address([address.0, address.1, address.2, address.3]); + NetworkWorker { config: BaseConfig { cpu, process }, server, @@ -59,15 +61,15 @@ impl NetworkWorker { /// Start a simple server. The client side is going to be a networking /// worker as well, so for convenience of troubleshooting do not error /// out if something unexpected happened, log and proceed instead. - fn start_server( - &self, - addr: Ipv4Address, - target_port: u16, - ) -> Result<(), WorkerError> { - debug!("Starting server at {:?}:{:?}", addr, target_port); + fn start_server(&self) -> Result<(), WorkerError> { + debug!( + "Starting server at {:?}:{:?}", + self.address, self.target_port + ); let listener = - TcpListener::bind((addr.to_string(), target_port)).unwrap(); + TcpListener::bind((self.address.to_string(), self.target_port)) + .unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); @@ -113,14 +115,13 @@ impl NetworkWorker { Ok(()) } - fn start_client( - &self, - addr: Ipv4Address, - target_port: u16, - ) -> Result<(), WorkerError> { - debug!("Starting client at {:?}:{:?}", addr, target_port); + fn start_client(&self) -> Result<(), WorkerError> { + debug!( + "Starting client at {:?}:{:?}", + self.address, self.target_port + ); - let (mut iface, mut device, fd) = self.setup_tuntap(addr); + let (mut iface, mut device, fd) = self.setup_tuntap(self.address); let cx = iface.context(); // Open static set of connections, that are going to live throughout @@ -142,10 +143,14 @@ impl NetworkWorker { { let index = i; let (local_addr, local_port) = - self.get_local_addr_port(addr, index); + self.get_local_addr_port(self.address, index); info!("connecting from {}:{}", local_addr, local_port); socket - .connect(cx, (addr, target_port), (local_addr, local_port)) + .connect( + cx, + (self.address, self.target_port), + (local_addr, local_port), + ) .unwrap(); } @@ -287,13 +292,10 @@ impl NetworkWorker { pub fn run_payload(&self) -> Result<(), WorkerError> { info!("{self}"); - let address = self.address; - let ip_addr = Ipv4Address([address.0, address.1, address.2, address.3]); - if self.server { - let _ = self.start_server(ip_addr, self.target_port); + let _ = self.start_server(); } else { - let _ = self.start_client(ip_addr, self.target_port); + let _ = self.start_client(); } Ok(()) From daf609bdb43bb04845ac5a447345723c2ef59233 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 16:05:52 +0200 Subject: [PATCH 08/10] Use BaseConfig in places that make sense --- src/lib.rs | 17 +++++++++-------- src/worker/endpoints.rs | 6 ++---- src/worker/mod.rs | 15 ++++++--------- src/worker/network.rs | 9 ++------- src/worker/processes.rs | 12 ++---------- src/worker/syscalls.rs | 9 ++------- 6 files changed, 23 insertions(+), 45 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fe50318..93df6a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,7 @@ impl Display for WorkerError { /// General information for each worker, on which CPU is it running /// and what is the process number. #[derive(Debug, Copy, Clone)] -struct BaseConfig { +pub struct BaseConfig { cpu: CoreId, process: usize, } @@ -42,21 +42,22 @@ impl Display for BaseConfig { } } -pub fn run(config: WorkloadConfig) { +pub fn run(workload: WorkloadConfig) { let duration_timer = std::time::SystemTime::now(); let mut start_port = 1024; let mut total_ports = 0; - let core_ids: Vec = if config.per_core { + let core_ids: Vec = if workload.per_core { // Retrieve the IDs of all active CPU cores. core_affinity::get_core_ids().unwrap() } else { vec![CoreId { id: 0 }] }; - let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers) + let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..workload.workers) .map(|(cpu, process)| { - let worker = Worker::new(config.clone(), cpu, process, start_port); + let config = BaseConfig { cpu, process }; + let worker = Worker::new(workload.clone(), config, start_port); if let Worker::Endpoint(w) = &worker { start_port += w.size(); @@ -69,7 +70,7 @@ pub fn run(config: WorkloadConfig) { Some(child) } Ok(Fork::Child) => { - if config.per_core { + if workload.per_core { core_affinity::set_for_current(cpu); } @@ -92,7 +93,7 @@ pub fn run(config: WorkloadConfig) { let handles = Arc::new(handles); thread::scope(|s| { - if config.duration != 0 { + if workload.duration != 0 { // Cloning the Arc so we can hand it over to the watcher thread let handles = handles.clone(); @@ -101,7 +102,7 @@ pub fn run(config: WorkloadConfig) { thread::sleep(std::time::Duration::from_secs(1)); let elapsed = duration_timer.elapsed().unwrap().as_secs(); - if elapsed > config.duration { + if elapsed > workload.duration { for handle in handles.iter().flatten() { info!("Terminating: {}", *handle); match kill(Pid::from_raw(*handle), Signal::SIGTERM) { diff --git a/src/worker/endpoints.rs b/src/worker/endpoints.rs index 6cdb2b9..b286b47 100644 --- a/src/worker/endpoints.rs +++ b/src/worker/endpoints.rs @@ -1,6 +1,5 @@ use std::{fmt::Display, net::TcpListener, ops::Range, thread, time}; -use core_affinity::CoreId; use log::info; use crate::{BaseConfig, WorkerError}; @@ -29,8 +28,7 @@ pub struct EndpointWorker { impl EndpointWorker { pub fn new( - cpu: CoreId, - process: usize, + config: BaseConfig, restart_interval: u64, start_port: u16, n_ports: u16, @@ -38,7 +36,7 @@ impl EndpointWorker { let ports = PortRange::new(start_port, n_ports); EndpointWorker { - config: BaseConfig { cpu, process }, + config, restart_interval, ports, } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index a92e2fc..efb49ca 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,10 +1,9 @@ -use core_affinity::CoreId; use rand::{thread_rng, Rng}; use rand_distr::{Uniform, Zipf}; use crate::{ workload::{Distribution, Endpoints, Workload}, - WorkerError, WorkloadConfig, + BaseConfig, WorkerError, WorkloadConfig, }; use self::{ @@ -36,13 +35,12 @@ impl Worker { pub fn new( workload: WorkloadConfig, - cpu: CoreId, - process: usize, + base_config: BaseConfig, start_port: u16, ) -> Worker { match workload.workload { Workload::Processes(processes) => { - Worker::Process(ProcessesWorker::new(processes, cpu, process)) + Worker::Process(ProcessesWorker::new(processes, base_config)) } Workload::Endpoints(Endpoints { restart_interval, @@ -58,18 +56,17 @@ impl Worker { }; Worker::Endpoint(EndpointWorker::new( - cpu, - process, + base_config, restart_interval, start_port, n_ports, )) } Workload::Syscalls(syscalls) => { - Worker::Syscalls(SyscallsWorker::new(syscalls, cpu, process)) + Worker::Syscalls(SyscallsWorker::new(syscalls, base_config)) } Workload::Network(network) => { - Worker::Network(NetworkWorker::new(network, cpu, process)) + Worker::Network(NetworkWorker::new(network, base_config)) } } } diff --git a/src/worker/network.rs b/src/worker/network.rs index 017839e..6624ac8 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -1,4 +1,3 @@ -use core_affinity::CoreId; use log::{debug, info, trace}; use std::os::unix::io::AsRawFd; use std::str; @@ -31,11 +30,7 @@ pub struct NetworkWorker { } impl NetworkWorker { - pub fn new( - workload: workload::Network, - cpu: CoreId, - process: usize, - ) -> Self { + pub fn new(workload: workload::Network, config: BaseConfig) -> Self { let workload::Network { server, address, @@ -49,7 +44,7 @@ impl NetworkWorker { let address = Ipv4Address([address.0, address.1, address.2, address.3]); NetworkWorker { - config: BaseConfig { cpu, process }, + config, server, address, target_port, diff --git a/src/worker/processes.rs b/src/worker/processes.rs index 1603f7e..8e66008 100644 --- a/src/worker/processes.rs +++ b/src/worker/processes.rs @@ -1,6 +1,5 @@ use std::{fmt::Display, process::Command, thread, time}; -use core_affinity::CoreId; use fork::{fork, Fork}; use log::{info, warn}; use nix::{sys::wait::waitpid, unistd::Pid}; @@ -16,15 +15,8 @@ pub struct ProcessesWorker { } impl ProcessesWorker { - pub fn new( - workload: workload::Processes, - cpu: CoreId, - process: usize, - ) -> Self { - ProcessesWorker { - config: BaseConfig { cpu, process }, - workload, - } + pub fn new(workload: workload::Processes, config: BaseConfig) -> Self { + ProcessesWorker { config, workload } } fn spawn_process(&self, lifetime: u64) -> Result<(), WorkerError> { diff --git a/src/worker/syscalls.rs b/src/worker/syscalls.rs index 7abe999..7b3bc88 100644 --- a/src/worker/syscalls.rs +++ b/src/worker/syscalls.rs @@ -1,6 +1,5 @@ use std::{fmt::Display, thread, time}; -use core_affinity::CoreId; use log::{info, warn}; use rand::{thread_rng, Rng}; use rand_distr::Exp; @@ -15,13 +14,9 @@ pub struct SyscallsWorker { } impl SyscallsWorker { - pub fn new( - workload: workload::Syscalls, - cpu: CoreId, - process: usize, - ) -> Self { + pub fn new(workload: workload::Syscalls, config: BaseConfig) -> Self { SyscallsWorker { - config: BaseConfig { cpu, process }, + config, arrival_rate: workload.arrival_rate, } } From c16e9c7bc1ee55d792e743311251073ff96c4ca1 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 16:16:42 +0200 Subject: [PATCH 09/10] Remove unused fields in NetworkWorker --- src/worker/network.rs | 2 -- src/workload.rs | 6 ------ 2 files changed, 8 deletions(-) diff --git a/src/worker/network.rs b/src/worker/network.rs index 6624ac8..de32405 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -35,8 +35,6 @@ impl NetworkWorker { server, address, target_port, - arrival_rate: _, - departure_rate: _, nconnections, send_interval, } = workload; diff --git a/src/workload.rs b/src/workload.rs index 123139e..94b73f0 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -121,12 +121,6 @@ pub struct Network { /// to connect to. pub target_port: u16, - /// Rate of opening new connections - pub arrival_rate: f64, - - /// Rate of closing connections - pub departure_rate: f64, - /// Starting number of connections pub nconnections: u32, From 277b55cbc21cb2ee533e32e1726edd40131e4937 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Mon, 7 Oct 2024 16:29:09 +0200 Subject: [PATCH 10/10] Split workload module --- src/worker/mod.rs | 5 +- src/worker/network.rs | 7 +-- src/workload/endpoints.rs | 38 +++++++++++++ src/{workload.rs => workload/mod.rs} | 80 ++++------------------------ src/workload/network.rs | 31 +++++++++++ 5 files changed, 88 insertions(+), 73 deletions(-) create mode 100644 src/workload/endpoints.rs rename src/{workload.rs => workload/mod.rs} (72%) create mode 100644 src/workload/network.rs diff --git a/src/worker/mod.rs b/src/worker/mod.rs index efb49ca..11a47de 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -2,7 +2,10 @@ use rand::{thread_rng, Rng}; use rand_distr::{Uniform, Zipf}; use crate::{ - workload::{Distribution, Endpoints, Workload}, + workload::{ + endpoints::{Distribution, Endpoints}, + Workload, + }, BaseConfig, WorkerError, WorkloadConfig, }; diff --git a/src/worker/network.rs b/src/worker/network.rs index de32405..f044d98 100644 --- a/src/worker/network.rs +++ b/src/worker/network.rs @@ -9,7 +9,8 @@ use std::{ thread, }; -use crate::{workload, BaseConfig, WorkerError}; +use crate::workload::network::Network; +use crate::{BaseConfig, WorkerError}; use smoltcp::iface::{Config, Interface, SocketSet}; use smoltcp::phy::{ @@ -30,8 +31,8 @@ pub struct NetworkWorker { } impl NetworkWorker { - pub fn new(workload: workload::Network, config: BaseConfig) -> Self { - let workload::Network { + pub fn new(workload: Network, config: BaseConfig) -> Self { + let Network { server, address, target_port, diff --git a/src/workload/endpoints.rs b/src/workload/endpoints.rs new file mode 100644 index 0000000..05fd851 --- /dev/null +++ b/src/workload/endpoints.rs @@ -0,0 +1,38 @@ +use std::fmt::Display; + +use serde::Deserialize; + +#[derive(Debug, Clone, Deserialize)] +pub struct Endpoints { + /// An amount of time for the workload to run before restarting + pub restart_interval: u64, + + /// Governing the number of ports open. + #[serde(flatten)] + pub distribution: Distribution, +} + +impl Display for Endpoints { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Using {} distribution", self.distribution) + } +} + +/// Distribution for number of ports to listen on +#[derive(Debug, Copy, Clone, Deserialize)] +#[serde(tag = "distribution")] +pub enum Distribution { + /// Few processes are opening large number of ports, the rest are only few. + #[serde(alias = "zipf")] + Zipfian { n_ports: u64, exponent: f64 }, + + /// Every process opens more or less the same number of ports. + #[serde(alias = "uniform")] + Uniform { lower: u64, upper: u64 }, +} + +impl Display for Distribution { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} diff --git a/src/workload.rs b/src/workload/mod.rs similarity index 72% rename from src/workload.rs rename to src/workload/mod.rs index 94b73f0..369b7c4 100644 --- a/src/workload.rs +++ b/src/workload/mod.rs @@ -1,7 +1,10 @@ -use std::fmt::Display; - use serde::Deserialize; +use self::{endpoints::Endpoints, network::Network}; + +pub(crate) mod endpoints; +pub(crate) mod network; + /// Main workload configuration, contains general bits for all types of /// workloads plus workload specific data. #[derive(Debug, Clone, Deserialize)] @@ -55,41 +58,6 @@ pub enum Workload { Network(Network), } -#[derive(Debug, Clone, Deserialize)] -pub struct Endpoints { - /// An amount of time for the workload to run before restarting - pub restart_interval: u64, - - /// Governing the number of ports open. - #[serde(flatten)] - pub distribution: Distribution, -} - -impl Display for Endpoints { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Using {} distribution", self.distribution) - } -} - -/// Distribution for number of ports to listen on -#[derive(Debug, Copy, Clone, Deserialize)] -#[serde(tag = "distribution")] -pub enum Distribution { - /// Few processes are opening large number of ports, the rest are only few. - #[serde(alias = "zipf")] - Zipfian { n_ports: u64, exponent: f64 }, - - /// Every process opens more or less the same number of ports. - #[serde(alias = "uniform")] - Uniform { lower: u64, upper: u64 }, -} - -impl Display for Distribution { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:?}") - } -} - #[derive(Debug, Clone, Deserialize)] pub struct Processes { /// How often a new process will be spawn. @@ -108,36 +76,6 @@ pub struct Syscalls { pub arrival_rate: f64, } -#[derive(Debug, Clone, Deserialize)] -pub struct Network { - /// Whether the instance functions as a server or client - pub server: bool, - - /// Which ip address to use for the server to listen on, - /// or for the client to connect to - pub address: (u8, u8, u8, u8), - - /// Port for the server to listen on, or for the client - /// to connect to. - pub target_port: u16, - - /// Starting number of connections - pub nconnections: u32, - - /// How often send data via new connections, in milliseconds. - /// The interval is applied for all connections, e.g. an interval - /// of 100 ms for 100 connections means that every 100 ms one out - /// of 100 connections will be allowed to send some data. - /// This parameter allows to control the overhead of sending data, - /// so that it will not impact connections monitoring. - #[serde(default = "default_network_send_interval")] - pub send_interval: u128, -} - -fn default_network_send_interval() -> u128 { - 10 -} - #[cfg(test)] mod tests { use super::*; @@ -203,7 +141,9 @@ mod tests { { assert_eq!(restart_interval, 10); - if let Distribution::Zipfian { n_ports, exponent } = distribution { + if let endpoints::Distribution::Zipfian { n_ports, exponent } = + distribution + { assert_eq!(n_ports, 200); assert_eq!(exponent, 1.4); } else { @@ -240,7 +180,9 @@ mod tests { }) = workload { assert_eq!(restart_interval, 10); - if let Distribution::Uniform { lower, upper } = distribution { + if let endpoints::Distribution::Uniform { lower, upper } = + distribution + { assert_eq!(lower, 1); assert_eq!(upper, 100); } else { diff --git a/src/workload/network.rs b/src/workload/network.rs new file mode 100644 index 0000000..b008987 --- /dev/null +++ b/src/workload/network.rs @@ -0,0 +1,31 @@ +use serde::Deserialize; + +#[derive(Debug, Clone, Deserialize)] +pub struct Network { + /// Whether the instance functions as a server or client + pub server: bool, + + /// Which ip address to use for the server to listen on, + /// or for the client to connect to + pub address: (u8, u8, u8, u8), + + /// Port for the server to listen on, or for the client + /// to connect to. + pub target_port: u16, + + /// Starting number of connections + pub nconnections: u32, + + /// How often send data via new connections, in milliseconds. + /// The interval is applied for all connections, e.g. an interval + /// of 100 ms for 100 connections means that every 100 ms one out + /// of 100 connections will be allowed to send some data. + /// This parameter allows to control the overhead of sending data, + /// so that it will not impact connections monitoring. + #[serde(default = "default_network_send_interval")] + pub send_interval: u128, +} + +fn default_network_send_interval() -> u128 { + 10 +}