diff --git a/Makefile b/Makefile
index 717c592..5539448 100644
--- a/Makefile
+++ b/Makefile
@@ -32,7 +32,7 @@ all: install
 install: clean venv ftio_venv msg
 
 # Installs with external dependencies
-full: clean venv ftio_venv_full msg
+full: venv ftio_venv_full msg
 
 # Installs debug version external dependencies
 debug: venv ftio_debug_venv msg
@@ -54,7 +54,8 @@ ftio:
 	$(PYTHON) -m pip install .
 
 ftio_full:
-	$(PYTHON) -m pip install '.[external-libs,development-libs,plot-libs]'
+	$(PYTHON) -m pip install -e '.[external-libs,development-libs,plot-libs,ml-libs]' --no-cache-dir || \
+	(echo "Installing external libs failed, trying fallback..." && $(PYTHON) -m pip install -e . --no-cache-dir)
 
 venv:
 	$(PYTHON) -m venv .venv
 	@echo -e "Environment created. Using python from .venv/bin/python3"
@@ -114,6 +115,13 @@ test_all:
 test:
 	cd test && python3 -m pytest && make clean
 
+test_parallel:
+	@python3 -m pip show pytest-xdist > /dev/null 2>&1 || python3 -m pip install pytest-xdist
+	cd test && python3 -m pytest -n 4 && make clean
+
+test_failed:
+	cd test && python3 -m pytest --ff && make clean
+
 check_style: check_tools
 	black .
 	ruff check --fix
diff --git a/docs/ml_models.md b/docs/ml_models.md
new file mode 100644
index 0000000..e3922e9
--- /dev/null
+++ b/docs/ml_models.md
@@ -0,0 +1,60 @@
+# Machine-Learning Models Documentation
+
+## Prerequisites
+
+Install the required packages with `pip install '.[external-libs,development-libs,ml-libs]'`.
+
+## General Usage
+
+### Hybrid Model
+The following example shows the high-level entry point for training and forecasting with the hybrid model: the
+functions train_hybrid_model() and predict_next_sequence().
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+model = train_hybrid_model(file, epochs=10, lr=0.003)
+prediction = predict_next_sequence(model, file)
+```
+
+The function train_hybrid_model() also accepts parameters with default values that control the underlying structure
+of the model.
+In the next example only the embedding dimension is changed, but there are also parameters for the attention heads,
+the feed-forward dimension, etc.
+Powers of two (2^n) are usually the most effective values to explore.
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+model = train_hybrid_model(file, epochs=10, lr=0.003, emb_dim=256)
+prediction = predict_next_sequence(model, file)
+```
+
+Training of the hybrid model can be resumed by loading a .pth file created by the saving process.
+It contains the model parameters and the state of the optimizer that was used.
+
+```python
+file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl")
+model = train_hybrid_model(file, epochs=10, lr=0.003, save=True)
+model = train_hybrid_model(
+    file,
+    epochs=10,
+    lr=0.003,
+    load_state_dict_and_optimizer_state="model_and_optimizer.pth",
+)
+prediction = predict_next_sequence(model, file)
+```
+
+### (S)ARIMA
+
+Training and forecasting with the ARIMA/SARIMA models both go through the high-level train_arima() function.
+The model_architecture parameter selects between SARIMA and ARIMA.
+The max_depth parameter is recommended to be relatively small, since it defines the maximum order of differencing
+applied to the underlying data to reach stationarity.
+Internally, train_arima() performs a grid search over the (p, d, q) orders based on the AIC, so training can take a
+while for longer traces.
+Resuming training is inherently not supported by the underlying model structure; if new data becomes available,
+the only option is to train the model again from scratch.
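+A minimal sketch of such a stationarity check is shown below. It uses the same statsmodels tests (KPSS and ADF) that
+train_arima() relies on internally, but the exact decision logic and parameters inside train_arima() may differ.
+
+```python
+import numpy as np
+import pandas as pd
+from statsmodels.tsa.stattools import adfuller, kpss
+
+
+def differencing_depth(series: pd.Series, max_depth: int = 3) -> int:
+    """Difference the series until it looks stationary or max_depth is reached (illustrative only)."""
+    d = 0
+    while d < max_depth:
+        kpss_p = kpss(series.dropna())[1]     # H0: stationary -> a large p-value is consistent with stationarity
+        adf_p = adfuller(series.dropna())[1]  # H0: unit root  -> a small p-value is consistent with stationarity
+        if kpss_p > 0.05 and adf_p < 0.05:    # both tests agree the series is stationary: stop differencing
+            break
+        series = series.diff()
+        d += 1
+    return d
+
+
+bandwidth = pd.Series(np.random.rand(200))  # stand-in for a sampled bandwidth trace
+print(differencing_depth(bandwidth, max_depth=3))
+```
+
+The high-level call itself stays a one-liner: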
+ +```python +file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") +prediction = train_arima(file, max_depth=3, model_architecture="ARIMA") +``` diff --git a/ftio/cli/ftio_core.py b/ftio/cli/ftio_core.py index 7b70ccd..f2f2c5c 100644 --- a/ftio/cli/ftio_core.py +++ b/ftio/cli/ftio_core.py @@ -26,7 +26,6 @@ from ftio.freq._analysis_figures import AnalysisFigures from ftio.freq._dft_workflow import ftio_dft -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq._wavelet_cont_workflow import ftio_wavelet_cont from ftio.freq._wavelet_disc_workflow import ftio_wavelet_disc from ftio.freq.autocorrelation import find_autocorrelation @@ -119,9 +118,11 @@ def core(sim: dict, args: Namespace) -> tuple[Prediction, AnalysisFigures]: return Prediction(), AnalysisFigures() # Perform frequency analysis (dft/wavelet) - prediction_freq_analysis, analysis_figures, share = freq_analysis(args, sim) + prediction_freq_analysis, analysis_figures = freq_analysis(args, sim) # Perform autocorrelation if args.autocorrelation is true + Merge the results into a single prediction - prediction_auto = find_autocorrelation(args, sim, analysis_figures, share) + prediction_auto = find_autocorrelation( + args, sim, analysis_figures, prediction_freq_analysis + ) # Merge results prediction = merge_predictions( args, prediction_freq_analysis, prediction_auto, analysis_figures @@ -130,9 +131,7 @@ def core(sim: dict, args: Namespace) -> tuple[Prediction, AnalysisFigures]: return prediction, analysis_figures -def freq_analysis( - args: Namespace, data: dict -) -> tuple[Prediction, AnalysisFigures, SharedSignalData]: +def freq_analysis(args: Namespace, data: dict) -> tuple[Prediction, AnalysisFigures]: """ Performs frequency analysis (DFT, continuous wavelet, or discrete wavelet) and prepares data for plotting. @@ -153,20 +152,13 @@ def freq_analysis( Returns: tuple: A tuple containing: - - Prediction: Contains the prediction results, including: + - Prediction: Contains the prediction results, including - "dominant_freq" (list): The identified dominant frequencies. - "conf" (np.ndarray): Confidence values corresponding to the dominant frequencies. - "t_start" (int): Start time of the analysis. - "t_end" (int): End time of the analysis. - "total_bytes" (int): Total bytes involved in the analysis. - AnalysisFigures - - SharedSignalData: Contains sampled data used for sharing (e.g., autocorrelation) containing - the following fields: - - "b_sampled" (np.ndarray): The sampled bandwidth data. - - "freq" (np.ndarray): Frequencies corresponding to the sampled data. - - "t_start" (int): Start time of the sampled data. - - "t_end" (int): End time of the sampled data. - - "total_bytes" (int): Total bytes from the sampled data. """ #! Init @@ -182,19 +174,17 @@ def freq_analysis( #! 
Perform transformation if "dft" in args.transformation: - prediction, analysis_figures, share = ftio_dft( + prediction, analysis_figures = ftio_dft( args, bandwidth, time_b, total_bytes, ranks, text ) elif "wave_disc" in args.transformation: - prediction, analysis_figures, share = ftio_wavelet_disc( + prediction, analysis_figures = ftio_wavelet_disc( args, bandwidth, time_b, ranks, total_bytes ) elif "wave_cont" in args.transformation: - prediction, analysis_figures, share = ftio_wavelet_cont( - args, bandwidth, time_b, ranks - ) + prediction, analysis_figures = ftio_wavelet_cont(args, bandwidth, time_b, ranks) elif any(t in args.transformation for t in ("astft", "efd", "vmd")): # TODO: add a way to pass the results to FTIO @@ -211,7 +201,7 @@ def freq_analysis( from ftio.freq._astft_workflow import ftio_astft - prediction, analysis_figures, share = ftio_astft( + prediction, analysis_figures = ftio_astft( args, bandwidth, time_b, total_bytes, ranks, text ) sys.exit() @@ -221,7 +211,7 @@ def freq_analysis( from ftio.freq._amd_workflow import ftio_amd - prediction, analysis_figures, share = ftio_amd( + prediction, analysis_figures = ftio_amd( args, bandwidth, time_b, total_bytes, ranks, text ) sys.exit() @@ -229,7 +219,7 @@ def freq_analysis( else: raise Exception("Unsupported decomposition specified") - return prediction, analysis_figures, share + return prediction, analysis_figures def run(): diff --git a/ftio/freq/_dft_workflow.py b/ftio/freq/_dft_workflow.py index 9a350e4..f10377b 100644 --- a/ftio/freq/_dft_workflow.py +++ b/ftio/freq/_dft_workflow.py @@ -21,7 +21,6 @@ from ftio.freq._dft import dft from ftio.freq._filter import filter_signal from ftio.freq._fourier_fit import fourier_fit -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq.discretize import sample_data from ftio.freq.helper import MyConsole from ftio.freq.prediction import Prediction @@ -35,7 +34,7 @@ def ftio_dft( total_bytes: int = 0, ranks: int = 1, text: str = "", -) -> tuple[Prediction, AnalysisFigures, SharedSignalData]: +) -> tuple[Prediction, AnalysisFigures]: """ Performs a Discrete Fourier Transform (DFT) on the sampled bandwidth data, finds the dominant frequency, followed by outlier detection to spot the dominant frequency. This function also prepares the necessary outputs for plotting or reporting. @@ -52,10 +51,8 @@ def ftio_dft( tuple: - prediction (Prediction): Contains prediction results including dominant frequency, confidence, amplitude, etc. - analysis_figures (AnalysisFigures): Data and plot figures. - - share (SharedSignalData): Contains shared information, including sampled bandwidth and total bytes. """ #! 
Default values for variables - share = SharedSignalData() prediction = Prediction(args.transformation) analysis_figures = AnalysisFigures(args) console = MyConsole(verbose=args.verbose) @@ -156,8 +153,8 @@ def ftio_dft( plot_dft(args, prediction, analysis_figures) console.print(" --- Done --- \n") - if args.autocorrelation: - share.set_data_from_predicition(b_sampled, prediction) + if args.autocorrelation or args.machine_learning: + prediction.b_sampled = b_sampled precision_text = "" # precision_text = precision_dft(amp, phi, dominant_index, b_sampled, t_sampled, frequencies, args.engine) @@ -175,4 +172,4 @@ def ftio_dft( console.print( f"\n[cyan]{args.transformation.upper()} + {args.outlier} finished:[/] {time.time() - tik:.3f} s" ) - return prediction, analysis_figures, share + return prediction, analysis_figures diff --git a/ftio/freq/_share_signal_data.py b/ftio/freq/_share_signal_data.py deleted file mode 100644 index 132d6e3..0000000 --- a/ftio/freq/_share_signal_data.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -This module provides the SharedSignalData class to store and manage -common signal analysis data shared between modules such as -autocorrelation and discrete Fourier transform (DFT). - -Author: Ahmad Tarraf -Copyright (c) 2026 TU Darmstadt, Germany -Version: v0.0.7 -Date: May 2025 -Licensed under the BSD 3-Clause License. -For more information, see the LICENSE file in the project root: -https://github.com/tuda-parallel/FTIO/blob/main/LICENSE""" - -from ftio.freq.prediction import Prediction - - -class SharedSignalData: - """ - A container class for sharing signal analysis data (e.g., between autocorrelation and DFT modules). - - Attributes: - data (dict): Internal dictionary storing shared signal parameters. - """ - - def __init__(self): - self.data = {} - - def set_data( - self, - b_sampled=None, - freq=None, - t_start=None, - t_end=None, - total_bytes=None, - ): - """ - Set shared signal analysis data. - - Args: - b_sampled (array-like, optional): Sampled bandwidth or signal data. - freq (float, optional): Frequency value used in analysis. - t_start (float, optional): Start time of the signal. - t_end (float, optional): End time of the signal. - total_bytes (int, optional): Total number of bytes processed or analyzed. - """ - self.data = {} - - if b_sampled is not None: - self.data["b_sampled"] = b_sampled - if freq is not None: - self.data["freq"] = freq - if t_start is not None: - self.data["t_start"] = t_start - if t_end is not None: - self.data["t_end"] = t_end - if total_bytes is not None: - self.data["total_bytes"] = total_bytes - - def set_data_from_predicition(self, b_sampled, prediction: Prediction): - self.data["b_sampled"] = b_sampled - self.data["freq"] = prediction.freq - self.data["t_start"] = prediction.t_start - self.data["t_end"] = prediction.t_end - self.data["total_bytes"] = prediction.total_bytes - - def get_data(self): - """ - Get the shared data dictionary. - - Returns: - dict: The internal data dictionary containing signal parameters. - """ - return self.data - - def get(self, key): - """ - Get a specific field from the shared data. - - Args: - key (str): The key to look for in the data dictionary. - - Returns: - The value corresponding to the key if present, else None. - """ - return self.data.get(key, None) - - def is_empty(self): - """ - Check whether the shared data dictionary is empty. - - Returns: - bool: True if no data has been set, False otherwise. 
- """ - return len(self.data) == 0 - - def has_key(self, key): - """ - Check if a specific key exists in the shared data. - - Args: - key (str): The key to look for. - - Returns: - bool: True if the key exists, False otherwise. - """ - return key in self.data - - def __str__(self): - """ - Return a string summary of the current shared data. - - Returns: - str: A string showing which keys are currently set. - """ - return f"SharedSignalData with keys: {list(self.data.keys()) if self.data else 'None'}" diff --git a/ftio/freq/_wavelet_cont_workflow.py b/ftio/freq/_wavelet_cont_workflow.py index 1d2fd63..d84c783 100644 --- a/ftio/freq/_wavelet_cont_workflow.py +++ b/ftio/freq/_wavelet_cont_workflow.py @@ -17,7 +17,6 @@ from ftio.freq._analysis_figures import AnalysisFigures from ftio.freq._dft_workflow import ftio_dft -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq._wavelet import wavelet_cont from ftio.freq._wavelet_helpers import get_scales from ftio.freq.discretize import sample_data @@ -49,7 +48,6 @@ def ftio_wavelet_cont( ranks (int): The rank value (default is 0). """ #! Default values for variables - share = SharedSignalData() prediction = Prediction(args.transformation) console = MyConsole(verbose=args.verbose) @@ -90,7 +88,7 @@ def ftio_wavelet_cont( use_dominant_only = False scales = [] t_sampled = time_stamps[0] + np.arange(0, len(b_sampled)) * 1 / args.freq - prediction, analysis_figures, share = ftio_dft(args, b_sampled, t_sampled) + prediction, analysis_figures = ftio_dft(args, b_sampled, t_sampled) dominant_freq, _ = get_dominant_and_conf(prediction) # Adjust wavelet @@ -237,4 +235,4 @@ def ftio_wavelet_cont( f"\n[cyan]{args.transformation.upper()} + {args.outlier} finished:[/] {time.time() - tik:.3f} s" ) - return prediction, analysis_figures, share + return prediction, analysis_figures diff --git a/ftio/freq/_wavelet_disc_workflow.py b/ftio/freq/_wavelet_disc_workflow.py index f8874db..9ebcd07 100644 --- a/ftio/freq/_wavelet_disc_workflow.py +++ b/ftio/freq/_wavelet_disc_workflow.py @@ -18,7 +18,6 @@ from ftio.freq._analysis_figures import AnalysisFigures from ftio.freq._dft_workflow import ftio_dft from ftio.freq._dft_x_dwt import analyze_correlation -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq._wavelet import wavelet_disc from ftio.freq._wavelet_helpers import ( decomposition_level, @@ -55,7 +54,6 @@ def ftio_wavelet_disc( total_bytes (int): total transferred bytes (default is 0). 
""" # Default values for variables - share = SharedSignalData() prediction = { "source": {args.transformation}, "dominant_freq": [], @@ -90,7 +88,7 @@ def ftio_wavelet_disc( if args.level == 0: if method == "dft": args.transformation = "dft" - prediction, _, _ = ftio_dft(args, bandwidth, time_stamps, total_bytes, ranks) + prediction, _ = ftio_dft(args, bandwidth, time_stamps, total_bytes, ranks) if len(prediction.dominant_freq) > 0: args.level = int(1 / (5 * prediction.get_dominant_freq())) console.print(f"[green]Decomposition level adjusted to {args.level}[/]") @@ -151,7 +149,7 @@ def ftio_wavelet_disc( if "dft_on_approx_coeff" in analysis: args.transformation = "dft" # Option 1: Filter using wavelet and call DFT on lowest last coefficient - prediction, analysis_figures_dft, share = ftio_dft( + prediction, analysis_figures_dft = ftio_dft( args, coefficients_upsampled[0], t_sampled, total_bytes, ranks ) analysis_figures_wavelet += analysis_figures_dft @@ -187,7 +185,7 @@ def ftio_wavelet_disc( elif "dft_x_dwt" in analysis: args.transformation = "dft" # 1): compute DFT on lowest last coefficient - prediction, analysis_figures_dft, share = ftio_dft( + prediction, analysis_figures_dft = ftio_dft( args, coefficients_upsampled[0], t_sampled, total_bytes, ranks ) # 2) compare the results @@ -205,4 +203,4 @@ def ftio_wavelet_disc( ) exit() - return prediction, analysis_figures_wavelet, share + return prediction, analysis_figures_wavelet diff --git a/ftio/freq/autocorrelation.py b/ftio/freq/autocorrelation.py index f3ef66a..3aae60f 100644 --- a/ftio/freq/autocorrelation.py +++ b/ftio/freq/autocorrelation.py @@ -22,7 +22,6 @@ # from rich.padding import Padding import ftio.freq.discretize as dis from ftio.freq._analysis_figures import AnalysisFigures -from ftio.freq._share_signal_data import SharedSignalData from ftio.freq.helper import MyConsole from ftio.freq.prediction import Prediction from ftio.plot.plot_autocorrelation import plot_autocorr_results @@ -32,16 +31,15 @@ def find_autocorrelation( args: Namespace, data: dict, analysis_figures: AnalysisFigures, - share: SharedSignalData, + prediction_freq_analysis: Prediction, ) -> Prediction: """Finds the period using autocorreleation Args: args (argparse.Namespace): Command line arguments data (dict): Sampled data - share (SharedSignalData): shared signal data from freq analysis like DFT: analysis_figures (AnalysisFigures): Data and plot figures. - + prediction_freq_analysis (Prediction): Frequency analysis prediction contains bandwidth and time stamps. Returns: tuple: - prediction (Prediction): Contains prediction results including dominant frequency, confidence, amplitude, etc. 
@@ -65,12 +63,12 @@ def find_autocorrelation( ) # Take data if already aviable from previous step - if not share.is_empty(): - b_sampled = share.get("b_sampled") - freq = share.get("freq") - t_s = share.get("t_start") - t_e = share.get("t_end") - total_bytes = share.get("total_bytes") + if len(prediction_freq_analysis.b_sampled) > 0: + b_sampled = prediction_freq_analysis.b_sampled + freq = prediction_freq_analysis.freq + t_s = prediction_freq_analysis.t_start + t_e = prediction_freq_analysis.t_end + total_bytes = prediction_freq_analysis.total_bytes else: total_bytes = 0 bandwidth = data["bandwidth"] if "bandwidth" in data else np.array([]) @@ -220,7 +218,7 @@ def find_fd_autocorrelation( if candidates.size > 0: mean = np.average(candidates, weights=weights) std = np.sqrt(np.abs(np.average((candidates - mean) ** 2, weights=weights))) - tmp = [f"{1/i:.4f}" for i in candidates] # Formatting frequencies + tmp = [f"{1 / i:.4f}" for i in candidates] # Formatting frequencies periodicity = mean coef_var = np.abs(std / mean) conf = 1 - coef_var @@ -236,7 +234,7 @@ def find_fd_autocorrelation( f"Found periods are [purple]{candidates}[/]\n" f"Matching frequencies are [purple]{tmp}[/]\n" f"Average period is [purple]{periodicity:.2f} [/]sec\n" - f"Average frequency is [purple]{1/periodicity if periodicity > 0 else np.nan:.4f} [/]Hz\n" + f"Average frequency is [purple]{1 / periodicity if periodicity > 0 else np.nan:.4f} [/]Hz\n" f"Confidence is [purple]{conf * 100:.2f} [/]%\n" ) console = MyConsole() diff --git a/ftio/freq/prediction.py b/ftio/freq/prediction.py index 0612f82..0300da0 100644 --- a/ftio/freq/prediction.py +++ b/ftio/freq/prediction.py @@ -59,6 +59,7 @@ def __init__( self._candidates = np.array([]) self._ranges = np.array([]) self._metric = "" + self._b_sampled = np.array([]) @property def source(self): @@ -249,6 +250,16 @@ def metric(self): def metric(self, value): self._metric = str(value) + @property + def b_sampled(self): + return self._b_sampled + + @b_sampled.setter + def b_sampled(self, value): + if not isinstance(value, (list, np.ndarray)): + raise TypeError("b_sampled must be a list or numpy array") + self._b_sampled = value + def get(self, key: str): """ Retrieve the value for a given attribute. @@ -456,7 +467,7 @@ def disp_dominant_freq_and_conf(self) -> str: if not np.isnan(f_d): text = ( f"[cyan underline]Prediction results:[/]\n[cyan]Frequency:[/] {f_d:.3e} Hz" - f"[cyan]->[/] {np.round(1/f_d, 4)} s\n" + f"[cyan]->[/] {np.round(1 / f_d, 4)} s\n" f"[cyan]Confidence:[/] {color_pred(c_d)}" f"{np.round(c_d * 100, 2)}[/] %\n" ) diff --git a/ftio/ml/models_and_training.py b/ftio/ml/models_and_training.py new file mode 100644 index 0000000..a948b70 --- /dev/null +++ b/ftio/ml/models_and_training.py @@ -0,0 +1,642 @@ +import importlib.util +import os +from math import inf + +import numpy as np +import pandas +import pandas as pd + +TORCH_AVAILABLE = importlib.util.find_spec("torch") is not None +if TORCH_AVAILABLE: + import torch + import torch.nn as nn + import torch.nn.functional as F + import torch.optim as optim + from statsmodels.tsa.arima.model import ARIMA + from statsmodels.tsa.statespace.sarimax import SARIMAX + from statsmodels.tsa.stattools import adfuller, kpss + from torch.utils.data import Dataset +else: + raise RuntimeError( + "Tocrch module not found. Please install it using 'make full' or 'pip install ftio[ml-libs]'." 
+ ) + +from ftio.cli import ftio_core + +""" +Example Description: +This module contains the entry points for extracting data for training and prediction purpose, +the hybrid-model, the ARIMA and SARIMA models and their training and forecasting functions. +As utility it contains a custom dataset intended for the hybrid-model but was initially planned for general purpose use. + +Author: Robert Alles +Copyright (c) 2026 TU Darmstadt, Germany +Date: January 2026 + +Licensed under the BSD 3-Clause License. +For more information, see the LICENSE file in the project root: +https://github.com/tuda-parallel/FTIO/blob/main/LICENSE +""" + + +class PositionalEncoding(nn.Module): + """Position encoding used in the hybrid model to increase the likelihood of understanding sequential data.""" + + def __init__(self, emb_dim, max_len=5555): + """Constructor of the positional encoding. + + Args: + emb_dim: Size of the embedding dimension + max_len: Maximum length of sequence length that can be handled. + """ + super().__init__() + # Learned Positional Encoding + self.positional_encoding = nn.Parameter(torch.zeros(1, max_len, emb_dim)) + + def forward(self, x): + """Forward method of the positional encoding. + + Args: + x: The embedded sequence. + + Returns: The embedded sequence added to the vector of the sliced positional encoding. + + """ + # add embedded sequence + sliced positional encoding (added to device to prevent RuntimeError) + return x + self.positional_encoding[:, : x.size(1)].to(x.device) + + +class BandwidthDataset(Dataset): + """This Dataset accepts lists containing multiple partial sequences of list[ranks, bandwidth, duration, start_time] + that combine into one complete sequence. + The conversion of the list into a new representation in torch.tensors allows for direct training or prediction. + + """ + + def __init__(self, data_list, num_parts=3): + """Constructor of the Dataset. Convertion of data into torch.tensors. + + Args: + data_list: Lists containing the bandwidth values of the sequence. + num_parts: Int deciding the number of slices to be made from the sequence. + """ + self.data = [] + self.num_parts = num_parts + + if not self.num_parts: + self.num_parts = 3 + + for seq in data_list: + seq = torch.tensor(seq, dtype=torch.float32) + bandwidth = seq[:, 0].unsqueeze(-1) + + # normalize per-sequence + min_val, max_val = bandwidth.min(), bandwidth.max() + bandwidth = (bandwidth - min_val) / (max_val - min_val + 1e-8) + + L = len(bandwidth) + part_len = L // self.num_parts + + # split into num_parts equal parts + parts = [ + bandwidth[i * part_len : (i + 1) * part_len] + for i in range(self.num_parts - 1) + ] + parts.append(bandwidth[(self.num_parts - 1) * part_len :]) + # training input + past = torch.cat(parts[:-1], dim=0) + + # target = last part + future = parts[-1] + self.data.append((past, future)) + + def __len__(self): + """Method providing the amount of datapoints inside the dataset. + + Returns: Amount of datapoints inside the dataset. + + """ + return len(self.data) + + def __getitem__(self, idx): + """Method providing datapoint from specified index. + + Args: + idx: Index of the specific datapoint. + + Returns: The datapoint from the specified index. + + """ + return self.data[idx] + + +class HybridModel(nn.Module): + """A hybrid model leveraging transformer and long short-term memory.""" + + def __init__(self, emb_dim=128, n_heads=4, ff_dim=128, num_layers=6, lstm_hidden=128): + """Constructor of the hybrid model. Currently only supports the most important parameters. 
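+        Powers of two are usually sensible values for emb_dim, ff_dim, and lstm_hidden (see docs/ml_models.md).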
+ + Args: + emb_dim: Size of the embedding dimension + n_heads: Amount of attention heads of transformer approach + ff_dim: Size of the Feedforward network dimension + num_layers: Amount of Transformer encoder layers + lstm_hidden: Amount of hidden units in the LSTM part after transformer + """ + super().__init__() + self.bandwidth_embedding = nn.Linear(1, emb_dim) + self.positional_encoding = PositionalEncoding(emb_dim) + self.transformer = nn.TransformerEncoder( + nn.TransformerEncoderLayer( + d_model=emb_dim, nhead=n_heads, dim_feedforward=ff_dim, batch_first=True + ), + num_layers=num_layers, + ) + self.lstm = nn.LSTM( + input_size=emb_dim, + hidden_size=lstm_hidden, + batch_first=True, + bidirectional=True, + ) + self.fc_bandwidth = nn.Linear(lstm_hidden * 2, 1) + + def forward(self, past_seq, prediction_length=None): + x = self.bandwidth_embedding(past_seq) + x = self.positional_encoding(x) + x = self.transformer(x) + x, _ = self.lstm(x) + + # shrink output to target length if needed + if prediction_length is not None: + x = x[:, -prediction_length:, :] # take last part only + + return self.fc_bandwidth(x) + + +def train_hybrid_model( + file_or_directory, + bandwidth_cutoff=-1, + load_state_dict_and_optimizer_state=None, + emb_dim=128, + n_heads=4, + ff_dim=128, + num_layers=3, + epochs=10, + lr=1e-3, + save=False, + additional_ftio_args=None, +) -> HybridModel: + """Trains the hybrid model contained in this file and saves the trained model as a .pth file containing + the model's state dict and optimizer state. + + + Args: + file_or_directory: Path to either a singular file or a directory of files to train the model with. + bandwidth_cutoff: Any partial bandwidth sequence below this thresh hold will be cut off from the training data. + load_state_dict_and_optimizer_state: .pth file checkpoint to continue training the model from + emb_dim: embedded dimensions of the model + n_heads: heads of the model + ff_dim: feedforward dimension of the model + num_layers: number of layers of the model + epochs: number of epochs used to train the model + lr: intensity of the learn rate + save: boolean representing if the model is supposed to be saved. Doesn't affect the return value. 
+ additional_ftio_args: additional supported ftio arguments that aren't the initial ftio call and files + + Returns: + HybridModel trained with provided data + + """ + + # checks if file_or_directory is either just a singular file OR a path to a directory with files + # and then saves the individual files paths + files = [] + if os.path.isfile(file_or_directory): + files.append(file_or_directory) + elif os.path.isdir(file_or_directory): + files = [ + os.path.join(file_or_directory, f) + for f in os.listdir(file_or_directory) + if os.path.isfile(os.path.join(file_or_directory, f)) + ] + + if not files: + raise ValueError("No file(s) found.") + + # the initialised model + model = HybridModel( + emb_dim=emb_dim, n_heads=n_heads, ff_dim=ff_dim, num_layers=num_layers + ) + load_optimizer = None + + # in case the function caller wants to train a previously trained model + if load_state_dict_and_optimizer_state is not None: + loaded_data = torch.load(load_state_dict_and_optimizer_state) + model.load_state_dict(loaded_data["model_state_dict"]) + load_optimizer = loaded_data["optimizer_state_dict"] + + for x in files: + set = [] + frequency = [] + # extraction of data through frequency analysis for darshan & json + if x.endswith(".darshan") | x.endswith(".jsonl"): + cmd_input = ["ftio", x] + if additional_ftio_args is not None: + cmd_input += additional_ftio_args + frequency, set = extract(cmd_input=cmd_input) + + # extraction from .csv files of the sdumont traces ; not a general method to extract from any .csv file + if x.endswith(".csv"): + csv_file = pandas.read_csv(x) + n = 0 + for y in csv_file["both"]: + set.append([y]) + n = n + 1 + + # max size of positional encoding of the model, limitation due to the system used for computation + if n == 5555: + break + + dataset = BandwidthDataset([set], num_parts=frequency) + load_optimizer = train_model( + model, dataset, epochs=epochs, lr=lr, load_optimizer=load_optimizer + ) + predict_next_sequence(model, "", only_data=(set, frequency)) + + # save the model + # currently saving it in the currently active archive + if save: + torch.save( + { + "model_state_dict": model.state_dict(), + "optimizer_state_dict": load_optimizer, + }, + "model_and_optimizer.pth", + ) + + return model + + +def train_model(model, dataset, epochs=3, lr=1e-3, load_optimizer=None, optimizer=None): + """Training method for the hybrid model. + + Args: + model: The hybrid model to be trained. + dataset: The bandwidth dataset containing all datapoints to train the model. + epochs: The amount of epochs used to train the model. + lr: The learn rate of the model. + load_optimizer: Optimizer dic in case the model was previously trained and not lose the optimizer state. + optimizer: The optimizer to be used in the training process. + + Returns: + The state dict of the optimizer for saving/reusing purposes. 
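+    Note: the loss is a weighted mix of Huber loss (0.3) and spectral loss (0.7), gradients are clipped to a norm of 1.0, and a ReduceLROnPlateau scheduler is stepped on the epoch loss.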
+ + """ + # check if customizer needs to be loaded or initialized + if optimizer is None: + optimizer = optim.AdamW(model.parameters(), lr=lr) + if load_optimizer is not None: + optimizer.load_state_dict(load_optimizer) + + # beginning of the training loop + scheduler = optim.lr_scheduler.ReduceLROnPlateau( + optimizer, mode="min", factor=0.5, patience=3 + ) + model.train() + for epoch in range(epochs): + total_loss = 0 + for past, future in dataset: + past = past.unsqueeze(0) + future = future.unsqueeze(0) + # zero gradient + optimizer.zero_grad() + pred_future = model(past, prediction_length=future.size(1)) + # loss & backwards propagation + loss = (0.3 * F.huber_loss(pred_future, future)) + ( + 0.7 * spectral_loss(pred_future, future) + ) + loss.backward() + torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) + optimizer.step() + total_loss += loss.item() + # just in case to not divide by zero, it shouldn't happen but making sure it doesn't + if len(dataset) == 0: + a = 1 + else: + a = len(dataset) + scheduler.step(total_loss / a) + # provides the total loss for some insight during the epochs IF a human is overseeing the training + if epoch % 2 == 0: + print(f"Last Loss: {total_loss:.4f}") + return optimizer.state_dict() + + +def predict_next_sequence( + model_or_pthfile, + file_or_directory, + bandwidth_cutoff=0, + additional_ftio_args=None, + only_data=None, +): + """Entry point of the prediction process. + + Args: + model_or_pthfile: Accepts either a HybridModel or a .pth file (to initialize a model) to use for the prediction. + file_or_directory: Path to either a singular file or a directory of files to train the model with. + bandwidth_cutoff: Any bandwidth below this thresh hold will be cut off from the data used for prediction. + additional_ftio_args: additional supported ftio arguments that aren't the initial ftio call and files + only_data: in the case of directly using extracted data instead of files or a path to files + Returns: + A list of the following form [[ranks, bandwidths], ...] where inner lists represent singular predictions. + """ + + # checks if file_or_path is either just a singular file OR a path to a directory with files + # and then saves the individual files paths + files = [] + if os.path.isfile(file_or_directory): + files.append(file_or_directory) + elif os.path.isdir(file_or_directory): + files = [ + os.path.join(file_or_directory, f) + for f in os.listdir(file_or_directory) + if os.path.isfile(os.path.join(file_or_directory, f)) + ] + + # in case no files nor data was supplied + if (not files) and (only_data is None): + print("No files found. Aborting prediction.") + return [] + + # either use the given model or instantiate with .pth file or test with untrained model + model = None + if isinstance(model_or_pthfile, HybridModel): + model = model_or_pthfile + elif model_or_pthfile.endswith(".pth"): + model = HybridModel(emb_dim=128, n_heads=4, ff_dim=128, num_layers=3) + model.load_state_dict((torch.load(model_or_pthfile))["model_state_dict"]) + + # prediction with an untrained model... questionable but maybe someone desires this as a test? 
+ if model is None: + model = HybridModel(emb_dim=128, n_heads=4, ff_dim=128, num_layers=3) + + predictions = [] + + if only_data is not None: + set, frequency = only_data + dataset = BandwidthDataset([set], num_parts=frequency) + + # min-max normalization + min1, max1 = min(set)[0], max(set)[0] + for u in range(len(set)): + set[u][0] = (set[u][0] - min1) / (max1 - min1 + 1e-8) + + # prediction process + prediction_current = __predict_next_trace(model, dataset) + predictions.append(prediction_current) + + actual = [] + for c in dataset.data[0][1]: + actual.append([c[0]]) + return predictions + + for x in files: + set = [] + frequency = [] + # extraction of data through frequency analysis for darshan & json + if x.endswith(".darshan") | x.endswith(".jsonl"): + cmd_input = ["ftio", x] + if additional_ftio_args is not None: + cmd_input += additional_ftio_args + frequency, set = extract(cmd_input=cmd_input) + # extract data from traces obtained from sdumont dataset, most certainly not an approach for any .csv file + if x.endswith(".csv"): + csv_file = pandas.read_csv(x) + n = 0 + for y in csv_file["both"]: + set.append([y]) + n = n + 1 + + # size limit on local machine, can be changed IF positional encoding in hybrid model is also changed + if n == 5555: + break + # min-max normalization + min1, max1 = min(set)[0], max(set)[0] + for u in range(len(set)): + set[u][0] = (set[u][0] - min1) / (max1 - min1 + 1e-8) + + # prediction + dataset = BandwidthDataset([set], num_parts=frequency) + prediction_current = __predict_next_trace(model, dataset) + predictions.append(prediction_current) + + return predictions + + +def spectral_loss(pred, target): + """Basic spectral loss function based on one-dimensional fourier transform + Args: + pred: the predicted values + target: the actual values + """ + # pred, target: [batch, seq_len] + pred_fft = torch.fft.rfft(pred, dim=1) + target_fft = torch.fft.rfft(target, dim=1) + return F.mse_loss(torch.abs(pred_fft), torch.abs(target_fft)) + + +def __predict_next_trace(model, dataset): + """Uses the provided hybrid model to predict the next bandwidth sequence. + + Args: + model: the hybrid model of this file + dataset: dataset containing the bandwidth sequences used for the prediction + + """ + + model.eval() + with torch.no_grad(): + prediction = [] + for past, future in dataset: + # prediction + squeezed_past = past.unsqueeze(0) + pred_future = model(squeezed_past, prediction_length=future.size(0)) + pred_sequence = pred_future.squeeze(0).tolist() + # only positive values are interesting + pred_sequence = np.abs(pred_sequence) + + min_val, max_val = pred_sequence.min(), pred_sequence.max() + pred_sequence = (pred_sequence - min_val) / (max_val - min_val + 1e-8) + + prediction.append(pred_sequence) + + # currently assumes that past and future have the same length + # calculate MAE, MSE and RMSE + mae = 0 + mse = 0 + for actual, predicted in zip(future, pred_sequence, strict=False): + inner = actual.item() - predicted[0] + mae = mae + abs(inner) + mse = mse + (inner * inner) + + n = future.size(dim=0) + if n == 0: + n = 1 + mae = mae / n + mse = mse / n + + print("Predicted Bandwidth Trace:") + print(pred_sequence) + return prediction + + +def extract(cmd_input, msgs=None) -> list: + """Extraction method leveraging frequency analysis for the initial data in dataframe form. And calculates the amount + of expected partial patterns + + Args: + cmd_input: The ftio arguments in the form of a list of strings. 
+ msgs: ZMQ message (not used / no use intended yet) + + Returns: + list[ n , list[bandwidth], ...] + """ + result = [] + n = 3 + cmd_input.append("--machine_learning") + prediction_list, args = ftio_core.main(cmd_input) + # take only the latest for now + if prediction_list: + prediction = prediction_list[-1] + b_sampled = prediction._b_sampled + for x in b_sampled: + result.append([x]) + # calculates the amount of partial patterns using the predicted dominant frequency of FTIO + if (prediction.dominant_freq.size != 0) and (prediction.dominant_freq[0] != 0): + n = int(prediction.t_end / (1 / prediction.dominant_freq[0])) + return [n, result] + + +def train_arima( + file_or_directory, max_depth=3, model_architecture="SARIMA", additional_ftio_args=None +): + """The entry point for training the ARIMA and SARIMA models + + Args: + file_or_directory: the file or directory to train the ARIMA model + max_depth: the maximum depth of the ARIMA model + model_architecture: choose between SARIMA or ARIMA architecture + additional_ftio_args: additional supported ftio arguments that aren't the initial ftio call and files + + """ + # checks if file_or_path is either just a singular file OR a path to a directory with files + # and then saves the individual files paths + files = [] + if os.path.isfile(file_or_directory): + files.append(file_or_directory) + elif os.path.isdir(file_or_directory): + files = [ + os.path.join(file_or_directory, f) + for f in os.listdir(file_or_directory) + if os.path.isfile(os.path.join(file_or_directory, f)) + ] + + # in the case of no supplied files or filepath + if not files: + print("No files found. Aborting prediction.") + return [] + + # use extract method for extraction of data + predictions = [] + sequences = [] + for file in files: + cmd_input = ["ftio", file] + if additional_ftio_args is not None: + cmd_input += additional_ftio_args + n, bandwidth_sequence = extract(cmd_input=cmd_input) + sequences.append([n, bandwidth_sequence]) + + for sequence in sequences: + # initial min-max normalization + min_val_initial, max_val_initial = inf, -inf + for value in sequence[1]: + if value[0] < min_val_initial: + min_val_initial = value[0] + if value[0] > max_val_initial: + max_val_initial = value[0] + + for value in sequence[1]: + value[0] = (value[0] - min_val_initial) / ( + max_val_initial - min_val_initial + 1e-8 + ) + + # split data into trainings and comparison data + split_index = int(len(sequence[1]) / sequence[0]) * (sequence[0] - 1) + trainings_part = sequence[1][:split_index] + future_part = sequence[1][split_index:] + d = 0 + # main loop to apply KPSS and ADF to find depth d + for y in range(1): + partial_sequence = pd.Series( + [x[y] for x in trainings_part], + pd.Index(np.arange(len([x[y] for x in trainings_part]), dtype="int64")), + ) + + try: + kps = kpss(partial_sequence.dropna())[1] + except: + kps = 0.0 + + try: + adf = adfuller(partial_sequence.dropna())[1] + except: + adf = 1.0 + + while d is not max_depth and kps > 0.05 > adf: + partial_sequence = partial_sequence.diff() + d = d + 1 + + try: + kps = kpss(partial_sequence.dropna())[1] + except: + kps = 0.0 + + try: + adf = adfuller(partial_sequence.dropna())[1] + except: + adf = 1.0 + # in case NA, INF and -INF values are introduced by differencing + partial_sequence = partial_sequence.fillna(0.0) + partial_sequence = partial_sequence.replace([np.inf, -np.inf], 0) + + final_model = None + best_aic = inf + # search grid based on AIC, limited to max p = 5 and q = 8, going deeper requires longer computing + for 
p in range(5): + for q in range(8): + try: + if model_architecture == "SARIMA": + model = SARIMAX( + partial_sequence, + order=(p, d, q), + seasonal_order=(0, 0, 0, len(future_part)), + ) + else: + model = ARIMA(partial_sequence, order=(p, d, q)) + model = model.fit() + + # application of the AIC + aic = model.aic + if aic < best_aic: + final_model = model + best_aic = aic + print("p : d : q", (p, d, q)) + # some variations will throw exceptions and warnings, this will exclude them + except: + continue + # the forecast + prediction = final_model.forecast(len(future_part)) + # min-max normalization for uniform displaying of results to actual + min_val, max_val = prediction.min(), prediction.max() + prediction = (prediction - min_val) / (max_val - min_val + 1e-8) + predictions.append(prediction) + return predictions diff --git a/ftio/parse/args.py b/ftio/parse/args.py index 38a68cb..225d6cf 100644 --- a/ftio/parse/args.py +++ b/ftio/parse/args.py @@ -250,6 +250,14 @@ def parse_args(argv: list, name="") -> argparse.Namespace: help="if set, autocorrelation is calculated in addition to DFT. The results are merged to a single prediction at the end", ) parser.set_defaults(autocorrelation=False) + parser.add_argument( + "-ml", + "--machine_learning", + dest="machine_learning", + action="store_true", + help="if set, machine learning is enabled (api call only)", + ) + parser.set_defaults(machine_learning=False) parser.add_argument( "-w", "--window_adaptation", diff --git a/ftio/plot/dash_files/dash_app.py b/ftio/plot/dash_files/dash_app.py index dbbeba7..76bfc94 100644 --- a/ftio/plot/dash_files/dash_app.py +++ b/ftio/plot/dash_files/dash_app.py @@ -9,7 +9,6 @@ """ import importlib.util -import sys import ftio.plot.dash_files.constants.id as id import ftio.plot.dash_files.constants.io_mode as io_mode @@ -17,7 +16,7 @@ DASH_AVAILABLE = importlib.util.find_spec("dash") is not None if not DASH_AVAILABLE: - sys.exit( + raise RuntimeError( "Dash module not found. Please install it using 'make full' or 'pip install dash dash-extensions'." ) else: diff --git a/pyproject.toml b/pyproject.toml index df3a5fe..eec2a54 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,6 +127,11 @@ plot-libs = [ "trace_updater", ] +ml-libs = [ + "torch", + "statsmodels" +] + [tool.setuptools.dynamic] version = { attr = "ftio.__version__" } readme = { file = ["README.md"] } @@ -136,6 +141,11 @@ log_cli = true log_cli_level = "INFO" log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)" log_cli_date_format = "%Y-%m-%d %H:%M:%S" +filterwarnings = [ + "ignore::statsmodels.tools.sm_exceptions.InterpolationWarning", + "ignore::statsmodels.tools.sm_exceptions.ConvergenceWarning", + "ignore::UserWarning", +] [tool.coverage.run] source = ["ftio"] diff --git a/test/test_ml_models.py b/test/test_ml_models.py new file mode 100644 index 0000000..319ec00 --- /dev/null +++ b/test/test_ml_models.py @@ -0,0 +1,89 @@ +import importlib.util +import os + +import pytest + +if importlib.util.find_spec("torch") is None: + pytest.skip("Torch not available, skipping ML tests.", allow_module_level=True) + +from ftio.ml.models_and_training import ( + BandwidthDataset, + extract, + predict_next_sequence, + train_arima, + train_hybrid_model, +) + +""" +Tests for the active workflow (and application) of the implemented models, dataset and functions. +""" + + +def test_hybrid_model(): + """ + Tests the training and prediction of the hybrid-model. 
+ """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + model = train_hybrid_model( + file, epochs=10, lr=0.003, additional_ftio_args=["-e", "no"] + ) + _ = predict_next_sequence(model, file, additional_ftio_args=["-e", "no"]) + assert True + + +def test_hybrid_model_resume_training(): + """ + Tests the saving of the hybrid model's checkpoint and resuming of training of the model. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + _ = train_hybrid_model( + file, epochs=10, lr=0.003, save=True, additional_ftio_args=["-e", "no"] + ) + model = train_hybrid_model( + file, + epochs=10, + lr=0.003, + load_state_dict_and_optimizer_state="model_and_optimizer.pth", + additional_ftio_args=["-e", "no"], + ) + _ = predict_next_sequence(model, file, additional_ftio_args=["-e", "no"]) + assert True + + +def test_extract(): + """ + Tests the extract functionality when providing FTIO arguments. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + args = ["ftio", file, "-e", "no"] + n, data = extract(args) + assert True + + +def test_dataset(): + """ + Tests the correct initialization of the dataset. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + args = ["ftio", file, "-e", "no"] + n, data = extract(args) + _ = BandwidthDataset([data], num_parts=n) + assert True + + +def test_arima_model(): + """ + Tests the training and prediction of the ARIMA model. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + _ = train_arima(file, model_architecture="ARIMA", additional_ftio_args=["-e", "no"]) + assert True + + +def test_sarima_model(): + """ + Tests the training and prediction of the SARIMA model. + """ + file = os.path.join(os.path.dirname(__file__), "../examples/tmio/JSONL/8.jsonl") + _ = train_arima(file, model_architecture="SARIMA", additional_ftio_args=["-e", "no"]) + assert True