Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "berserker"
version = "0.1.0"
edition = "2021"
edition = "2024"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a huge deal but do we need to update the edition? Is this a requirement from one of the new dependencies added?


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -23,3 +23,6 @@ libc = "0.2.169"
smoltcp = "0.12.0"
aya = "0.13.1"
aya-obj = "0.2.1"
caps = "0.5.5"
io-uring = "0.7.10"
enum_dispatch = "0.3.13"
74 changes: 63 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use core_affinity::CoreId;
use serde::Deserialize;
use std::{fmt::Display, net::Ipv4Addr};
use serde::{Deserialize, Deserializer};
use std::{collections::HashMap, fmt::Display, net::Ipv4Addr};
use syscalls::Sysno;

pub mod worker;

/// Main workload configuration, contains general bits for all types of
/// workloads plus workload specific data.
#[derive(Debug, Copy, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
pub struct WorkloadConfig {
/// An amount of time for workload payload to run before restarting.
pub restart_interval: u64,
Expand Down Expand Up @@ -55,9 +55,33 @@ fn default_syscalls_syscall_nr() -> u32 {
Sysno::getpid as u32
}

fn default_iouring_iouring_nr() -> u8 {
io_uring::opcode::OpenAt::CODE
}

fn deserialize_args<'de, D>(
deserializer: D,
) -> Result<HashMap<String, String>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let mut map = HashMap::new();
for arg in s.split(',').filter(|x| !x.is_empty()) {
let parts = arg.split_once('=');
let Some((key, value)) = parts else {
return Err(serde::de::Error::custom(
"invalid syscall arguments format",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to consider adding the offending argument to this string to help debug the error.

));
};
map.insert(key.to_string(), value.to_string());
}
Ok(map)
Comment on lines +68 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might make the code a bit too dense, but how about doing this all in a single functional operation?

Suggested change
let s = String::deserialize(deserializer)?;
let mut map = HashMap::new();
for arg in s.split(',').filter(|x| !x.is_empty()) {
let parts = arg.split_once('=');
let Some((key, value)) = parts else {
return Err(serde::de::Error::custom(
"invalid syscall arguments format",
));
};
map.insert(key.to_string(), value.to_string());
}
Ok(map)
String::deserialize(deserializer)?
.split(',')
.filter(|x| !x.is_empty())
.map(|arg| match arg.split_once('=') {
Some((key, value)) => Ok((key.to_string(), value.to_string())),
None => Err(serde::de::Error::custom(
"invalid syscall arguments format",
)),
})
.collect::<Result<HashMap<_, _>, _>>()

}

/// Workload specific configuration, contains one enum value for each
/// workload type.
#[derive(Debug, Copy, Clone, Deserialize)]
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase", tag = "type")]
pub enum Workload {
/// How to listen on ports.
Expand Down Expand Up @@ -92,6 +116,33 @@ pub enum Workload {
/// Which syscall to trigger
#[serde(default = "default_syscalls_syscall_nr")]
syscall_nr: u32,

/// Arguments for syscall in format "arg1=value1,arg2=value2"
#[serde(
deserialize_with = "deserialize_args",
default = "HashMap::new"
)]
syscall_args: HashMap<String, String>,
},

/// How to invoke syscalls
IOUring {
/// How often to invoke a io_uring events.
#[serde(default = "default_syscalls_arrival_rate")]
arrival_rate: f64,

/// Number of io uring event to trigger
/// List of io_uring events can be found at https://github.com/tokio-rs/io-uring/blob/master/src/sys/sys_x86_64.rs
/// or at https://github.com/torvalds/linux/blob/b320789d6883cc00ac78ce83bccbfe7ed58afcf0/include/uapi/linux/io_uring.h
#[serde(default = "default_iouring_iouring_nr")]
iouring_nr: u8,

/// Arguments for io_uring in format "arg1=value1,arg2=value2"
#[serde(
deserialize_with = "deserialize_args",
default = "HashMap::new"
)]
iouring_args: HashMap<String, String>,
},

/// How to open network connections
Expand Down Expand Up @@ -186,11 +237,17 @@ pub enum Distribution {
#[derive(Debug)]
pub enum WorkerError {
Internal,
InternalWithMessage(String),
}

impl Display for WorkerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "worker error found")
match self {
WorkerError::Internal => write!(f, "worker error found"),
WorkerError::InternalWithMessage(msg) => {
write!(f, "worker error: {}", msg)
}
}
}
}

Expand Down Expand Up @@ -356,12 +413,7 @@ mod tests {
..
} = config;
assert_eq!(restart_interval, 10);
if let Workload::Syscalls {
arrival_rate,
tight_loop,
syscall_nr,
} = workload
{
if let Workload::Syscalls { arrival_rate, .. } = workload {
assert_eq!(arrival_rate, 10.0);
} else {
panic!("wrong workload type found");
Expand Down
38 changes: 23 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ extern crate core_affinity;

use config::Config;
use core_affinity::CoreId;
use fork::{fork, Fork};
use fork::{Fork, fork};
use itertools::iproduct;
use nix::sys::signal::{kill, Signal};
use nix::sys::signal::{Signal, kill};
use nix::sys::wait::waitpid;
use nix::unistd::Pid;
use std::time::SystemTime;
use std::{env, thread, time};

use berserker::{worker::new_worker, WorkloadConfig};
use berserker::{WorkloadConfig, worker::new_worker};

fn main() {
let args: Vec<String> = env::args().collect();
Expand Down Expand Up @@ -70,8 +70,13 @@ fn main() {

let handles: Vec<_> = iproduct!(core_ids.into_iter(), 0..config.workers)
.map(|(cpu, process)| {
let worker =
new_worker(config, cpu, process, &mut lower, &mut upper);
let worker = new_worker(
config.clone(),
cpu,
process,
&mut lower,
&mut upper,
);

match fork() {
Ok(Fork::Parent(child)) => {
Expand Down Expand Up @@ -102,17 +107,20 @@ fn main() {
thread::scope(|s| {
if config.duration != 0 {
// Spin a watcher thread
s.spawn(move || loop {
thread::sleep(time::Duration::from_secs(1));
let elapsed = duration_timer.elapsed().unwrap().as_secs();

if elapsed > config.duration {
for handle in processes.iter().flatten() {
info!("Terminating: {}", *handle);
let _ = kill(Pid::from_raw(*handle), Signal::SIGTERM);
s.spawn(move || {
loop {
thread::sleep(time::Duration::from_secs(1));
let elapsed = duration_timer.elapsed().unwrap().as_secs();

if elapsed > config.duration {
for handle in processes.iter().flatten() {
info!("Terminating: {}", *handle);
let _ =
kill(Pid::from_raw(*handle), Signal::SIGTERM);
}

break;
}

break;
}
});
}
Expand Down
4 changes: 2 additions & 2 deletions src/worker/bpf.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cmp,
ffi::{c_char, CString},
ffi::{CString, c_char},
fmt::Display,
mem, slice, thread,
};
Expand All @@ -17,7 +17,7 @@ use aya_obj::generated::{

use crate::{BaseConfig, Worker, WorkerError, Workload, WorkloadConfig};

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
pub struct BpfWorker {
config: BaseConfig,
workload: WorkloadConfig,
Expand Down
Loading
Loading