From fbcf4b336f8134aed89068834ec3f399f5c82e9b Mon Sep 17 00:00:00 2001 From: vile319 Date: Tue, 10 Feb 2026 15:29:42 -0500 Subject: [PATCH] Optimize scikit-learn pipeline and add --scikit_model_name flag --- src/protify/main.py | 37 +++-- src/protify/probes/lazy_predict.py | 233 +++++++++++++++++---------- src/protify/probes/scikit_classes.py | 63 ++++++-- 3 files changed, 222 insertions(+), 111 deletions(-) diff --git a/src/protify/main.py b/src/protify/main.py index 1ae937d..1f142cc 100644 --- a/src/protify/main.py +++ b/src/protify/main.py @@ -1,10 +1,6 @@ +import entrypoint_setup + import os -os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8" -os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" -os.environ["TF_ENABLE_ONEDNN_OPTS"] = "0" -os.environ["TOKENIZERS_PARALLELISM"] = "true" -import sys -import subprocess import argparse import yaml from types import SimpleNamespace @@ -73,6 +69,7 @@ def parse_arguments(): parser.add_argument("--scikit_cv", type=int, default=3, help="Number of cross-validation folds for scikit model.") parser.add_argument("--scikit_random_state", type=int, default=None, help="Random state for scikit model (if None, uses global seed).") parser.add_argument("--scikit_model_name", type=str, default=None, help="Name of the scikit model to use.") + parser.add_argument("--scikit_model_args", type=str, default=None, help="JSON string of hyperparameters to use (skips tuning). E.g. '{\"n_estimators\": 500, \"max_depth\": 7}'") parser.add_argument("--use_scikit", action="store_true", default=False, help="Use scikit model (default: False).") parser.add_argument("--n_jobs", type=int, default=1, help="Number of processes to use in scikit.") # TODO integrate with GUI and main @@ -234,7 +231,6 @@ def parse_arguments(): import torch from torchinfo import summary -import numpy as np from probes.get_probe import ProbeArguments, get_probe from base_models.get_base_models import BaseModelArguments, get_tokenizer, get_base_model_for_training @@ -247,8 +243,6 @@ def parse_arguments(): from utils import torch_load, print_message, expand_dms_ids_all from visualization.plot_result import create_plots from hyperopt_utils import HyperoptModule -from benchmarks.proteingym.zero_shot import run_zero_shot -from benchmarks.proteingym.scoring_utils import collect_proteingym_spearman from benchmarks.proteingym.scorer import ProteinGymRunner from benchmarks.proteingym.compare_scoring_methods import compare_scoring_methods from seed_utils import set_global_seed @@ -660,14 +654,25 @@ def run_scikit_scheme(self): for data_name, dataset in self.datasets.items(): ### find best scikit model and parameters via cross validation and lazy predict X_train, y_train, X_valid, y_valid, X_test, y_test, label_type = self.prepare_scikit_dataset(model_name, dataset) - if label_type == 'singlelabel': - results = scikit_probe.find_best_classifier(X_train, y_train, X_valid, y_valid) - elif label_type == 'regression': - results = scikit_probe.find_best_regressor(X_train, y_train, X_valid, y_valid) + + # If a specific model is specified, skip LazyPredict and go straight to that model + if self.scikit_args.model_name is not None: + print_message(f"Skipping LazyPredict, using specified model: {self.scikit_args.model_name}") + results = scikit_probe.run_specific_model(X_train, y_train, X_valid, y_valid, X_test, y_test, model_results=None) else: - raise ValueError(f'Label type {label_type} not supported') - ### train and evaluate best model - results = scikit_probe.run_specific_model(X_train, y_train, X_valid, y_valid, X_test, 
y_test, results) + # Find best model via LazyPredict + if label_type == 'singlelabel': + results = scikit_probe.find_best_classifier(X_train, y_train, X_valid, y_valid) + elif label_type == 'regression': + results = scikit_probe.find_best_regressor(X_train, y_train, X_valid, y_valid) + else: + raise ValueError(f'Label type {label_type} not supported') + # Train and evaluate best model with optimal hyperparameters + results = scikit_probe.run_specific_model(X_train, y_train, X_valid, y_valid, X_test, y_test, results) + + # Log the results for plotting + metrics_dict = {'test_mcc': results.final_scores} if isinstance(results.final_scores, (int, float)) else results.final_scores + self.log_metrics(data_name, model_name, metrics_dict, split_name='test') @log_method_calls def generate_plots(self): diff --git a/src/protify/probes/lazy_predict.py b/src/protify/probes/lazy_predict.py index 602fe2b..b989ade 100644 --- a/src/protify/probes/lazy_predict.py +++ b/src/protify/probes/lazy_predict.py @@ -55,7 +55,29 @@ "LinearSVC", "Perceptron", "MLPClassifier", - "SGDClassifier" + "SGDClassifier", + # O(n²) memory models - too slow for large datasets + "LabelPropagation", + "LabelSpreading", + "SVC", + "NuSVC", + # Sequential ensemble models - slow for large datasets + "AdaBoostClassifier", + "BaggingClassifier", + # O(n×m) prediction time - slow for large test sets + "KNeighborsClassifier", + # Unbounded tree depth - very slow on high-dim data + "DecisionTreeClassifier", + "ExtraTreeClassifier", + "ExtraTreesClassifier", + # Fails on negative values after StandardScaler + "CategoricalNB", + # O(d²) or O(d³) - slow on high-dimensional data (4608 features) + "LinearDiscriminantAnalysis", + "QuadraticDiscriminantAnalysis", + # Requires estimator argument + "FixedThresholdClassifier", + "TunedThresholdClassifierCV", ] removed_regressors = [ @@ -82,7 +104,16 @@ "LassoLarsCV", "ElasticNetCV", "LinearSVR", - "LassoLarsIC" + "LassoLarsIC", + # Sequential ensemble models - slow for large datasets + "AdaBoostRegressor", + "BaggingRegressor", + # O(n×m) prediction time - slow for large test sets + "KNeighborsRegressor", + # Unbounded tree depth - very slow on high-dim data + "DecisionTreeRegressor", + "ExtraTreeRegressor", + "ExtraTreesRegressor", ] # Tuple of (name, class) @@ -176,6 +207,16 @@ CLASSIFIERS.append(("LGBMClassifier", lightgbm.LGBMClassifier)) # CLASSIFIERS.append(('CatBoostClassifier',catboost.CatBoostClassifier)) +# Update dicts with XGB and LGBM +CLASSIFIER_DICT["XGBClassifier"] = xgboost.XGBClassifier +CLASSIFIER_DICT["LGBMClassifier"] = lightgbm.LGBMClassifier +REGRESSOR_DICT["XGBRegressor"] = xgboost.XGBRegressor +REGRESSOR_DICT["LGBMRegressor"] = lightgbm.LGBMRegressor +ALL_MODEL_DICT["XGBClassifier"] = xgboost.XGBClassifier +ALL_MODEL_DICT["LGBMClassifier"] = lightgbm.LGBMClassifier +ALL_MODEL_DICT["XGBRegressor"] = xgboost.XGBRegressor +ALL_MODEL_DICT["LGBMRegressor"] = lightgbm.LGBMRegressor + numeric_transformer = Pipeline( steps=[("imputer", SimpleImputer(strategy="mean")), ("scaler", StandardScaler())] ) @@ -309,6 +350,13 @@ def fit(self, X_train, X_test, y_train, y_test): ("categorical_high", categorical_transformer_high, categorical_high), ] ) + + # Precompute preprocessing once for all models (major optimization for large datasets) + print_message("Preprocessing data once for all models...") + preprocess_start = time.time() + X_train_transformed = preprocessor.fit_transform(X_train) + X_test_transformed = preprocessor.transform(X_test) + print_message(f"Preprocessing completed in 
{time.time() - preprocess_start:.1f}s") if self.classifiers == "all": self.classifiers = CLASSIFIERS @@ -323,26 +371,30 @@ def fit(self, X_train, X_test, y_train, y_test): print_message(exception) print_message("Invalid Classifier(s)") - pbar = tqdm(self.classifiers) - for name, model in pbar: - pbar.set_description(f"Training {name}") + # Track failed models + failed_models = [] + total_start = time.time() + + for name, model in tqdm(self.classifiers, desc="Training classifiers"): + print_message(f"Starting {name}...") start = time.time() try: + # Build model kwargs + model_kwargs = {} if "random_state" in model().get_params().keys(): - pipe = Pipeline( - steps=[ - ("preprocessor", preprocessor), - ("classifier", model(random_state=self.random_state)), - ] - ) - else: - pipe = Pipeline( - steps=[("preprocessor", preprocessor), ("classifier", model())] - ) - - pipe.fit(X_train, y_train) - self.models[name] = pipe - y_pred = pipe.predict(X_test) + model_kwargs["random_state"] = self.random_state + # Enable parallelization for models that support it + if "n_jobs" in model().get_params().keys(): + model_kwargs["n_jobs"] = -1 + # Enable verbose for boosting models to show iteration progress + if name in ("XGBClassifier", "LGBMClassifier"): + model_kwargs["verbose"] = 1 + + # Train directly on preprocessed data (no Pipeline needed) + clf = model(**model_kwargs) + clf.fit(X_train_transformed, y_train) + self.models[name] = clf + y_pred = clf.predict(X_test_transformed) accuracy = accuracy_score(y_test, y_pred, normalize=True) b_accuracy = balanced_accuracy_score(y_test, y_pred) f1 = f1_score(y_test, y_pred, average="weighted") @@ -353,51 +405,28 @@ def fit(self, X_train, X_test, y_train, y_test): if self.ignore_warnings is False: print_message("ROC AUC couldn't be calculated for " + name) print_message(exception) + fit_time = time.time() - start names.append(name) Accuracy.append(accuracy) B_Accuracy.append(b_accuracy) ROC_AUC.append(roc_auc) F1.append(f1) - TIME.append(time.time() - start) + TIME.append(fit_time) + + print_message(f" {name} completed in {fit_time:.1f}s | Acc: {accuracy:.3f} | F1: {f1:.3f}") if self.custom_metric is not None: custom_metric = self.custom_metric(y_test, y_pred) CUSTOM_METRIC.append(custom_metric) - if self.verbose > 0: - if self.custom_metric is not None: - print_message( - { - "Model": name, - "Accuracy": accuracy, - "Balanced Accuracy": b_accuracy, - "ROC AUC": roc_auc, - "F1 Score": f1, - "Custom Metric": custom_metric, - "Time taken": time.time() - start, - } - ) - else: - print_message( - { - "Model": name, - "Accuracy": accuracy, - "Balanced Accuracy": b_accuracy, - "ROC AUC": roc_auc, - "F1 Score": f1, - "Time taken": time.time() - start, - } - ) if self.predictions: predictions[name] = y_pred - except Exception as exception: + failed_models.append(name) if self.ignore_warnings is False: print_message(f'\n{name} model failed to execute') print_message(exception) - pbar.update(1) - pbar.close() if self.custom_metric is None: scores = pd.DataFrame( @@ -426,10 +455,28 @@ def fit(self, X_train, X_test, y_train, y_test): "Model" ) + # Print summary + total_time = time.time() - total_start + n_success = len(names) + n_failed = len(failed_models) + best_model = scores.index[0] if len(scores) > 0 else "N/A" + best_score = scores["Balanced Accuracy"].iloc[0] if len(scores) > 0 else 0 + + if self.verbose > 0: + # Full table + failed models + summary = f"\nLazyClassifier Results ({n_success} succeeded, {n_failed} failed, {total_time:.1f}s)\n" + summary += 
scores.to_string() + if failed_models: + summary += f"\n\nFailed: {', '.join(failed_models)}" + print_message(summary) + else: + # 1-line summary + print_message(f"Completed {n_success + n_failed} classifiers in {total_time:.1f}s | {n_success} succeeded, {n_failed} failed | Best: {best_model} ({best_score:.2f})") + if self.predictions: predictions_df = pd.DataFrame.from_dict(predictions) - - return scores, predictions_df if self.predictions is True else scores + return scores, predictions_df + return scores def provide_models(self, X_train, X_test, y_train, y_test): """ @@ -553,6 +600,13 @@ def fit(self, X_train, X_test, y_train, y_test): ("categorical_high", categorical_transformer_high, categorical_high), ] ) + + # Precompute preprocessing once for all models (major optimization for large datasets) + print_message("Preprocessing data once for all models...") + preprocess_start = time.time() + X_train_transformed = preprocessor.fit_transform(X_train) + X_test_transformed = preprocessor.transform(X_test) + print_message(f"Preprocessing completed in {time.time() - preprocess_start:.1f}s") if self.regressors == "all": self.regressors = REGRESSORS @@ -567,26 +621,30 @@ def fit(self, X_train, X_test, y_train, y_test): print_message(exception) print_message("Invalid Regressor(s)") - pbar = tqdm(self.regressors) - for name, model in pbar: - pbar.set_description(f"Training {name}") + # Track failed models + failed_models = [] + total_start = time.time() + + for name, model in tqdm(self.regressors, desc="Training regressors"): + print_message(f"Starting {name}...") start = time.time() try: + # Build model kwargs + model_kwargs = {} if "random_state" in model().get_params().keys(): - pipe = Pipeline( - steps=[ - ("preprocessor", preprocessor), - ("regressor", model(random_state=self.random_state)), - ] - ) - else: - pipe = Pipeline( - steps=[("preprocessor", preprocessor), ("regressor", model())] - ) - - pipe.fit(X_train, y_train) - self.models[name] = pipe - y_pred = pipe.predict(X_test) + model_kwargs["random_state"] = self.random_state + # Enable parallelization for models that support it + if "n_jobs" in model().get_params().keys(): + model_kwargs["n_jobs"] = -1 + # Enable verbose for boosting models to show iteration progress + if name in ("XGBRegressor", "LGBMRegressor"): + model_kwargs["verbose"] = 1 + + # Train directly on preprocessed data (no Pipeline needed) + reg = model(**model_kwargs) + reg.fit(X_train_transformed, y_train) + self.models[name] = reg + y_pred = reg.predict(X_test_transformed) r_squared = r2_score(y_test, y_pred) adj_rsquared = adjusted_rsquared( @@ -594,40 +652,28 @@ def fit(self, X_train, X_test, y_train, y_test): ) rmse = np.sqrt(mean_squared_error(y_test, y_pred)) + fit_time = time.time() - start names.append(name) R2.append(r_squared) ADJR2.append(adj_rsquared) RMSE.append(rmse) - TIME.append(time.time() - start) + TIME.append(fit_time) + + print_message(f" {name} completed in {fit_time:.1f}s | R²: {r_squared:.3f} | RMSE: {rmse:.3f}") if self.custom_metric: custom_metric = self.custom_metric(y_test, y_pred) CUSTOM_METRIC.append(custom_metric) - if self.verbose > 0: - scores_verbose = { - "Model": name, - "R-Squared": r_squared, - "Adjusted R-Squared": adj_rsquared, - "RMSE": rmse, - "Time taken": time.time() - start, - } - - if self.custom_metric: - scores_verbose[self.custom_metric.__name__] = custom_metric - - print_message(scores_verbose) if self.predictions: predictions[name] = y_pred except Exception as exception: + failed_models.append(name) if 
self.ignore_warnings is False: print_message(f'\n{name} model failed to execute') print_message(exception) - pbar.update(1) - pbar.close() - scores = { "Model": names, "Adjusted R-Squared": ADJR2, @@ -644,9 +690,28 @@ def fit(self, X_train, X_test, y_train, y_test): "Model" ) + # Print summary + total_time = time.time() - total_start + n_success = len(names) + n_failed = len(failed_models) + best_model = scores.index[0] if len(scores) > 0 else "N/A" + best_score = scores["Adjusted R-Squared"].iloc[0] if len(scores) > 0 else 0 + + if self.verbose > 0: + # Full table + failed models + summary = f"\nLazyRegressor Results ({n_success} succeeded, {n_failed} failed, {total_time:.1f}s)\n" + summary += scores.to_string() + if failed_models: + summary += f"\n\nFailed: {', '.join(failed_models)}" + print_message(summary) + else: + # 1-line summary + print_message(f"Completed {n_success + n_failed} regressors in {total_time:.1f}s | {n_success} succeeded, {n_failed} failed | Best: {best_model} ({best_score:.2f})") + if self.predictions: predictions_df = pd.DataFrame.from_dict(predictions) - return scores, predictions_df if self.predictions is True else scores + return scores, predictions_df + return scores def provide_models(self, X_train, X_test, y_train, y_test): """ diff --git a/src/protify/probes/scikit_classes.py b/src/protify/probes/scikit_classes.py index 68dbfd2..9407dd3 100644 --- a/src/protify/probes/scikit_classes.py +++ b/src/protify/probes/scikit_classes.py @@ -34,18 +34,31 @@ def __init__( random_state: Optional[int] = None, # Specific model arguments (optional) model_name: Optional[str] = None, + scikit_model_name: Optional[str] = None, # CLI arg name + scikit_model_args: Optional[str] = None, # CLI arg - JSON string model_args: Optional[Dict[str, Any]] = None, production_model: bool = False, **kwargs, ): + import json # Tuning arguments self.n_iter = n_iter self.cv = cv self.random_state = random_state or get_global_seed() - # Specific model arguments - self.model_name = model_name - self.model_args = model_args if model_args is not None else {} + # Specific model arguments - scikit_model_name takes precedence (CLI arg) + self.model_name = scikit_model_name or model_name + + # Parse scikit_model_args JSON string if provided (CLI), otherwise use model_args dict + if scikit_model_args is not None: + try: + self.model_args = json.loads(scikit_model_args) + print_message(f"Using pre-specified hyperparameters (skipping tuning): {self.model_args}") + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse --scikit_model_args JSON: {e}") + else: + self.model_args = model_args if model_args is not None else {} + self.production_model = production_model @@ -93,8 +106,12 @@ def _tune_hyperparameters( """ param_distributions = HYPERPARAMETER_DISTRIBUTIONS.get(model_name, {}) if not param_distributions: + print_message(f"No hyperparameter distributions defined for {model_name}, using defaults") return model_class(), {} + print_message(f"Running RandomizedSearchCV with {self.args.n_iter} iterations, {self.args.cv}-fold CV...") + print_message(f"Hyperparameter search space: {list(param_distributions.keys())}") + random_search = RandomizedSearchCV( model_class(), param_distributions=param_distributions, @@ -102,10 +119,12 @@ def _tune_hyperparameters( scoring=custom_scorer, cv=self.args.cv, random_state=self.args.random_state, - n_jobs=self.n_jobs + n_jobs=self.n_jobs, + verbose=2 # Show progress for each fit ) random_search.fit(X_train, y_train) + print_message(f"Best CV score: 
{random_search.best_score_:.4f}") return random_search.best_estimator_, random_search.best_params_ def find_best_regressor( @@ -140,7 +159,8 @@ def find_best_regressor( # Get best model name and class best_model_name = initial_scores.index[0] - best_model_class = regressor.models[best_model_name].named_steps['regressor'].__class__ + # Models are now stored directly (not as Pipeline) after optimization + best_model_class = regressor.models[best_model_name].__class__ print_message(f"Best model name: {best_model_name}") print_message(f"Best model class: {best_model_class}") print_message(f"Initial scores: \n{initial_scores}") @@ -202,7 +222,8 @@ def find_best_classifier( # Get best model name and class best_model_name = initial_scores.index[0] - best_model_class = classifier.models[best_model_name].named_steps['classifier'].__class__ + # Models are now stored directly (not as Pipeline) after optimization + best_model_class = classifier.models[best_model_name].__class__ print_message(f"Best model name: {best_model_name}") print_message(f"Best model class: {best_model_class}") print_message(f"Initial scores: \n{initial_scores}") @@ -307,16 +328,36 @@ def run_specific_model( raise ValueError(f"Model {model_name} not supported") model_class = ALL_MODEL_DICT[model_name] - cls = model_class(**self.args.model_args) - cls.fit(X_train, y_train) - final_scores = scorer(cls, X_test, y_test) + + # Skip tuning if model_args is already provided + if self.args.model_args: + print_message(f"Skipping hyperparameter tuning - using provided args: {self.args.model_args}") + best_model = model_class(**self.args.model_args) + best_params = self.args.model_args + else: + # Run hyperparameter tuning + print_message(f"Tuning hyperparameters for {model_name}") + best_model, best_params = self._tune_hyperparameters( + model_class, + model_name, + X_train, + y_train, + scorer + ) + print_message(f"Best parameters: {best_params}") + + # Train final model with best parameters + print_message(f"Training final model with best parameters") + best_model.fit(X_train, y_train) + final_scores = scorer(best_model, X_test, y_test) + print_message(f"Final scores: {final_scores}") return ModelResults( initial_scores=None, best_model_name=model_name, - best_params=None, + best_params=best_params, final_scores=final_scores, - best_model=cls + best_model=best_model ) else: raise ValueError("Either model_name must be specified in args or model_results must be provided")