From 5cd804506dfd0c88439fb9ea78a88e73b486f5a6 Mon Sep 17 00:00:00 2001
From: raIIe
Date: Mon, 26 Jan 2026 23:57:22 +0100
Subject: [PATCH 1/7] Create machine_learning_models.py

Final commit containing:
- PositionalEncoding module for the hybrid model
- custom Bandwidth dataset for uniform handling of the data for the hybrid model
- the hybrid model
- the training and prediction methods for the hybrid model
- helper spectral loss method
- extract method, mainly based on FTIO's core
- train_arima method, which is essentially the main entry point for training and forecasting using the ARIMA and SARIMA models
---
 .../machine_learning_models.py | 597 ++++++++++++++++++
 1 file changed, 597 insertions(+)
 create mode 100644 machine_learning_models/machine_learning_models.py

diff --git a/machine_learning_models/machine_learning_models.py b/machine_learning_models/machine_learning_models.py
new file mode 100644
index 00000000..9dee81c1
--- /dev/null
+++ b/machine_learning_models/machine_learning_models.py
@@ -0,0 +1,597 @@
+from math import inf
+
+import numpy as np
+import pandas
+import pandas as pd
+from statsmodels.tsa.stattools import adfuller, kpss
+from statsmodels.tsa.statespace.sarimax import SARIMAX
+from statsmodels.tsa.arima.model import ARIMA
+from ftio.cli.ftio_core import freq_analysis
+from ftio.freq.autocorrelation import find_autocorrelation
+from ftio.prediction.unify_predictions import merge_predictions
+from ftio.parse.extract import get_time_behavior
+from ftio.parse.scales import Scales
+import torch.nn as nn
+import torch
+from torch.utils.data import Dataset
+import torch.optim as optim
+import torch.nn.functional as F
+import matplotlib.pyplot as plt
+import os
+
+class PositionalEncoding(nn.Module):
+    """Position encoding used in the hybrid model to increase the likelihood of understanding sequential data.
+
+    """
+    def __init__(self, emb_dim, max_len=5555):
+        """ Constructor of the positional encoding.
+
+        Args:
+            emb_dim: Size of the embedding dimension
+            max_len: Maximum sequence length that can be handled.
+        """
+        super().__init__()
+        # Learned Positional Encoding
+        self.positional_encoding = nn.Parameter(torch.zeros(1, max_len, emb_dim))
+
+    def forward(self, x):
+        """ Forward method of the positional encoding.
+
+        Args:
+            x: The embedded sequence.
+
+        Returns: The embedded sequence added to the vector of the sliced positional encoding.
+
+        """
+        # add embedded sequence + sliced positional encoding (added to device to prevent RuntimeError)
+        return x + self.positional_encoding[:, :x.size(1)].to(x.device)
+
+class BandwidthDataset(Dataset):
+    """ This Dataset accepts lists containing multiple partial sequences of list[ranks, bandwidth, duration, start_time]
+    that combine into one complete sequence.
+    The conversion of the list into a new representation in torch.tensors allows for direct training or prediction.
+
+    """
+    def __init__(self, data_list, num_parts=3):
+
+        """ Constructor of the Dataset. Convertion of data into torch.tensors.
+
+        Args:
+            data_list: Lists containing multiple partial sequences of list[ranks, bandwidth, duration, start_time]
+        """
+        self.data = []
+        self.num_parts = num_parts
+
+        for seq in data_list:
+            seq = torch.tensor(seq, dtype=torch.float32)
+            bandwidth = seq[:, 0].unsqueeze(-1)
+
+            # normalize per-sequence
+            min_val, max_val = bandwidth.min(), bandwidth.max()
+            bandwidth = (bandwidth - min_val) / (max_val - min_val + 1e-8)
+
+            L = len(bandwidth)
+            part_len = L // self.num_parts
+
+            # split into num_parts equal parts
+            parts = [bandwidth[i*part_len:(i+1)*part_len] for i in range(self.num_parts-1)]
+            parts.append(bandwidth[(self.num_parts-1)*part_len:])
+            # training input
+            past = torch.cat(parts[:-1], dim=0)
+
+            # target = last part
+            future = parts[-1]
+            self.data.append((past, future))
+
+
+    def __len__(self):
+        """ Method providing the amount of datapoints inside the dataset.
+
+        Returns: Amount of datapoints inside the dataset
+
+        """
+        return len(self.data)
+
+    def __getitem__(self, idx):
+        """ Method providing datapoint from specified index.
+
+        Args:
+            idx: Index of the specific datapoint
+
+        Returns: The datapoint from the specified index.
+
+        """
+        return self.data[idx]
+
+class HybridModel(nn.Module):
+    """ A hybrid model leveraging transformer and long short-term memory.
+
+    """
+    def __init__(self, emb_dim=128, n_heads=4, ff_dim=128, num_layers=6, lstm_hidden=128):
+        """ Constructor of the hybrid model. Currently only supports the most important parameters.
+
+        Args:
+            emb_dim: Size of the embedding dimension
+            n_heads: Number of attention heads of the transformer encoder
+            ff_dim: Size of the feed-forward network dimension
+            num_layers: Number of Transformer encoder layers
+            lstm_hidden: Number of hidden units in the LSTM that follows the transformer
+        """
+        super().__init__()
+        self.bandwidth_embedding = nn.Linear(1, emb_dim)
+        self.positional_encoding = PositionalEncoding(emb_dim)
+        self.transformer = nn.TransformerEncoder(
+            nn.TransformerEncoderLayer(d_model=emb_dim, nhead=n_heads,
+                                       dim_feedforward=ff_dim, batch_first=True),
+            num_layers=num_layers
+        )
+        self.lstm = nn.LSTM(input_size=emb_dim, hidden_size=lstm_hidden,
+                            batch_first=True, bidirectional=True)
+        self.fc_bandwidth = nn.Linear(lstm_hidden * 2, 1)
+
+    def forward(self, past_seq, prediction_length=None):
+        x = self.bandwidth_embedding(past_seq)
+        x = self.positional_encoding(x)
+        x = self.transformer(x)
+        x, _ = self.lstm(x)
+
+        # shrink output to target length if needed
+        if prediction_length is not None:
+            x = x[:, -prediction_length:, :]  # take last part only
+
+        return self.fc_bandwidth(x)
+
+def train_hybrid_model(file_or_directory, bandwidth_cutoff = -1, load_state_dict_and_optimizer_state = None,
+                       emb_dim = 128, n_heads = 4, ff_dim = 128, num_layers=3, epochs= 10, lr = 1e-3, save = False,
+                       additional_ftio_args = None)\
+    ->HybridModel:
+    """ Trains the hybrid model contained in this file and saves the trained model as a .pth file containing
+    the model's state dict and optimizer state.
+
+
+    Args:
+        file_or_directory: Path to either a singular file or a directory of files to train the model with.
+        bandwidth_cutoff: Any partial bandwidth sequence below this threshold will be cut off from the training data.
+        load_state_dict_and_optimizer_state: .pth file checkpoint to continue training the model from
+        emb_dim: embedding dimension of the model
+        n_heads: number of attention heads of the model
+        ff_dim: feed-forward dimension of the model
+        num_layers: number of layers of the model
+        epochs: number of epochs used to train the model
+        lr: the learning rate used for training
+        save: boolean representing if the model is supposed to be saved. Doesn't affect the return value.
+        additional_ftio_args: additional supported ftio arguments that aren't the initial ftio call and files
+
+    Returns:
+        HybridModel trained with provided data
+
+    """
+
+    # checks if file_or_directory is either just a singular file OR a path to a directory with files
+    # and then saves the individual files paths
+    files = []
+    if os.path.isfile(file_or_directory):
+        files.append(file_or_directory)
+    elif os.path.isdir(file_or_directory):
+        files = [os.path.join(file_or_directory, f) for f in os.listdir(file_or_directory) if os.path.isfile(os.path.join(file_or_directory, f))]
+
+    if not files:
+        raise ValueError("No file(s) found.")
+
+    # the initialised model
+    model = HybridModel(emb_dim=emb_dim, n_heads=n_heads, ff_dim=ff_dim, num_layers=num_layers)
+    load_optimizer = None
+
+    # in case the function caller wants to train a previously trained model
+    if load_state_dict_and_optimizer_state is not None:
+        loaded_data = torch.load(load_state_dict_and_optimizer_state)
+        model.load_state_dict(loaded_data['model_state_dict'])
+        load_optimizer = loaded_data['optimizer_state_dict']
+
+
+    for x in files:
+        set = []
+        frequency = []
+        # extraction of data through frequency analysis for darshan & json
+        if x.endswith(".darshan") | x.endswith(".json"):
+            if additional_ftio_args is not None:
+                frequency, set = extract(cmd_input=["ftio", x, additional_ftio_args], bandwidth_cutoff=bandwidth_cutoff)
+            else:
+                frequency, set = extract(cmd_input=["ftio", x], bandwidth_cutoff=bandwidth_cutoff)
+
+        # extraction from .csv files of the sdumont traces ; not a general method to extract from any .csv file
+        if x.endswith(".csv"):
+            csv_file = pandas.read_csv(x)
+            n = 0
+            for y in csv_file["both"]:
+                set.append([y])
+                n = n + 1
+
+                # max size of positional encoding of the model, limitation due to the system used for computation
+                if n == 5555:
+                    break
+
+        # Calculate z-score and throw out traces with possible outliers, which might negatively influence the data
+        # not used anymore, but may hold additional context / reasoning / on development
+        #mean = np.mean(set)
+        #deviation = np.std(set)
+        #if deviation == 0:
+        #    deviation = 1
+        #z_scores = [(x - mean) / deviation for x in set]
+        #outliers = False
+        #for o in z_scores:
+        #    if (o[0] > 3) | (o[0] < -3):
+        #        outliers = True
+        #        booted = booted+1
+        #        break
+        #if outliers:
+        #    continue
+
+        dataset = BandwidthDataset([set], num_parts=frequency)
+        load_optimizer = train_model(model, dataset, epochs=epochs, lr=lr, load_optimizer=load_optimizer)
+        predict_next_sequence(model, "", only_data=(set, frequency))
+
+    # save the model
+    # currently saving it in the currently active archive
+    if save:
+        torch.save({
+            'model_state_dict': model.state_dict(),
+            'optimizer_state_dict':load_optimizer
+        }, 'model_and_optimizer.pth')

+    return model
+
+def train_model(model, dataset, epochs = 3, lr= 1e-3, load_optimizer = None, optimizer = None):
+    """ Training method for the hybrid model.
+
+    Args:
+        model: The hybrid model to be trained
+        dataset: The bandwidth dataset containing all datapoints to train the model.
+        epochs: The amount of epochs used to train the model.
+        lr: The learn rate of the model.
+        load_optimizer: Optimizer state dict from a previous training run, so that the optimizer state is not lost.
+        optimizer: The optimizer to be used in the training process.
+
+    Returns:
+        The state dict of the optimizer for saving/reusing purposes.
+
+    """
+    # check if the optimizer needs to be loaded or initialized
+    if optimizer is None:
+        optimizer = optim.AdamW(model.parameters(), lr=lr)
+        if load_optimizer is not None:
+            optimizer.load_state_dict(load_optimizer)
+
+    # beginning of the training loop
+    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)
+    model.train()
+    for epoch in range(epochs):
+        total_loss = 0
+        for past, future in dataset:
+            past = past.unsqueeze(0)
+            future = future.unsqueeze(0)
+            #zero gradient
+            optimizer.zero_grad()
+            pred_future = model(past, prediction_length=future.size(1))
+            # loss & backwards propagation
+            loss = (0.3*F.huber_loss(pred_future, future)) + (0.7*spectral_loss(pred_future, future))
+            loss.backward()
+            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
+            optimizer.step()
+            total_loss += loss.item()
+        # just in case to not divide by zero, it shouldn't happen but making sure it doesn't
+        if len(dataset) == 0:
+            a = 1
+        else:
+            a = len(dataset)
+        scheduler.step(total_loss / a)
+        # provides the total loss for some insight during the epochs IF a human is overseeing the training
+        if epoch % 2 == 0:
+            print(f"Last Loss: {total_loss:.4f}")
+    return optimizer.state_dict()
+
+
+def predict_next_sequence(model_or_pthfile, file_or_directory, bandwidth_cutoff = 0, additional_ftio_args = None, only_data = None):
+
+    """ Entry point of the prediction process.
+
+    Args:
+        model_or_pthfile: Accepts either a HybridModel or a .pth file (to initialize a model) to use for the prediction.
+        file_or_directory: Path to either a singular file or a directory of files to use for the prediction.
+        bandwidth_cutoff: Any bandwidth below this threshold will be cut off from the data used for prediction.
+        additional_ftio_args: additional supported ftio arguments that aren't the initial ftio call and files
+        only_data: directly supplied extracted data, used instead of files or a path to files
+    Returns:
+        A list of the following form [[ranks, bandwidths], ...] where inner lists represent singular predictions.
+    """
+
+    # checks if file_or_path is either just a singular file OR a path to a directory with files
+    # and then saves the individual files paths
+    files = []
+    if os.path.isfile(file_or_directory):
+        files.append(file_or_directory)
+    elif os.path.isdir(file_or_directory):
+        files = [os.path.join(file_or_directory, f) for f in os.listdir(file_or_directory) if os.path.isfile(os.path.join(file_or_directory, f))]
+
+    # in case no files nor data was supplied
+    if (not files) and (only_data is None):
+        print("No files found. Aborting prediction.")
+        return []
+
+    # either use the given model or instantiate with .pth file or test with untrained model
+    model = None
+    if isinstance(model_or_pthfile, HybridModel):
+        model = model_or_pthfile
+    elif model_or_pthfile.endswith('.pth'):
+        model = HybridModel(emb_dim=128, n_heads=4, ff_dim=128, num_layers=3)
+        model.load_state_dict( (torch.load(model_or_pthfile))['model_state_dict'] )
+
+    # prediction with an untrained model... questionable but maybe someone desires this as a test?
+ if model is None: + model = HybridModel(emb_dim=128, n_heads=4, ff_dim=128, num_layers=3) + + + + predictions = [] + + if only_data is not None: + set, frequency = only_data + dataset = BandwidthDataset([set], num_parts=frequency) + + # min-max normalization + min1, max1 = min(set)[0], max(set)[0] + for u in range(len(set)): + set[u][0] = (set[u][0] - min1 ) / (max1 - min1 + 1e-8) + + # prediction process + prediction_current = __predict_next_trace(model, dataset) + predictions.append(prediction_current) + + actual = [] + for c in dataset.data[0][1]: + actual.append([c[0]]) + return predictions + + + for x in files: + set = [] + frequency = [] + # extraction of data through frequency analysis for darshan & json + if x.endswith(".darshan") | x.endswith(".json"): + if additional_ftio_args is not None: + frequency, set = (extract(cmd_input=["ftio", x, additional_ftio_args], bandwidth_cutoff=bandwidth_cutoff)) + else: + frequency, set =(extract(cmd_input=["ftio", x], bandwidth_cutoff=bandwidth_cutoff)) + # extract data from traces obtained from sdumont dataset, most certainly not an approach for any .csv file + if x.endswith(".csv"): + csv_file = pandas.read_csv(x) + n = 0 + for y in csv_file["both"]: + set.append([y]) + n = n + 1 + + # size limit on local machine, can be changed IF positional encoding in hybrid model is also changed + if n == 5555: + break + # min-max normalization + min1, max1 = min(set)[0], max(set)[0] + for u in range(len(set)): + set[u][0] = (set[u][0] - min1 ) / (max1 - min1 + 1e-8) + + + # prediction + + dataset = BandwidthDataset([set], num_parts=frequency) + prediction_current = __predict_next_trace(model, dataset) + predictions.append(prediction_current) + + return predictions + +def spectral_loss(pred, target): + """ Basic spectral loss funcion based on one-dimensional fourier transform + Args: + pred: the predicted values + target: the actual values + """ + # pred, target: [batch, seq_len] + pred_fft = torch.fft.rfft(pred, dim=1) + target_fft = torch.fft.rfft(target, dim=1) + return F.mse_loss(torch.abs(pred_fft), torch.abs(target_fft)) + +def __predict_next_trace(model, dataset): + """ Uses the provided hybrid model to predict the next bandwidth sequence. + + Args: + model: the hybrid model of this file + dataset: dataset containing the bandwidth sequences used for the prediction + + """ + + model.eval() + with torch.no_grad(): + prediction = [] + for past, future in dataset: + # prediction + squeezed_past = past.unsqueeze(0) + pred_future = model(squeezed_past, prediction_length=future.size(0)) + pred_sequence = pred_future.squeeze(0).tolist() + # only positive values are interesting + pred_sequence = np.abs(pred_sequence) + + min_val, max_val = pred_sequence.min(), pred_sequence.max() + pred_sequence = (pred_sequence - min_val) / (max_val - min_val + 1e-8) + + prediction.append(pred_sequence) + + # currently assumes that past and future have the same length + # calculate MAE, MSE and RMSE + mae = 0 + mse = 0 + for actual, predicted in zip(future, pred_sequence): + inner = actual.item() - predicted[0] + mae = mae + abs(inner) + mse = mse + (inner * inner) + + n =future.size(dim=0) + if n == 0: + n = 1 + mae = mae / n + mse = mse / n + rmse =np.sqrt(mse) + + + print("Predicted Bandwidth Trace:") + print(pred_sequence) + return prediction + +def extract(cmd_input, msgs = None) -> list: + """ Extraction method leveraging frequency analysis for the initial data in dataframe form. 
And calculates the amount + of expected partial patterns + + Args: + cmd_input: The ftio arguments in the form of a list of strings. + msgs: ZMQ message (not used / no use intended yet) + + Returns: + list[ n , list[ bandwidth, ], ...] + """ + # taken from get_time_behavior_and_args from extract.py + data = Scales(cmd_input, msgs) + args = data.args + df = data.get_io_mode(args.mode) + data = get_time_behavior(df) + + dfs_out = [[], [], [], []] + prediction = None + # taken from ftio_core.py's main + for sim in data: + # get prediction + # Perform frequency analysis (dft/wavelet) + prediction_dft, dfs_out, share = freq_analysis(args, sim) + + # Perform autocorrelation if args.autocorrelation is true + Merge the results into a single prediction + prediction_auto = find_autocorrelation(args, sim,dfs_out, share) + # Merge results + prediction = merge_predictions(args, prediction_dft, prediction_auto, dfs_out) + + # extraction of the relevant data from the dataframes + b_sampled = dfs_out.b_sampled.tolist() + + result = [] + + for x in b_sampled: + result.append([x]) + + # calculates the amount of partial patterns using the predicted dominant frequency of FTIO + n = 3 + if (prediction.dominant_freq.size != 0) and (prediction.dominant_freq[0] != 0): + n = int(prediction.t_end/(1 / prediction.dominant_freq[0])) + return n, result + +def train_arima(file_or_directory, max_depth = 3, model_architecture = "SARIMA"): + """ The entry point for training the ARIMA and SARIMA models + + Args: + file_or_directory: the file or directory to train the ARIMA model + max_depth: the maximum depth of the ARIMA model + model_architecture: choose between SARIMA or ARIMA architecture + + """ + # checks if file_or_path is either just a singular file OR a path to a directory with files + # and then saves the individual files paths + files = [] + if os.path.isfile(file_or_directory): + files.append(file_or_directory) + elif os.path.isdir(file_or_directory): + files = [os.path.join(file_or_directory, f) for f in os.listdir(file_or_directory) if os.path.isfile(os.path.join(file_or_directory, f))] + + # in the case of no supplied files or filepath + if not files: + print("No files found. 
Aborting prediction.")
+        return []
+
+    # use extract method for extraction of data
+    predictions = []
+    sequences = []
+    for file in files:
+        n, bandwidth_sequence = extract(cmd_input=["ftio", file])
+        sequences.append([n, bandwidth_sequence])
+
+    for sequence in sequences:
+
+        # initial min-max normalization
+        min_val_initial, max_val_initial = inf, -inf
+        for value in sequence[1]:
+            if value[0] < min_val_initial:
+                min_val_initial = value[0]
+            if value[0] > max_val_initial:
+                max_val_initial = value[0]
+
+        for value in sequence[1]:
+            value[0] = (value[0] - min_val_initial) / (max_val_initial - min_val_initial + 1e-8)
+
+        # split data into trainings and comparison data
+        split_index = int( len(sequence[1])/sequence[0] )*(sequence[0]-1)
+        trainings_part = sequence[1][:split_index]
+        future_part = sequence[1][split_index:]
+        d = 0
+        # main loop to apply KPSS and ADF to find depth d
+        for y in range(1):
+            partial_sequence = pd.Series([x[y] for x in trainings_part], pd.Index(np.arange(len([x[y] for x in trainings_part]), dtype="int64" )))
+
+            try:
+                kps = kpss(partial_sequence.dropna())[1]
+            except:
+                kps = 0.0
+
+            try:
+                adf = adfuller(partial_sequence.dropna())[1]
+            except:
+                adf = 1.0
+
+            while d is not max_depth and kps > 0.05 > adf:
+
+                partial_sequence = partial_sequence.diff()
+                d = d + 1
+
+                try:
+                    kps = kpss(partial_sequence.dropna())[1]
+                except:
+                    kps = 0.0
+
+                try:
+                    adf = adfuller(partial_sequence.dropna())[1]
+                except:
+                    adf = 1.0
+            # in case NA, INF and -INF values are introduced by differencing
+            partial_sequence = partial_sequence.fillna(0.0)
+            partial_sequence = partial_sequence.replace([np.inf, -np.inf], 0)
+
+            final_model = None
+            best_aic = inf
+            # search grid based on AIC, limited to max p = 5 and q = 8, going deeper requires longer computing
+            for p in range(5):
+                for q in range(8):
+                    try:
+                        if model_architecture == "SARIMA":
+                            model = SARIMAX(partial_sequence, order = (p,d,q), seasonal_order=(0, 0, 0, len(future_part) ))
+                        else:
+                            model = ARIMA(partial_sequence, order = (p,d,q))
+                        model = model.fit()
+
+                        # application of the AIC
+                        aic = model.aic
+                        if aic < best_aic:
+                            final_model = model
+                            best_aic = aic
+                            print("p : d : q", (p,d,q))
+                    # some variations will throw exceptions and warnings, this will exclude them
+                    except:
+                        continue
+            # the forecast
+            prediction = final_model.forecast(len(future_part))
+            # min-max normalization for uniform displaying of results to actual
+            min_val, max_val = prediction.min(), prediction.max()
+            prediction = (prediction - min_val) / (max_val - min_val + 1e-8)
+            predictions.append(prediction)
+    return predictions

From 5f520934f82885641d6197ff9683cb8ea1b961b6 Mon Sep 17 00:00:00 2001
From: raIIe
Date: Mon, 2 Feb 2026 06:32:14 +0100
Subject: [PATCH 2/7] minor changes

Small bug fix in machine_learning_models.py
added test cases
added documentation and examples in the form of a Markdown document
applied code style
---
 .../machine_learning_models.py                 | 283 +++++++++++-------
 .../machine_learning_models_documentation.md   |  43 +++
 machine_learning_models/test_cases.py          |  70 +++++
 3 files changed, 283 insertions(+), 113 deletions(-)
 create mode 100644 machine_learning_models/machine_learning_models_documentation.md
 create mode 100644 machine_learning_models/test_cases.py

diff --git a/machine_learning_models/machine_learning_models.py b/machine_learning_models/machine_learning_models.py
index 9dee81c1..22f782e9 100644
--- a/machine_learning_models/machine_learning_models.py
+++ b/machine_learning_models/machine_learning_models.py
@@ -1,30 +1,45 @@
+import os from math import inf import numpy as np import pandas import pandas as pd -from statsmodels.tsa.stattools import adfuller, kpss -from statsmodels.tsa.statespace.sarimax import SARIMAX +import torch +import torch.nn as nn +import torch.nn.functional as F +import torch.optim as optim from statsmodels.tsa.arima.model import ARIMA +from statsmodels.tsa.statespace.sarimax import SARIMAX +from statsmodels.tsa.stattools import adfuller, kpss +from torch.utils.data import Dataset + from ftio.cli.ftio_core import freq_analysis from ftio.freq.autocorrelation import find_autocorrelation -from ftio.prediction.unify_predictions import merge_predictions from ftio.parse.extract import get_time_behavior from ftio.parse.scales import Scales -import torch.nn as nn -import torch -from torch.utils.data import Dataset -import torch.optim as optim -import torch.nn.functional as F -import matplotlib.pyplot as plt -import os +from ftio.prediction.unify_predictions import merge_predictions + +""" +Example Description: +This module contains the entry points for extracting data for training and prediction purpose, +the hybrid-model, the ARIMA and SARIMA models and their training and forecasting functions. +As utility it contains a custom dataset intended for the hybrid-model but was initially planned for general purpose use. + +Author: Robert Alles +Copyright (c) 2025 TU Darmstadt, Germany +Date: + +Licensed under the BSD 3-Clause License. +For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + class PositionalEncoding(nn.Module): - """Position encoding used in the hybrid model to increase the likelihood of understanding sequential data. + """Position encoding used in the hybrid model to increase the likelihood of understanding sequential data.""" - """ def __init__(self, emb_dim, max_len=5555): - """ Constructor of the positional encoding. + """Constructor of the positional encoding. Args: emb_dim: Size of the embedding dimension @@ -35,7 +50,7 @@ def __init__(self, emb_dim, max_len=5555): self.positional_encoding = nn.Parameter(torch.zeros(1, max_len, emb_dim)) def forward(self, x): - """ Forward method of the positional encoding. + """Forward method of the positional encoding. Args: x: The embedded sequence. @@ -44,25 +59,31 @@ def forward(self, x): """ # add embedded sequence + sliced positional encoding (added to device to prevent RuntimeError) - return x + self.positional_encoding[:, :x.size(1)].to(x.device) + return x + self.positional_encoding[:, : x.size(1)].to(x.device) + class BandwidthDataset(Dataset): - """ This Dataset accepts lists containing multiple partial sequences of list[ranks, bandwidth, duration, start_time] - that combine into one complete sequence. - The conversion of the list into a new representation in torch.tensors allows for direct training or prediction. + """This Dataset accepts lists containing multiple partial sequences of list[ranks, bandwidth, duration, start_time] + that combine into one complete sequence. + The conversion of the list into a new representation in torch.tensors allows for direct training or prediction. """ - def __init__(self, data_list, num_parts=3): - """ Constructor of the Dataset. Convertion of data into torch.tensors. + def __init__(self, data_list, num_parts=3): + """Constructor of the Dataset. Convertion of data into torch.tensors. 
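+        Each sequence is min-max normalized per sequence and split into num_parts slices; the last slice serves as the prediction target.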
Args: - data_list: Lists containing multiple partial sequences of list[ranks, bandwidth, duration, start_time] + data_list: Lists containing the bandwidth values of the sequence. + num_parts: Int deciding the number of slices to be made from the sequence. """ self.data = [] self.num_parts = num_parts + if not self.num_parts: + self.num_parts = 3 + for seq in data_list: + seq = torch.tensor(seq, dtype=torch.float32) bandwidth = seq[:, 0].unsqueeze(-1) @@ -74,8 +95,11 @@ def __init__(self, data_list, num_parts=3): part_len = L // self.num_parts # split into num_parts equal parts - parts = [bandwidth[i*part_len:(i+1)*part_len] for i in range(self.num_parts-1)] - parts.append(bandwidth[(self.num_parts-1)*part_len:]) + parts = [ + bandwidth[i * part_len : (i + 1) * part_len] + for i in range(self.num_parts - 1) + ] + parts.append(bandwidth[(self.num_parts - 1) * part_len :]) # training input past = torch.cat(parts[:-1], dim=0) @@ -83,32 +107,31 @@ def __init__(self, data_list, num_parts=3): future = parts[-1] self.data.append((past, future)) - def __len__(self): - """ Method providing the amount of datapoints inside the dataset. + """Method providing the amount of datapoints inside the dataset. - Returns: Amount of datapoints inside the dataset + Returns: Amount of datapoints inside the dataset. """ return len(self.data) def __getitem__(self, idx): - """ Method providing datapoint from specified index. + """Method providing datapoint from specified index. Args: - idx: Index of the specific datapoint + idx: Index of the specific datapoint. Returns: The datapoint from the specified index. """ return self.data[idx] + class HybridModel(nn.Module): - """ A hybrid model leveraging transformer and long short-term memory. + """A hybrid model leveraging transformer and long short-term memory.""" - """ def __init__(self, emb_dim=128, n_heads=4, ff_dim=128, num_layers=6, lstm_hidden=128): - """ Constructor of the hybrid model. Currently only supports the most important parameters. + """Constructor of the hybrid model. Currently only supports the most important parameters. 
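+        The input is embedded, position-encoded, passed through a Transformer encoder and a bidirectional LSTM, and projected back to one bandwidth value per step.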
Args: emb_dim: Size of the embedding dimension @@ -121,12 +144,17 @@ def __init__(self, emb_dim=128, n_heads=4, ff_dim=128, num_layers=6, lstm_hidden self.bandwidth_embedding = nn.Linear(1, emb_dim) self.positional_encoding = PositionalEncoding(emb_dim) self.transformer = nn.TransformerEncoder( - nn.TransformerEncoderLayer(d_model=emb_dim, nhead=n_heads, - dim_feedforward=ff_dim, batch_first=True), - num_layers=num_layers + nn.TransformerEncoderLayer( + d_model=emb_dim, nhead=n_heads, dim_feedforward=ff_dim, batch_first=True + ), + num_layers=num_layers, + ) + self.lstm = nn.LSTM( + input_size=emb_dim, + hidden_size=lstm_hidden, + batch_first=True, + bidirectional=True, ) - self.lstm = nn.LSTM(input_size=emb_dim, hidden_size=lstm_hidden, - batch_first=True, bidirectional=True) self.fc_bandwidth = nn.Linear(lstm_hidden * 2, 1) def forward(self, past_seq, prediction_length=None): @@ -141,11 +169,21 @@ def forward(self, past_seq, prediction_length=None): return self.fc_bandwidth(x) -def train_hybrid_model(file_or_directory, bandwidth_cutoff = -1, load_state_dict_and_optimizer_state = None, - emb_dim = 128, n_heads = 4, ff_dim = 128, num_layers=3, epochs= 10, lr = 1e-3, save = False, - additional_ftio_args = None)\ - ->HybridModel: - """ Trains the hybrid model contained in this file and saves the trained model as a .pth file containing + +def train_hybrid_model( + file_or_directory, + bandwidth_cutoff=-1, + load_state_dict_and_optimizer_state=None, + emb_dim=128, + n_heads=4, + ff_dim=128, + num_layers=3, + epochs=10, + lr=1e-3, + save=False, + additional_ftio_args=None, +) -> HybridModel: + """Trains the hybrid model contained in this file and saves the trained model as a .pth file containing the model's state dict and optimizer state. @@ -173,31 +211,36 @@ def train_hybrid_model(file_or_directory, bandwidth_cutoff = -1, load_state_dict if os.path.isfile(file_or_directory): files.append(file_or_directory) elif os.path.isdir(file_or_directory): - files = [os.path.join(file_or_directory, f) for f in os.listdir(file_or_directory) if os.path.isfile(os.path.join(file_or_directory, f))] + files = [ + os.path.join(file_or_directory, f) + for f in os.listdir(file_or_directory) + if os.path.isfile(os.path.join(file_or_directory, f)) + ] if not files: raise ValueError("No file(s) found.") # the initialised model - model = HybridModel(emb_dim=emb_dim, n_heads=n_heads, ff_dim=ff_dim, num_layers=num_layers) + model = HybridModel( + emb_dim=emb_dim, n_heads=n_heads, ff_dim=ff_dim, num_layers=num_layers + ) load_optimizer = None # in case the function caller wants to train a previously trained model if load_state_dict_and_optimizer_state is not None: loaded_data = torch.load(load_state_dict_and_optimizer_state) - model.load_state_dict(loaded_data['model_state_dict']) - load_optimizer = loaded_data['optimizer_state_dict'] - + model.load_state_dict(loaded_data["model_state_dict"]) + load_optimizer = loaded_data["optimizer_state_dict"] for x in files: set = [] frequency = [] # extraction of data through frequency analysis for darshan & json - if x.endswith(".darshan") | x.endswith(".json"): + if x.endswith(".darshan") | x.endswith(".jsonl"): if additional_ftio_args is not None: - frequency, set = extract(cmd_input=["ftio", x, additional_ftio_args], bandwidth_cutoff=bandwidth_cutoff) + frequency, set = extract(cmd_input=["ftio", x, additional_ftio_args]) else: - frequency, set = extract(cmd_input=["ftio", x], bandwidth_cutoff=bandwidth_cutoff) + frequency, set = extract(cmd_input=["ftio", x]) # extraction from 
.csv files of the sdumont traces ; not a general method to extract from any .csv file if x.endswith(".csv"): @@ -211,41 +254,31 @@ def train_hybrid_model(file_or_directory, bandwidth_cutoff = -1, load_state_dict if n == 5555: break - # Calculate z-score and throw out traces with possible outliers, which might negatively influence the data - # not used anymore, but may hold additional context / reasoning / on development - #mean = np.mean(set) - #deviation = np.std(set) - #if deviation == 0: - # deviation = 1 - #z_scores = [(x - mean) / deviation for x in set] - #outliers = False - #for o in z_scores: - # if (o[0] > 3) | (o[0] < -3): - # outliers = True - # booted = booted+1 - # break - #if outliers: - # continue - dataset = BandwidthDataset([set], num_parts=frequency) - load_optimizer = train_model(model, dataset, epochs=epochs, lr=lr, load_optimizer=load_optimizer) + load_optimizer = train_model( + model, dataset, epochs=epochs, lr=lr, load_optimizer=load_optimizer + ) predict_next_sequence(model, "", only_data=(set, frequency)) # save the model # currently saving it in the currently active archive if save: - torch.save({ - 'model_state_dict': model.state_dict(), - 'optimizer_state_dict':load_optimizer - }, 'model_and_optimizer.pth') + torch.save( + { + "model_state_dict": model.state_dict(), + "optimizer_state_dict": load_optimizer, + }, + "model_and_optimizer.pth", + ) return model -def train_model(model, dataset, epochs = 3, lr= 1e-3, load_optimizer = None, optimizer = None): - """ Training method for the hybrid model. + +def train_model(model, dataset, epochs=3, lr=1e-3, load_optimizer=None, optimizer=None): + """Training method for the hybrid model. Args: - model: The hybrid model to be trained + model: The hybrid model to be trained. dataset: The bandwidth dataset containing all datapoints to train the model. epochs: The amount of epochs used to train the model. lr: The learn rate of the model. @@ -263,18 +296,22 @@ def train_model(model, dataset, epochs = 3, lr= 1e-3, load_optimizer = None, opt optimizer.load_state_dict(load_optimizer) # beginning of the training loop - scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3) + scheduler = optim.lr_scheduler.ReduceLROnPlateau( + optimizer, mode="min", factor=0.5, patience=3 + ) model.train() for epoch in range(epochs): total_loss = 0 for past, future in dataset: past = past.unsqueeze(0) future = future.unsqueeze(0) - #zero gradient + # zero gradient optimizer.zero_grad() pred_future = model(past, prediction_length=future.size(1)) # loss & backwards propagation - loss = (0.3*F.huber_loss(pred_future, future)) + (0.7*spectral_loss(pred_future, future)) + loss = (0.3 * F.huber_loss(pred_future, future)) + ( + 0.7 * spectral_loss(pred_future, future) + ) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() @@ -291,9 +328,14 @@ def train_model(model, dataset, epochs = 3, lr= 1e-3, load_optimizer = None, opt return optimizer.state_dict() -def predict_next_sequence(model_or_pthfile, file_or_directory, bandwidth_cutoff = 0, additional_ftio_args = None, only_data = None): - - """ Entry point of the prediction process. +def predict_next_sequence( + model_or_pthfile, + file_or_directory, + bandwidth_cutoff=0, + additional_ftio_args=None, + only_data=None, +): + """Entry point of the prediction process. Args: model_or_pthfile: Accepts either a HybridModel or a .pth file (to initialize a model) to use for the prediction. 
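+            If a .pth file path is given, a model with the preset dimensions (emb_dim=128, n_heads=4, ff_dim=128, num_layers=3) is created and the stored state dict is loaded into it.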
@@ -311,7 +353,11 @@ def predict_next_sequence(model_or_pthfile, file_or_directory, bandwidth_cutoff if os.path.isfile(file_or_directory): files.append(file_or_directory) elif os.path.isdir(file_or_directory): - files = [os.path.join(file_or_directory, f) for f in os.listdir(file_or_directory) if os.path.isfile(os.path.join(file_or_directory, f))] + files = [ + os.path.join(file_or_directory, f) + for f in os.listdir(file_or_directory) + if os.path.isfile(os.path.join(file_or_directory, f)) + ] # in case no files nor data was supplied if (not files) and (only_data is None): @@ -322,16 +368,14 @@ def predict_next_sequence(model_or_pthfile, file_or_directory, bandwidth_cutoff model = None if isinstance(model_or_pthfile, HybridModel): model = model_or_pthfile - elif model_or_pthfile.endswith('.pth'): + elif model_or_pthfile.endswith(".pth"): model = HybridModel(emb_dim=128, n_heads=4, ff_dim=128, num_layers=3) - model.load_state_dict( (torch.load(model_or_pthfile))['model_state_dict'] ) + model.load_state_dict((torch.load(model_or_pthfile))["model_state_dict"]) # prediction with an untrained model... questionable but maybe someone desires this as a test? if model is None: model = HybridModel(emb_dim=128, n_heads=4, ff_dim=128, num_layers=3) - - predictions = [] if only_data is not None: @@ -341,7 +385,7 @@ def predict_next_sequence(model_or_pthfile, file_or_directory, bandwidth_cutoff # min-max normalization min1, max1 = min(set)[0], max(set)[0] for u in range(len(set)): - set[u][0] = (set[u][0] - min1 ) / (max1 - min1 + 1e-8) + set[u][0] = (set[u][0] - min1) / (max1 - min1 + 1e-8) # prediction process prediction_current = __predict_next_trace(model, dataset) @@ -352,16 +396,15 @@ def predict_next_sequence(model_or_pthfile, file_or_directory, bandwidth_cutoff actual.append([c[0]]) return predictions - for x in files: set = [] frequency = [] # extraction of data through frequency analysis for darshan & json - if x.endswith(".darshan") | x.endswith(".json"): + if x.endswith(".darshan") | x.endswith(".jsonl"): if additional_ftio_args is not None: - frequency, set = (extract(cmd_input=["ftio", x, additional_ftio_args], bandwidth_cutoff=bandwidth_cutoff)) + frequency, set = extract(cmd_input=["ftio", x, additional_ftio_args]) else: - frequency, set =(extract(cmd_input=["ftio", x], bandwidth_cutoff=bandwidth_cutoff)) + frequency, set = extract(cmd_input=["ftio", x]) # extract data from traces obtained from sdumont dataset, most certainly not an approach for any .csv file if x.endswith(".csv"): csv_file = pandas.read_csv(x) @@ -376,19 +419,18 @@ def predict_next_sequence(model_or_pthfile, file_or_directory, bandwidth_cutoff # min-max normalization min1, max1 = min(set)[0], max(set)[0] for u in range(len(set)): - set[u][0] = (set[u][0] - min1 ) / (max1 - min1 + 1e-8) - + set[u][0] = (set[u][0] - min1) / (max1 - min1 + 1e-8) # prediction - dataset = BandwidthDataset([set], num_parts=frequency) prediction_current = __predict_next_trace(model, dataset) predictions.append(prediction_current) return predictions + def spectral_loss(pred, target): - """ Basic spectral loss funcion based on one-dimensional fourier transform + """Basic spectral loss function based on one-dimensional fourier transform Args: pred: the predicted values target: the actual values @@ -398,8 +440,9 @@ def spectral_loss(pred, target): target_fft = torch.fft.rfft(target, dim=1) return F.mse_loss(torch.abs(pred_fft), torch.abs(target_fft)) + def __predict_next_trace(model, dataset): - """ Uses the provided hybrid model to predict the 
next bandwidth sequence. + """Uses the provided hybrid model to predict the next bandwidth sequence. Args: model: the hybrid model of this file @@ -432,20 +475,20 @@ def __predict_next_trace(model, dataset): mae = mae + abs(inner) mse = mse + (inner * inner) - n =future.size(dim=0) + n = future.size(dim=0) if n == 0: n = 1 mae = mae / n mse = mse / n - rmse =np.sqrt(mse) - + rmse = np.sqrt(mse) print("Predicted Bandwidth Trace:") print(pred_sequence) return prediction -def extract(cmd_input, msgs = None) -> list: - """ Extraction method leveraging frequency analysis for the initial data in dataframe form. And calculates the amount + +def extract(cmd_input, msgs=None) -> list: + """Extraction method leveraging frequency analysis for the initial data in dataframe form. And calculates the amount of expected partial patterns Args: @@ -453,7 +496,7 @@ def extract(cmd_input, msgs = None) -> list: msgs: ZMQ message (not used / no use intended yet) Returns: - list[ n , list[ bandwidth, ], ...] + list[ n , list[bandwidth], ...] """ # taken from get_time_behavior_and_args from extract.py data = Scales(cmd_input, msgs) @@ -470,7 +513,7 @@ def extract(cmd_input, msgs = None) -> list: prediction_dft, dfs_out, share = freq_analysis(args, sim) # Perform autocorrelation if args.autocorrelation is true + Merge the results into a single prediction - prediction_auto = find_autocorrelation(args, sim,dfs_out, share) + prediction_auto = find_autocorrelation(args, sim, dfs_out, share) # Merge results prediction = merge_predictions(args, prediction_dft, prediction_auto, dfs_out) @@ -485,11 +528,12 @@ def extract(cmd_input, msgs = None) -> list: # calculates the amount of partial patterns using the predicted dominant frequency of FTIO n = 3 if (prediction.dominant_freq.size != 0) and (prediction.dominant_freq[0] != 0): - n = int(prediction.t_end/(1 / prediction.dominant_freq[0])) - return n, result + n = int(prediction.t_end / (1 / prediction.dominant_freq[0])) + return [n, result] + -def train_arima(file_or_directory, max_depth = 3, model_architecture = "SARIMA"): - """ The entry point for training the ARIMA and SARIMA models +def train_arima(file_or_directory, max_depth=3, model_architecture="SARIMA"): + """The entry point for training the ARIMA and SARIMA models Args: file_or_directory: the file or directory to train the ARIMA model @@ -503,7 +547,11 @@ def train_arima(file_or_directory, max_depth = 3, model_architecture = "SARIMA") if os.path.isfile(file_or_directory): files.append(file_or_directory) elif os.path.isdir(file_or_directory): - files = [os.path.join(file_or_directory, f) for f in os.listdir(file_or_directory) if os.path.isfile(os.path.join(file_or_directory, f))] + files = [ + os.path.join(file_or_directory, f) + for f in os.listdir(file_or_directory) + if os.path.isfile(os.path.join(file_or_directory, f)) + ] # in the case of no supplied files or filepath if not files: @@ -528,16 +576,21 @@ def train_arima(file_or_directory, max_depth = 3, model_architecture = "SARIMA") max_val_initial = value[0] for value in sequence[1]: - value[0] = (value[0] - min_val_initial) / (max_val_initial - min_val_initial + 1e-8) + value[0] = (value[0] - min_val_initial) / ( + max_val_initial - min_val_initial + 1e-8 + ) # split data into trainings and comparison data - split_index = int( len(sequence[1])/sequence[0] )*(sequence[0]-1) + split_index = int(len(sequence[1]) / sequence[0]) * (sequence[0] - 1) trainings_part = sequence[1][:split_index] - future_part = sequence[1][split_index:] + future_part = 
sequence[1][split_index:] d = 0 # main loop to apply KPSS and ADF to find depth d for y in range(1): - partial_sequence = pd.Series([x[y] for x in trainings_part], pd.Index(np.arange(len([x[y] for x in trainings_part]), dtype="int64" ))) + partial_sequence = pd.Series( + [x[y] for x in trainings_part], + pd.Index(np.arange(len([x[y] for x in trainings_part]), dtype="int64")), + ) try: kps = kpss(partial_sequence.dropna())[1] @@ -574,9 +627,13 @@ def train_arima(file_or_directory, max_depth = 3, model_architecture = "SARIMA") for q in range(8): try: if model_architecture == "SARIMA": - model = SARIMAX(partial_sequence, order = (p,d,q), seasonal_order=(0, 0, 0, len(future_part) )) + model = SARIMAX( + partial_sequence, + order=(p, d, q), + seasonal_order=(0, 0, 0, len(future_part)), + ) else: - model = ARIMA(partial_sequence, order = (p,d,q)) + model = ARIMA(partial_sequence, order=(p, d, q)) model = model.fit() # application of the AIC @@ -584,7 +641,7 @@ def train_arima(file_or_directory, max_depth = 3, model_architecture = "SARIMA") if aic < best_aic: final_model = model best_aic = aic - print("p : d : q", (p,d,q)) + print("p : d : q", (p, d, q)) # some variations will throw exceptions and warnings, this will exclude them except: continue diff --git a/machine_learning_models/machine_learning_models_documentation.md b/machine_learning_models/machine_learning_models_documentation.md new file mode 100644 index 00000000..a0580451 --- /dev/null +++ b/machine_learning_models/machine_learning_models_documentation.md @@ -0,0 +1,43 @@ +# Machine-Learning Models Documentation + +## General Usage +[Hybrid Model] + +The following example shows the high-level entry to training and forecasting using the function train_hybrid_model() of the hybrid-model. +```python + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + model = train_hybrid_model(file, epochs=10, lr=0.003) + prediction = predict_next_sequence(model, file) +``` +The function train_hybrid_model() also has parameters with standard values for the underlying structure of the model which can be changed. +In this example only the embedded dimension is changed, but there are also parameters for the attention heads, the feed-forward dimension etc. +Common values such as 2^n are usually the most effective variations to explore. +```python + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + model = train_hybrid_model(file, epochs=10, lr=0.003, emb_dim = 256) + prediction = predict_next_sequence(model, file) +``` +The training of the hybrid-model can be resumed by loading a .pth file created by the saving process. +It contains the parameters of the model and the state of the used optimizer. +```python + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + model = train_hybrid_model(file, epochs=10, lr=0.003, save=True) + model = train_hybrid_model( + file, + epochs=10, + lr=0.003, + load_state_dict_and_optimizer_state="model_and_optimizer.pth", + ) + prediction = predict_next_sequence(model, file) +``` +[(S)ARIMA] + +The following example shows the high-level entry to training and forecasting using the train_arima() function of the ARIMA/SARIMA models. +By changing the model_architecture parameter SARIMA or ARIMA can be selected. The max_depth is recommended to be relatively small, +since it's defining the maximum depth of differentations of the underlying data to reach stationarity. 
+A resumption of training is inherently not supported by the underlying model structure. Therefore, if new data is available, then +training from the beginning is the only option. +```python + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + prediction = train_arima(file, max_depth = 3, model_architecture="ARIMA") +``` diff --git a/machine_learning_models/test_cases.py b/machine_learning_models/test_cases.py new file mode 100644 index 00000000..e68c1600 --- /dev/null +++ b/machine_learning_models/test_cases.py @@ -0,0 +1,70 @@ +from machine_learning_models import * + +""" +Tests for the active workflow (and application) of the implemented models, dataset and functions. +""" + + +def test_hybrid_model(): + """ + Tests the training and prediction of the hybrid-model. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + model = train_hybrid_model(file, epochs=10, lr=0.003) + prediction = predict_next_sequence(model, file) + assert True + + +def test_hybrid_model_resume_training(): + """ + Tests the saving of the hybrid model's checkpoint and resuming of training of the model. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + model = train_hybrid_model(file, epochs=10, lr=0.003, save=True) + model = train_hybrid_model( + file, + epochs=10, + lr=0.003, + load_state_dict_and_optimizer_state="model_and_optimizer.pth", + ) + prediction = predict_next_sequence(model, file) + assert True + + +def test_extract(): + """ + Tests the extract functionality when providing FTIO arguments. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + args = ["ftio", file] + n, data = extract(args) + assert True + + +def test_dataset(): + """ + Tests the correct initialization of the dataset. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + args = ["ftio", file] + n, data = extract(args) + dataset = BandwidthDataset([data], num_parts=n) + assert True + + +def test_arima_model(): + """ + Tests the training and prediction of the ARIMA model. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + prediction = train_arima(file, model_architecture="ARIMA") + assert True + + +def test_sarima_model(): + """ + Tests the training and prediction of the SARIMA model. 
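+    Only checks that the workflow runs end to end; no numerical assertions are made.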
+    """
+    file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+    prediction = train_arima(file, model_architecture="SARIMA")
+    assert True

From fcc0ef18bb624e742d2f09f9b2a617b34158b5c5 Mon Sep 17 00:00:00 2001
From: raIIe
Date: Fri, 13 Feb 2026 03:48:36 +0100
Subject: [PATCH 3/7] Refactored files into directories

Refactored files into directories
---
 .../machine_learning_models_documentation.md                  | 0
 .../machine_learning_models}/machine_learning_models.py       | 0
 .../test_cases.py => test/test_machine-learning-models.py     | 0
 3 files changed, 0 insertions(+), 0 deletions(-)
 rename {machine_learning_models => docs}/machine_learning_models_documentation.md (100%)
 rename {machine_learning_models => ftio/machine_learning_models}/machine_learning_models.py (100%)
 rename machine_learning_models/test_cases.py => test/test_machine-learning-models.py (100%)

diff --git a/machine_learning_models/machine_learning_models_documentation.md b/docs/machine_learning_models_documentation.md
similarity index 100%
rename from machine_learning_models/machine_learning_models_documentation.md
rename to docs/machine_learning_models_documentation.md
diff --git a/machine_learning_models/machine_learning_models.py b/ftio/machine_learning_models/machine_learning_models.py
similarity index 100%
rename from machine_learning_models/machine_learning_models.py
rename to ftio/machine_learning_models/machine_learning_models.py
diff --git a/machine_learning_models/test_cases.py b/test/test_machine-learning-models.py
similarity index 100%
rename from machine_learning_models/test_cases.py
rename to test/test_machine-learning-models.py

From 6d4eb8c0fb813c284ed864d0dc1b64a509579361 Mon Sep 17 00:00:00 2001
From: raIIe
Date: Fri, 13 Feb 2026 04:10:04 +0100
Subject: [PATCH 4/7] changed broken import

changed broken import to proper directory
---
 test/test_machine-learning-models.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/test_machine-learning-models.py b/test/test_machine-learning-models.py
index e68c1600..5247a1fc 100644
--- a/test/test_machine-learning-models.py
+++ b/test/test_machine-learning-models.py
@@ -1,4 +1,4 @@
-from machine_learning_models import *
+from ftio.machine_learning_models.machine_learning_models import *
 
 """
 Tests for the active workflow (and application) of the implemented models, dataset and functions.

From c53475d07d2b77c13d9ff8f8867452c5622a5001 Mon Sep 17 00:00:00 2001
From: raIIe
Date: Fri, 13 Feb 2026 04:23:22 +0100
Subject: [PATCH 5/7] removed trailing whitespaces

removed trailing whitespaces for style
---
 .../machine_learning_models.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/ftio/machine_learning_models/machine_learning_models.py b/ftio/machine_learning_models/machine_learning_models.py
index 22f782e9..bc098303 100644
--- a/ftio/machine_learning_models/machine_learning_models.py
+++ b/ftio/machine_learning_models/machine_learning_models.py
@@ -20,16 +20,16 @@ from ftio.prediction.unify_predictions import merge_predictions
 
 """
-Example Description: 
-This module contains the entry points for extracting data for training and prediction purpose, 
+Example Description:
+This module contains the entry points for extracting data for training and prediction purpose,
 the hybrid-model, the ARIMA and SARIMA models and their training and forecasting functions.
 As utility it contains a custom dataset intended for the hybrid-model but was initially planned for general purpose use. 
-Author: Robert Alles -Copyright (c) 2025 TU Darmstadt, Germany +Author: Robert Alles +Copyright (c) 2025 TU Darmstadt, Germany Date: -Licensed under the BSD 3-Clause License. +Licensed under the BSD 3-Clause License. For more information, see the LICENSE file in the project root: https://github.com/tuda-parallel/FTIO/blob/main/LICENSE """ @@ -470,7 +470,7 @@ def __predict_next_trace(model, dataset): # calculate MAE, MSE and RMSE mae = 0 mse = 0 - for actual, predicted in zip(future, pred_sequence): + for actual, predicted in zip(future, pred_sequence, strict=False): inner = actual.item() - predicted[0] mae = mae + abs(inner) mse = mse + (inner * inner) @@ -480,7 +480,6 @@ def __predict_next_trace(model, dataset): n = 1 mae = mae / n mse = mse / n - rmse = np.sqrt(mse) print("Predicted Bandwidth Trace:") print(pred_sequence) From e223dd5d23eae141280d3c2fdc3eb3291646ca15 Mon Sep 17 00:00:00 2001 From: Ahmad Tarraf Date: Mon, 16 Feb 2026 19:05:52 +0100 Subject: [PATCH 6/7] fixed pipeline --- Makefile | 2 +- docs/machine_learning_models_documentation.md | 43 ------------ docs/ml_models.md | 60 +++++++++++++++++ ftio/freq/_dft_workflow.py | 2 +- .../models_and_training.py} | 67 +++++++++++-------- ftio/parse/args.py | 8 +++ ftio/plot/dash_files/dash_app.py | 3 +- pyproject.toml | 5 ++ ...e-learning-models.py => test_ml_models.py} | 41 +++++++++--- 9 files changed, 145 insertions(+), 86 deletions(-) delete mode 100644 docs/machine_learning_models_documentation.md create mode 100644 docs/ml_models.md rename ftio/{machine_learning_models/machine_learning_models.py => ml/models_and_training.py} (93%) rename test/{test_machine-learning-models.py => test_ml_models.py} (61%) diff --git a/Makefile b/Makefile index 717c5924..9acb720e 100644 --- a/Makefile +++ b/Makefile @@ -54,7 +54,7 @@ ftio: $(PYTHON) -m pip install . ftio_full: - $(PYTHON) -m pip install '.[external-libs,development-libs,plot-libs]' + $(PYTHON) -m pip install '.[external-libs,development-libs,plot-libs,ml-libs]' venv: $(PYTHON) -m venv .venv @echo -e "Environment created. Using python from .venv/bin/python3" diff --git a/docs/machine_learning_models_documentation.md b/docs/machine_learning_models_documentation.md deleted file mode 100644 index a0580451..00000000 --- a/docs/machine_learning_models_documentation.md +++ /dev/null @@ -1,43 +0,0 @@ -# Machine-Learning Models Documentation - -## General Usage -[Hybrid Model] - -The following example shows the high-level entry to training and forecasting using the function train_hybrid_model() of the hybrid-model. -```python - file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - model = train_hybrid_model(file, epochs=10, lr=0.003) - prediction = predict_next_sequence(model, file) -``` -The function train_hybrid_model() also has parameters with standard values for the underlying structure of the model which can be changed. -In this example only the embedded dimension is changed, but there are also parameters for the attention heads, the feed-forward dimension etc. -Common values such as 2^n are usually the most effective variations to explore. -```python - file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - model = train_hybrid_model(file, epochs=10, lr=0.003, emb_dim = 256) - prediction = predict_next_sequence(model, file) -``` -The training of the hybrid-model can be resumed by loading a .pth file created by the saving process. -It contains the parameters of the model and the state of the used optimizer. 
-```python
-    file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
-    model = train_hybrid_model(file, epochs=10, lr=0.003, save=True)
-    model = train_hybrid_model(
-        file,
-        epochs=10,
-        lr=0.003,
-        load_state_dict_and_optimizer_state="model_and_optimizer.pth",
-    )
-    prediction = predict_next_sequence(model, file)
-```
-[(S)ARIMA]
-
-The following example shows the high-level entry to training and forecasting using the train_arima() function of the ARIMA/SARIMA models.
-By changing the model_architecture parameter SARIMA or ARIMA can be selected. The max_depth is recommended to be relatively small,
-since it's defining the maximum depth of differentations of the underlying data to reach stationarity.
-A resumption of training is inherently not supported by the underlying model structure. Therefore, if new data is available, then
-training from the beginning is the only option.
-```python
-    file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
-    prediction = train_arima(file, max_depth = 3, model_architecture="ARIMA")
-```
diff --git a/docs/ml_models.md b/docs/ml_models.md
new file mode 100644
index 00000000..e3922e9d
--- /dev/null
+++ b/docs/ml_models.md
@@ -0,0 +1,60 @@
+# Machine-Learning Models Documentation
+
+## Prerequisites
+
+Install the required packages (`pip install '.[external-libs,development-libs,ml-libs]'`).
+
+## General Usage
+
+[Hybrid Model]
+The following example shows the high-level entry point for training and forecasting using the function train_hybrid_model() of
+the hybrid-model.
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+model = train_hybrid_model(file, epochs=10, lr=0.003)
+prediction = predict_next_sequence(model, file)
+```
+
+The function train_hybrid_model() also has parameters with default values for the underlying structure of the model
+which can be changed.
+In this example only the embedding dimension is changed, but there are also parameters for the attention heads, the
+feed-forward dimension etc.
+Powers of two are usually the most effective values to explore.
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+model = train_hybrid_model(file, epochs=10, lr=0.003, emb_dim=256)
+prediction = predict_next_sequence(model, file)
+```
+
+The training of the hybrid-model can be resumed by loading a .pth file created by the saving process.
+It contains the parameters of the model and the state of the used optimizer.
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+model = train_hybrid_model(file, epochs=10, lr=0.003, save=True)
+model = train_hybrid_model(
+    file,
+    epochs=10,
+    lr=0.003,
+    load_state_dict_and_optimizer_state="model_and_optimizer.pth",
+)
+prediction = predict_next_sequence(model, file)
+```
+
+[(S)ARIMA]
+
+The following example shows the high-level entry point for training and forecasting using the train_arima() function of the
+ARIMA/SARIMA models.
+By changing the model_architecture parameter, SARIMA or ARIMA can be selected. The max_depth is recommended to be
+relatively small,
+since it defines the maximum number of differencing steps applied to the underlying data to reach stationarity.
+A resumption of training is inherently not supported by the underlying model structure. Therefore, if new data becomes
+available,
+training from the beginning is the only option.
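+
+As a minimal sketch, the seasonal variant is selected the same way; the call below simply reuses the repository's
+bundled example trace and the train_arima() defaults:
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+# seasonal variant; the seasonal period is tied to the length of the held-out part of the trace
+prediction = train_arima(file, max_depth=3, model_architecture="SARIMA")
+```
+
+The plain ARIMA variant is selected analogously: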
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+prediction = train_arima(file, max_depth=3, model_architecture="ARIMA")
+```
diff --git a/ftio/freq/_dft_workflow.py b/ftio/freq/_dft_workflow.py
index 9a350e4d..2d576fb9 100644
--- a/ftio/freq/_dft_workflow.py
+++ b/ftio/freq/_dft_workflow.py
@@ -156,7 +156,7 @@ def ftio_dft(
         plot_dft(args, prediction, analysis_figures)
         console.print(" --- Done --- \n")
 
-    if args.autocorrelation:
+    if args.autocorrelation or args.machine_learning:
         share.set_data_from_predicition(b_sampled, prediction)
 
     precision_text = ""
diff --git a/ftio/machine_learning_models/machine_learning_models.py b/ftio/ml/models_and_training.py
similarity index 93%
rename from ftio/machine_learning_models/machine_learning_models.py
rename to ftio/ml/models_and_training.py
index bc098303..79832095 100644
--- a/ftio/machine_learning_models/machine_learning_models.py
+++ b/ftio/ml/models_and_training.py
@@ -1,22 +1,29 @@
+import importlib.util
 import os
 from math import inf
 
 import numpy as np
 import pandas
 import pandas as pd
-import torch
-import torch.nn as nn
-import torch.nn.functional as F
-import torch.optim as optim
-from statsmodels.tsa.arima.model import ARIMA
-from statsmodels.tsa.statespace.sarimax import SARIMAX
-from statsmodels.tsa.stattools import adfuller, kpss
-from torch.utils.data import Dataset
+
+TORCH_AVAILABLE = importlib.util.find_spec("torch") is not None
+if TORCH_AVAILABLE:
+    import torch
+    import torch.nn as nn
+    import torch.nn.functional as F
+    import torch.optim as optim
+    from statsmodels.tsa.arima.model import ARIMA
+    from statsmodels.tsa.statespace.sarimax import SARIMAX
+    from statsmodels.tsa.stattools import adfuller, kpss
+    from torch.utils.data import Dataset
+else:
+    raise RuntimeError(
+        "Torch module not found. Please install it using 'make full' or 'pip install ftio[ml-libs]'."
+    )
 
 from ftio.cli.ftio_core import freq_analysis
 from ftio.freq.autocorrelation import find_autocorrelation
-from ftio.parse.extract import get_time_behavior
-from ftio.parse.scales import Scales
+from ftio.parse.extract import get_time_behavior_and_args
 from ftio.prediction.unify_predictions import merge_predictions
 
 """
@@ -26,8 +33,8 @@ As utility it contains a custom dataset intended for the hybrid-model but was
 initially planned for general purpose use.
 
 Author: Robert Alles
-Copyright (c) 2025 TU Darmstadt, Germany
-Date:
+Copyright (c) 2026 TU Darmstadt, Germany
+Date: January 2026
 
 Licensed under the BSD 3-Clause License.
For more information, see the LICENSE file in the project root: @@ -83,7 +90,6 @@ def __init__(self, data_list, num_parts=3): self.num_parts = 3 for seq in data_list: - seq = torch.tensor(seq, dtype=torch.float32) bandwidth = seq[:, 0].unsqueeze(-1) @@ -237,10 +243,11 @@ def train_hybrid_model( frequency = [] # extraction of data through frequency analysis for darshan & json if x.endswith(".darshan") | x.endswith(".jsonl"): + cmd_input = ["ftio", x] if additional_ftio_args is not None: - frequency, set = extract(cmd_input=["ftio", x, additional_ftio_args]) - else: - frequency, set = extract(cmd_input=["ftio", x]) + cmd_input += additional_ftio_args + print(cmd_input) + frequency, set = extract(cmd_input=cmd_input) # extraction from .csv files of the sdumont traces ; not a general method to extract from any .csv file if x.endswith(".csv"): @@ -401,10 +408,10 @@ def predict_next_sequence( frequency = [] # extraction of data through frequency analysis for darshan & json if x.endswith(".darshan") | x.endswith(".jsonl"): + cmd_input = ["ftio", x] if additional_ftio_args is not None: - frequency, set = extract(cmd_input=["ftio", x, additional_ftio_args]) - else: - frequency, set = extract(cmd_input=["ftio", x]) + cmd_input += additional_ftio_args + frequency, set = extract(cmd_input=cmd_input) # extract data from traces obtained from sdumont dataset, most certainly not an approach for any .csv file if x.endswith(".csv"): csv_file = pandas.read_csv(x) @@ -498,12 +505,9 @@ def extract(cmd_input, msgs=None) -> list: list[ n , list[bandwidth], ...] """ # taken from get_time_behavior_and_args from extract.py - data = Scales(cmd_input, msgs) - args = data.args - df = data.get_io_mode(args.mode) - data = get_time_behavior(df) - + data, args = get_time_behavior_and_args(cmd_input, msgs) dfs_out = [[], [], [], []] + args.machine_learning = True prediction = None # taken from ftio_core.py's main for sim in data: @@ -517,7 +521,8 @@ def extract(cmd_input, msgs=None) -> list: prediction = merge_predictions(args, prediction_dft, prediction_auto, dfs_out) # extraction of the relevant data from the dataframes - b_sampled = dfs_out.b_sampled.tolist() + # b_sampled = dfs_out.b_sampled.tolist() + b_sampled = share.get("b_sampled") result = [] @@ -531,13 +536,16 @@ def extract(cmd_input, msgs=None) -> list: return [n, result] -def train_arima(file_or_directory, max_depth=3, model_architecture="SARIMA"): +def train_arima( + file_or_directory, max_depth=3, model_architecture="SARIMA", additional_ftio_args=None +): """The entry point for training the ARIMA and SARIMA models Args: file_or_directory: the file or directory to train the ARIMA model max_depth: the maximum depth of the ARIMA model model_architecture: choose between SARIMA or ARIMA architecture + additional_ftio_args: additional supported ftio arguments that aren't the initial ftio call and files """ # checks if file_or_path is either just a singular file OR a path to a directory with files @@ -561,11 +569,13 @@ def train_arima(file_or_directory, max_depth=3, model_architecture="SARIMA"): predictions = [] sequences = [] for file in files: - n, bandwidth_sequence = extract(cmd_input=["ftio", file]) + cmd_input = ["ftio", file] + if additional_ftio_args is not None: + cmd_input += additional_ftio_args + n, bandwidth_sequence = extract(cmd_input=cmd_input) sequences.append([n, bandwidth_sequence]) for sequence in sequences: - # initial min-max normalization min_val_initial, max_val_initial = inf, -inf for value in sequence[1]: @@ -602,7 +612,6 @@ def 
train_arima(file_or_directory, max_depth=3, model_architecture="SARIMA"): adf = 1.0 while d is not max_depth and kps > 0.05 > adf: - partial_sequence = partial_sequence.diff() d = d + 1 diff --git a/ftio/parse/args.py b/ftio/parse/args.py index 38a68cb9..225d6cf2 100644 --- a/ftio/parse/args.py +++ b/ftio/parse/args.py @@ -250,6 +250,14 @@ def parse_args(argv: list, name="") -> argparse.Namespace: help="if set, autocorrelation is calculated in addition to DFT. The results are merged to a single prediction at the end", ) parser.set_defaults(autocorrelation=False) + parser.add_argument( + "-ml", + "--machine_learning", + dest="machine_learning", + action="store_true", + help="if set, machine learning is enabled (api call only)", + ) + parser.set_defaults(machine_learning=False) parser.add_argument( "-w", "--window_adaptation", diff --git a/ftio/plot/dash_files/dash_app.py b/ftio/plot/dash_files/dash_app.py index dbbeba77..76bfc94a 100644 --- a/ftio/plot/dash_files/dash_app.py +++ b/ftio/plot/dash_files/dash_app.py @@ -9,7 +9,6 @@ """ import importlib.util -import sys import ftio.plot.dash_files.constants.id as id import ftio.plot.dash_files.constants.io_mode as io_mode @@ -17,7 +16,7 @@ DASH_AVAILABLE = importlib.util.find_spec("dash") is not None if not DASH_AVAILABLE: - sys.exit( + raise RuntimeError( "Dash module not found. Please install it using 'make full' or 'pip install dash dash-extensions'." ) else: diff --git a/pyproject.toml b/pyproject.toml index df3a5fe8..8ec2ce94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,6 +127,11 @@ plot-libs = [ "trace_updater", ] +ml-libs = [ + "torch", + "statsmodels" +] + [tool.setuptools.dynamic] version = { attr = "ftio.__version__" } readme = { file = ["README.md"] } diff --git a/test/test_machine-learning-models.py b/test/test_ml_models.py similarity index 61% rename from test/test_machine-learning-models.py rename to test/test_ml_models.py index 5247a1fc..46888534 100644 --- a/test/test_machine-learning-models.py +++ b/test/test_ml_models.py @@ -1,4 +1,18 @@ -from ftio.machine_learning_models.machine_learning_models import * +import importlib.util +import os + +import pytest + +if importlib.util.find_spec("torch") is None: + pytest.skip("Torch not available, skipping ML tests.", allow_module_level=True) + +from ftio.ml.models_and_training import ( + BandwidthDataset, + extract, + predict_next_sequence, + train_arima, + train_hybrid_model, +) """ Tests for the active workflow (and application) of the implemented models, dataset and functions. @@ -10,8 +24,10 @@ def test_hybrid_model(): Tests the training and prediction of the hybrid-model. """ file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - model = train_hybrid_model(file, epochs=10, lr=0.003) - prediction = predict_next_sequence(model, file) + model = train_hybrid_model( + file, epochs=10, lr=0.003, additional_ftio_args=["-e", "no"] + ) + _ = predict_next_sequence(model, file) assert True @@ -20,14 +36,17 @@ def test_hybrid_model_resume_training(): Tests the saving of the hybrid model's checkpoint and resuming of training of the model. 
""" file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - model = train_hybrid_model(file, epochs=10, lr=0.003, save=True) + model = train_hybrid_model( + file, epochs=10, lr=0.003, save=True, additional_ftio_args=["-e", "no"] + ) model = train_hybrid_model( file, epochs=10, lr=0.003, load_state_dict_and_optimizer_state="model_and_optimizer.pth", + additional_ftio_args=["-e", "no"], ) - prediction = predict_next_sequence(model, file) + _ = predict_next_sequence(model, file) assert True @@ -36,8 +55,10 @@ def test_extract(): Tests the extract functionality when providing FTIO arguments. """ file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - args = ["ftio", file] + args = ["ftio", file, "-e", "no"] n, data = extract(args) + print(data) + print(n) assert True @@ -46,9 +67,9 @@ def test_dataset(): Tests the correct initialization of the dataset. """ file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - args = ["ftio", file] + args = ["ftio", file, "-e", "no"] n, data = extract(args) - dataset = BandwidthDataset([data], num_parts=n) + _ = BandwidthDataset([data], num_parts=n) assert True @@ -57,7 +78,7 @@ def test_arima_model(): Tests the training and prediction of the ARIMA model. """ file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - prediction = train_arima(file, model_architecture="ARIMA") + _ = train_arima(file, model_architecture="ARIMA", additional_ftio_args=["-e", "no"]) assert True @@ -66,5 +87,5 @@ def test_sarima_model(): Tests the training and prediction of the SARIMA model. """ file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - prediction = train_arima(file, model_architecture="SARIMA") + _ = train_arima(file, model_architecture="SARIMA", additional_ftio_args=["-e", "no"]) assert True From c1e51f03d665b741199a740920a424e1ac8255a2 Mon Sep 17 00:00:00 2001 From: Ahmad Tarraf Date: Tue, 17 Feb 2026 10:34:57 +0100 Subject: [PATCH 7/7] FTIO: Improved frequency analysis and remove SharedSignalData class - Updated frequency analysis functions to eliminate the use of SharedSignalData, simplifying data handling. - Adjusted function signatures and return values to reflect changes. - Enhanced Makefile for better installation options and added parallel test execution. - Updated tests to align with changes in prediction handling. --- Makefile | 12 ++- ftio/cli/ftio_core.py | 34 +++----- ftio/freq/_dft_workflow.py | 9 +-- ftio/freq/_share_signal_data.py | 115 ---------------------------- ftio/freq/_wavelet_cont_workflow.py | 6 +- ftio/freq/_wavelet_disc_workflow.py | 10 +-- ftio/freq/autocorrelation.py | 22 +++--- ftio/freq/prediction.py | 13 +++- ftio/ml/models_and_training.py | 44 +++-------- pyproject.toml | 5 ++ test/test_ml_models.py | 8 +- 11 files changed, 73 insertions(+), 205 deletions(-) delete mode 100644 ftio/freq/_share_signal_data.py diff --git a/Makefile b/Makefile index 9acb720e..55394487 100644 --- a/Makefile +++ b/Makefile @@ -32,7 +32,7 @@ all: install install: clean venv ftio_venv msg # Installs with external dependencies -full: clean venv ftio_venv_full msg +full: venv ftio_venv_full msg # Installs debug version external dependencies debug: venv ftio_debug_venv msg @@ -54,7 +54,8 @@ ftio: $(PYTHON) -m pip install . 
ftio_full: - $(PYTHON) -m pip install '.[external-libs,development-libs,plot-libs,ml-libs]' + $(PYTHON) -m pip install -e '.[external-libs,development-libs,plot-libs,ml-libs]' --no-cache-dir || \ + (echo "Installing external libs failed, trying fallback..." && $(PYTHON) -m pip install -e . --no-cache-dir) venv: $(PYTHON) -m venv .venv @echo -e "Environment created. Using python from .venv/bin/python3" @@ -114,6 +115,13 @@ test_all: test: cd test && python3 -m pytest && make clean +test_parallel: + @python3 -m pip show pytest-xdist > /dev/null 2>&1 || python3 -m pip install pytest-xdist + cd test && python3 -m pytest -n 4 && make clean + +test_failed: + cd test && python3 -m pytest --ff && make clean + check_style: check_tools black . ruff check --fix diff --git a/ftio/cli/ftio_core.py b/ftio/cli/ftio_core.py index 7b70ccdf..f2f2c5ce 100644 --- a/ftio/cli/ftio_core.py +++ b/ftio/cli/ftio_core.py @@ -26,7 +26,6 @@ from ftio.freq._analysis_figures import AnalysisFigures from ftio.freq._dft_workflow import ftio_dft -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq._wavelet_cont_workflow import ftio_wavelet_cont from ftio.freq._wavelet_disc_workflow import ftio_wavelet_disc from ftio.freq.autocorrelation import find_autocorrelation @@ -119,9 +118,11 @@ def core(sim: dict, args: Namespace) -> tuple[Prediction, AnalysisFigures]: return Prediction(), AnalysisFigures() # Perform frequency analysis (dft/wavelet) - prediction_freq_analysis, analysis_figures, share = freq_analysis(args, sim) + prediction_freq_analysis, analysis_figures = freq_analysis(args, sim) # Perform autocorrelation if args.autocorrelation is true + Merge the results into a single prediction - prediction_auto = find_autocorrelation(args, sim, analysis_figures, share) + prediction_auto = find_autocorrelation( + args, sim, analysis_figures, prediction_freq_analysis + ) # Merge results prediction = merge_predictions( args, prediction_freq_analysis, prediction_auto, analysis_figures @@ -130,9 +131,7 @@ def core(sim: dict, args: Namespace) -> tuple[Prediction, AnalysisFigures]: return prediction, analysis_figures -def freq_analysis( - args: Namespace, data: dict -) -> tuple[Prediction, AnalysisFigures, SharedSignalData]: +def freq_analysis(args: Namespace, data: dict) -> tuple[Prediction, AnalysisFigures]: """ Performs frequency analysis (DFT, continuous wavelet, or discrete wavelet) and prepares data for plotting. @@ -153,20 +152,13 @@ def freq_analysis( Returns: tuple: A tuple containing: - - Prediction: Contains the prediction results, including: + - Prediction: Contains the prediction results, including - "dominant_freq" (list): The identified dominant frequencies. - "conf" (np.ndarray): Confidence values corresponding to the dominant frequencies. - "t_start" (int): Start time of the analysis. - "t_end" (int): End time of the analysis. - "total_bytes" (int): Total bytes involved in the analysis. - AnalysisFigures - - SharedSignalData: Contains sampled data used for sharing (e.g., autocorrelation) containing - the following fields: - - "b_sampled" (np.ndarray): The sampled bandwidth data. - - "freq" (np.ndarray): Frequencies corresponding to the sampled data. - - "t_start" (int): Start time of the sampled data. - - "t_end" (int): End time of the sampled data. - - "total_bytes" (int): Total bytes from the sampled data. """ #! Init @@ -182,19 +174,17 @@ def freq_analysis( #! 
Perform transformation if "dft" in args.transformation: - prediction, analysis_figures, share = ftio_dft( + prediction, analysis_figures = ftio_dft( args, bandwidth, time_b, total_bytes, ranks, text ) elif "wave_disc" in args.transformation: - prediction, analysis_figures, share = ftio_wavelet_disc( + prediction, analysis_figures = ftio_wavelet_disc( args, bandwidth, time_b, ranks, total_bytes ) elif "wave_cont" in args.transformation: - prediction, analysis_figures, share = ftio_wavelet_cont( - args, bandwidth, time_b, ranks - ) + prediction, analysis_figures = ftio_wavelet_cont(args, bandwidth, time_b, ranks) elif any(t in args.transformation for t in ("astft", "efd", "vmd")): # TODO: add a way to pass the results to FTIO @@ -211,7 +201,7 @@ def freq_analysis( from ftio.freq._astft_workflow import ftio_astft - prediction, analysis_figures, share = ftio_astft( + prediction, analysis_figures = ftio_astft( args, bandwidth, time_b, total_bytes, ranks, text ) sys.exit() @@ -221,7 +211,7 @@ def freq_analysis( from ftio.freq._amd_workflow import ftio_amd - prediction, analysis_figures, share = ftio_amd( + prediction, analysis_figures = ftio_amd( args, bandwidth, time_b, total_bytes, ranks, text ) sys.exit() @@ -229,7 +219,7 @@ def freq_analysis( else: raise Exception("Unsupported decomposition specified") - return prediction, analysis_figures, share + return prediction, analysis_figures def run(): diff --git a/ftio/freq/_dft_workflow.py b/ftio/freq/_dft_workflow.py index 2d576fb9..f10377b1 100644 --- a/ftio/freq/_dft_workflow.py +++ b/ftio/freq/_dft_workflow.py @@ -21,7 +21,6 @@ from ftio.freq._dft import dft from ftio.freq._filter import filter_signal from ftio.freq._fourier_fit import fourier_fit -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq.discretize import sample_data from ftio.freq.helper import MyConsole from ftio.freq.prediction import Prediction @@ -35,7 +34,7 @@ def ftio_dft( total_bytes: int = 0, ranks: int = 1, text: str = "", -) -> tuple[Prediction, AnalysisFigures, SharedSignalData]: +) -> tuple[Prediction, AnalysisFigures]: """ Performs a Discrete Fourier Transform (DFT) on the sampled bandwidth data, finds the dominant frequency, followed by outlier detection to spot the dominant frequency. This function also prepares the necessary outputs for plotting or reporting. @@ -52,10 +51,8 @@ def ftio_dft( tuple: - prediction (Prediction): Contains prediction results including dominant frequency, confidence, amplitude, etc. - analysis_figures (AnalysisFigures): Data and plot figures. - - share (SharedSignalData): Contains shared information, including sampled bandwidth and total bytes. """ #! 
Default values for variables - share = SharedSignalData() prediction = Prediction(args.transformation) analysis_figures = AnalysisFigures(args) console = MyConsole(verbose=args.verbose) @@ -157,7 +154,7 @@ def ftio_dft( console.print(" --- Done --- \n") if args.autocorrelation or args.machine_learning: - share.set_data_from_predicition(b_sampled, prediction) + prediction.b_sampled = b_sampled precision_text = "" # precision_text = precision_dft(amp, phi, dominant_index, b_sampled, t_sampled, frequencies, args.engine) @@ -175,4 +172,4 @@ def ftio_dft( console.print( f"\n[cyan]{args.transformation.upper()} + {args.outlier} finished:[/] {time.time() - tik:.3f} s" ) - return prediction, analysis_figures, share + return prediction, analysis_figures diff --git a/ftio/freq/_share_signal_data.py b/ftio/freq/_share_signal_data.py deleted file mode 100644 index 132d6e33..00000000 --- a/ftio/freq/_share_signal_data.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -This module provides the SharedSignalData class to store and manage -common signal analysis data shared between modules such as -autocorrelation and discrete Fourier transform (DFT). - -Author: Ahmad Tarraf -Copyright (c) 2026 TU Darmstadt, Germany -Version: v0.0.7 -Date: May 2025 -Licensed under the BSD 3-Clause License. -For more information, see the LICENSE file in the project root: -https://github.com/tuda-parallel/FTIO/blob/main/LICENSE""" - -from ftio.freq.prediction import Prediction - - -class SharedSignalData: - """ - A container class for sharing signal analysis data (e.g., between autocorrelation and DFT modules). - - Attributes: - data (dict): Internal dictionary storing shared signal parameters. - """ - - def __init__(self): - self.data = {} - - def set_data( - self, - b_sampled=None, - freq=None, - t_start=None, - t_end=None, - total_bytes=None, - ): - """ - Set shared signal analysis data. - - Args: - b_sampled (array-like, optional): Sampled bandwidth or signal data. - freq (float, optional): Frequency value used in analysis. - t_start (float, optional): Start time of the signal. - t_end (float, optional): End time of the signal. - total_bytes (int, optional): Total number of bytes processed or analyzed. - """ - self.data = {} - - if b_sampled is not None: - self.data["b_sampled"] = b_sampled - if freq is not None: - self.data["freq"] = freq - if t_start is not None: - self.data["t_start"] = t_start - if t_end is not None: - self.data["t_end"] = t_end - if total_bytes is not None: - self.data["total_bytes"] = total_bytes - - def set_data_from_predicition(self, b_sampled, prediction: Prediction): - self.data["b_sampled"] = b_sampled - self.data["freq"] = prediction.freq - self.data["t_start"] = prediction.t_start - self.data["t_end"] = prediction.t_end - self.data["total_bytes"] = prediction.total_bytes - - def get_data(self): - """ - Get the shared data dictionary. - - Returns: - dict: The internal data dictionary containing signal parameters. - """ - return self.data - - def get(self, key): - """ - Get a specific field from the shared data. - - Args: - key (str): The key to look for in the data dictionary. - - Returns: - The value corresponding to the key if present, else None. - """ - return self.data.get(key, None) - - def is_empty(self): - """ - Check whether the shared data dictionary is empty. - - Returns: - bool: True if no data has been set, False otherwise. - """ - return len(self.data) == 0 - - def has_key(self, key): - """ - Check if a specific key exists in the shared data. - - Args: - key (str): The key to look for. 
- - Returns: - bool: True if the key exists, False otherwise. - """ - return key in self.data - - def __str__(self): - """ - Return a string summary of the current shared data. - - Returns: - str: A string showing which keys are currently set. - """ - return f"SharedSignalData with keys: {list(self.data.keys()) if self.data else 'None'}" diff --git a/ftio/freq/_wavelet_cont_workflow.py b/ftio/freq/_wavelet_cont_workflow.py index 1d2fd63d..d84c783d 100644 --- a/ftio/freq/_wavelet_cont_workflow.py +++ b/ftio/freq/_wavelet_cont_workflow.py @@ -17,7 +17,6 @@ from ftio.freq._analysis_figures import AnalysisFigures from ftio.freq._dft_workflow import ftio_dft -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq._wavelet import wavelet_cont from ftio.freq._wavelet_helpers import get_scales from ftio.freq.discretize import sample_data @@ -49,7 +48,6 @@ def ftio_wavelet_cont( ranks (int): The rank value (default is 0). """ #! Default values for variables - share = SharedSignalData() prediction = Prediction(args.transformation) console = MyConsole(verbose=args.verbose) @@ -90,7 +88,7 @@ def ftio_wavelet_cont( use_dominant_only = False scales = [] t_sampled = time_stamps[0] + np.arange(0, len(b_sampled)) * 1 / args.freq - prediction, analysis_figures, share = ftio_dft(args, b_sampled, t_sampled) + prediction, analysis_figures = ftio_dft(args, b_sampled, t_sampled) dominant_freq, _ = get_dominant_and_conf(prediction) # Adjust wavelet @@ -237,4 +235,4 @@ def ftio_wavelet_cont( f"\n[cyan]{args.transformation.upper()} + {args.outlier} finished:[/] {time.time() - tik:.3f} s" ) - return prediction, analysis_figures, share + return prediction, analysis_figures diff --git a/ftio/freq/_wavelet_disc_workflow.py b/ftio/freq/_wavelet_disc_workflow.py index f8874dbc..9ebcd07c 100644 --- a/ftio/freq/_wavelet_disc_workflow.py +++ b/ftio/freq/_wavelet_disc_workflow.py @@ -18,7 +18,6 @@ from ftio.freq._analysis_figures import AnalysisFigures from ftio.freq._dft_workflow import ftio_dft from ftio.freq._dft_x_dwt import analyze_correlation -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq._wavelet import wavelet_disc from ftio.freq._wavelet_helpers import ( decomposition_level, @@ -55,7 +54,6 @@ def ftio_wavelet_disc( total_bytes (int): total transferred bytes (default is 0). 
""" # Default values for variables - share = SharedSignalData() prediction = { "source": {args.transformation}, "dominant_freq": [], @@ -90,7 +88,7 @@ def ftio_wavelet_disc( if args.level == 0: if method == "dft": args.transformation = "dft" - prediction, _, _ = ftio_dft(args, bandwidth, time_stamps, total_bytes, ranks) + prediction, _ = ftio_dft(args, bandwidth, time_stamps, total_bytes, ranks) if len(prediction.dominant_freq) > 0: args.level = int(1 / (5 * prediction.get_dominant_freq())) console.print(f"[green]Decomposition level adjusted to {args.level}[/]") @@ -151,7 +149,7 @@ def ftio_wavelet_disc( if "dft_on_approx_coeff" in analysis: args.transformation = "dft" # Option 1: Filter using wavelet and call DFT on lowest last coefficient - prediction, analysis_figures_dft, share = ftio_dft( + prediction, analysis_figures_dft = ftio_dft( args, coefficients_upsampled[0], t_sampled, total_bytes, ranks ) analysis_figures_wavelet += analysis_figures_dft @@ -187,7 +185,7 @@ def ftio_wavelet_disc( elif "dft_x_dwt" in analysis: args.transformation = "dft" # 1): compute DFT on lowest last coefficient - prediction, analysis_figures_dft, share = ftio_dft( + prediction, analysis_figures_dft = ftio_dft( args, coefficients_upsampled[0], t_sampled, total_bytes, ranks ) # 2) compare the results @@ -205,4 +203,4 @@ def ftio_wavelet_disc( ) exit() - return prediction, analysis_figures_wavelet, share + return prediction, analysis_figures_wavelet diff --git a/ftio/freq/autocorrelation.py b/ftio/freq/autocorrelation.py index f3ef66a2..3aae60f2 100644 --- a/ftio/freq/autocorrelation.py +++ b/ftio/freq/autocorrelation.py @@ -22,7 +22,6 @@ # from rich.padding import Padding import ftio.freq.discretize as dis from ftio.freq._analysis_figures import AnalysisFigures -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq.helper import MyConsole from ftio.freq.prediction import Prediction from ftio.plot.plot_autocorrelation import plot_autocorr_results @@ -32,16 +31,15 @@ def find_autocorrelation( args: Namespace, data: dict, analysis_figures: AnalysisFigures, - share: SharedSignalData, + prediction_freq_analysis: Prediction, ) -> Prediction: """Finds the period using autocorreleation Args: args (argparse.Namespace): Command line arguments data (dict): Sampled data - share (SharedSignalData): shared signal data from freq analysis like DFT: analysis_figures (AnalysisFigures): Data and plot figures. - + prediction_freq_analysis (Prediction): Frequency analysis prediction contains bandwidth and time stamps. Returns: tuple: - prediction (Prediction): Contains prediction results including dominant frequency, confidence, amplitude, etc. 
@@ -65,12 +63,12 @@ def find_autocorrelation(
     )
 
     # Take data if already aviable from previous step
-    if not share.is_empty():
-        b_sampled = share.get("b_sampled")
-        freq = share.get("freq")
-        t_s = share.get("t_start")
-        t_e = share.get("t_end")
-        total_bytes = share.get("total_bytes")
+    if len(prediction_freq_analysis.b_sampled) > 0:
+        b_sampled = prediction_freq_analysis.b_sampled
+        freq = prediction_freq_analysis.freq
+        t_s = prediction_freq_analysis.t_start
+        t_e = prediction_freq_analysis.t_end
+        total_bytes = prediction_freq_analysis.total_bytes
     else:
         total_bytes = 0
         bandwidth = data["bandwidth"] if "bandwidth" in data else np.array([])
@@ -220,7 +218,7 @@ def find_fd_autocorrelation(
     if candidates.size > 0:
         mean = np.average(candidates, weights=weights)
         std = np.sqrt(np.abs(np.average((candidates - mean) ** 2, weights=weights)))
-        tmp = [f"{1/i:.4f}" for i in candidates]  # Formatting frequencies
+        tmp = [f"{1 / i:.4f}" for i in candidates]  # Formatting frequencies
         periodicity = mean
         coef_var = np.abs(std / mean)
         conf = 1 - coef_var
@@ -236,7 +234,7 @@
             f"Found periods are [purple]{candidates}[/]\n"
             f"Matching frequencies are [purple]{tmp}[/]\n"
             f"Average period is [purple]{periodicity:.2f} [/]sec\n"
-            f"Average frequency is [purple]{1/periodicity if periodicity > 0 else np.nan:.4f} [/]Hz\n"
+            f"Average frequency is [purple]{1 / periodicity if periodicity > 0 else np.nan:.4f} [/]Hz\n"
             f"Confidence is [purple]{conf * 100:.2f} [/]%\n"
         )
         console = MyConsole()
diff --git a/ftio/freq/prediction.py b/ftio/freq/prediction.py
index 0612f827..0300da0f 100644
--- a/ftio/freq/prediction.py
+++ b/ftio/freq/prediction.py
@@ -59,6 +59,7 @@ def __init__(
         self._candidates = np.array([])
         self._ranges = np.array([])
         self._metric = ""
+        self._b_sampled = np.array([])
 
     @property
     def source(self):
@@ -249,6 +250,16 @@ def metric(self):
     def metric(self, value):
         self._metric = str(value)
 
+    @property
+    def b_sampled(self):
+        return self._b_sampled
+
+    @b_sampled.setter
+    def b_sampled(self, value):
+        if not isinstance(value, (list, np.ndarray)):
+            raise TypeError("b_sampled must be a list or numpy array")
+        self._b_sampled = value
+
     def get(self, key: str):
         """
         Retrieve the value for a given attribute.
@@ -456,7 +467,7 @@ def disp_dominant_freq_and_conf(self) -> str:
         if not np.isnan(f_d):
             text = (
                 f"[cyan underline]Prediction results:[/]\n[cyan]Frequency:[/] {f_d:.3e} Hz"
-                f"[cyan]->[/] {np.round(1/f_d, 4)} s\n"
+                f"[cyan]->[/] {np.round(1 / f_d, 4)} s\n"
                 f"[cyan]Confidence:[/] {color_pred(c_d)}"
                 f"{np.round(c_d * 100, 2)}[/] %\n"
             )
diff --git a/ftio/ml/models_and_training.py b/ftio/ml/models_and_training.py
index 79832095..a948b708 100644
--- a/ftio/ml/models_and_training.py
+++ b/ftio/ml/models_and_training.py
@@ -21,10 +21,7 @@
         "Torch module not found. Please install it using 'make full' or 'pip install ftio[ml-libs]'."
     )
 
-from ftio.cli.ftio_core import freq_analysis
-from ftio.freq.autocorrelation import find_autocorrelation
-from ftio.parse.extract import get_time_behavior_and_args
-from ftio.prediction.unify_predictions import merge_predictions
+from ftio.cli import ftio_core
 
 """
 Example Description:
@@ -246,7 +243,6 @@ def train_hybrid_model(
         cmd_input = ["ftio", x]
         if additional_ftio_args is not None:
             cmd_input += additional_ftio_args
-            print(cmd_input)
         frequency, set = extract(cmd_input=cmd_input)
 
     # extraction from .csv files of the sdumont traces ; not a general method to extract from any .csv file
@@ -504,35 +500,19 @@ def extract(cmd_input, msgs=None) -> list:
     Returns:
         list[ n , list[bandwidth], ...]
     """
-    # taken from get_time_behavior_and_args from extract.py
-    data, args = get_time_behavior_and_args(cmd_input, msgs)
-    dfs_out = [[], [], [], []]
-    args.machine_learning = True
-    prediction = None
-    # taken from ftio_core.py's main
-    for sim in data:
-        # get prediction
-        # Perform frequency analysis (dft/wavelet)
-        prediction_dft, dfs_out, share = freq_analysis(args, sim)
-
-        # Perform autocorrelation if args.autocorrelation is true + Merge the results into a single prediction
-        prediction_auto = find_autocorrelation(args, sim, dfs_out, share)
-        # Merge results
-        prediction = merge_predictions(args, prediction_dft, prediction_auto, dfs_out)
-
-    # extraction of the relevant data from the dataframes
-    # b_sampled = dfs_out.b_sampled.tolist()
-    b_sampled = share.get("b_sampled")
-
     result = []
-
-    for x in b_sampled:
-        result.append([x])
-
-    # calculates the amount of partial patterns using the predicted dominant frequency of FTIO
     n = 3
-    if (prediction.dominant_freq.size != 0) and (prediction.dominant_freq[0] != 0):
-        n = int(prediction.t_end / (1 / prediction.dominant_freq[0]))
+    cmd_input.append("--machine_learning")
+    prediction_list, args = ftio_core.main(cmd_input)
+    # take only the latest prediction for now
+    if prediction_list:
+        prediction = prediction_list[-1]
+        b_sampled = prediction.b_sampled  # public property added in prediction.py
+        for x in b_sampled:
+            result.append([x])
+        # calculates the amount of partial patterns using the predicted dominant frequency of FTIO
+        if (prediction.dominant_freq.size != 0) and (prediction.dominant_freq[0] != 0):
+            n = int(prediction.t_end / (1 / prediction.dominant_freq[0]))
 
     return [n, result]
 
diff --git a/pyproject.toml b/pyproject.toml
index 8ec2ce94..eec2a54b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -141,6 +141,11 @@ log_cli = true
 log_cli_level = "INFO"
 log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
 log_cli_date_format = "%Y-%m-%d %H:%M:%S"
+filterwarnings = [
+    "ignore::statsmodels.tools.sm_exceptions.InterpolationWarning",
+    "ignore::statsmodels.tools.sm_exceptions.ConvergenceWarning",
+    "ignore::UserWarning",
+]
 
 [tool.coverage.run]
 source = ["ftio"]
diff --git a/test/test_ml_models.py b/test/test_ml_models.py
index 46888534..319ec000 100644
--- a/test/test_ml_models.py
+++ b/test/test_ml_models.py
@@ -27,7 +27,7 @@ def test_hybrid_model():
     model = train_hybrid_model(
         file, epochs=10, lr=0.003, additional_ftio_args=["-e", "no"]
     )
-    _ = predict_next_sequence(model, file)
+    _ = predict_next_sequence(model, file, additional_ftio_args=["-e", "no"])
     assert True
 
 
@@ -36,7 +36,7 @@ def test_hybrid_model_resume_training():
     Tests the saving of the hybrid model's checkpoint and resuming of training of the model.
""" file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") - model = train_hybrid_model( + _ = train_hybrid_model( file, epochs=10, lr=0.003, save=True, additional_ftio_args=["-e", "no"] ) model = train_hybrid_model( @@ -46,7 +46,7 @@ def test_hybrid_model_resume_training(): load_state_dict_and_optimizer_state="model_and_optimizer.pth", additional_ftio_args=["-e", "no"], ) - _ = predict_next_sequence(model, file) + _ = predict_next_sequence(model, file, additional_ftio_args=["-e", "no"]) assert True @@ -57,8 +57,6 @@ def test_extract(): file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") args = ["ftio", file, "-e", "no"] n, data = extract(args) - print(data) - print(n) assert True