diff --git a/README.md b/README.md index 90384dae..97316553 100644 --- a/README.md +++ b/README.md @@ -161,17 +161,10 @@ Built-in procedures for searching over configuration knob combinations, includin ```text rapidfireai/ -├── fit - ├── automl/ # Search and AutoML algorithms for knob tuning - ├── backend/ # Core backend components (controller, scheduler, worker) - ├── db/ # Database interface and SQLite operations - ├── dispatcher/ # Flask-based web API for UI communication - ├── frontend/ # Frontend components (dashboard, IC Ops implementation) - ├── ml/ # ML training utilities and trainer classes - └── utils/ # Utility functions and helper modules +├── automl/ # Search and AutoML algorithms for knob tuning +├── cli.py # CLI script ├── evals ├── actors/ # Ray-based workers for doc and query processing - ├── automl/ # Search and AutoML algorithms for knob tuning ├── data/ # Data sharding and handling ├── db/ # Database interface and SQLite operations ├── dispatcher/ # Flask-based web API for UI communication @@ -179,7 +172,15 @@ rapidfireai/ ├── rag/ # Stages of RAG pipeline ├── scheduling/ # Fair scheduler for multi-config resource sharing └── utils/ # Utility functions and helper modules -└── experiment.py # Main experiment lifecycle management +├── experiment.py # Main experiment lifecycle management +├── fit + ├── backend/ # Core backend components (controller, scheduler, worker) + ├── db/ # Database interface and SQLite operations + ├── dispatcher/ # Flask-based web API for UI communication + ├── frontend/ # Frontend components (dashboard, IC Ops implementation) + ├── ml/ # ML training utilities and trainer classes + └── utils/ # Utility functions and helper modules +└── utils.py # Utility functions and helper modules ``` ## Architecture @@ -327,7 +328,9 @@ used to overwrite the defaults. 
- `RF_LOG_FILENAME` - Default log file name (default: rapidfire.log) - `RF_TRAINING_LOG_FILENAME` - Default training log file name (default: training.log) - `RF_DB_PATH` - Base directory for database files (default: ${RF_HOME}/db) -- `RF_TRACKING_BACKEND` - Tracking backend used (default: mlflow on Non-Google Colab and tensorboard on Google Colab) +- `RF_MLFLOW_ENABLED` - Enable MLFlow tracking backend +- `RF_TENSORBOARD_ENABLED` - Enable Tensorboard tracking backend +- `RF_TRACKIO_ENABLED` - Enable TrackIO tracking backend - `RF_COLAB_MODE` - Whether running on colab (default: false on Non-Google Colab and true on Google Colab) - `RF_TUTORIAL_PATH` - Location that `rapidfireai init` copies `tutorial_notebooks` to (default: ./tutorial_notebooks) - `RF_TEST_PATH` - Location that `rapidfireai --test-noteobooks` copies test notebooks to (default: ./tutorial_notebooks/tests) diff --git a/pyproject.toml b/pyproject.toml index eadff978..b3f60fb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,21 +25,40 @@ classifiers = [ "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Software Development :: Libraries :: Application Frameworks", ] +# dependencies = [ +# # REST API (Dispatcher) +# "flask>=3.1.1", +# "flask-cors>=6.0.1", +# "waitress>=3.0.2", + +# # JSON Query Tool +# "jq>=1.10.0", +# # "protobuf==5.29.5", + +# # Other +# "dill>=0.3.0,<0.3.9", +# "jedi>=0.16", +# # "pytest>=8.4.2", +# "uv>=0.8.14", +# ] dependencies = [ # REST API (Dispatcher) - "flask>=3.1.1", - "flask-cors>=6.0.1", - "waitress>=3.0.2", + "flask", + "flask-cors", + "waitress", # JSON Query Tool - "jq>=1.10.0", + "jq", # "protobuf==5.29.5", # Other - "dill>=0.3.0,<0.3.9", - "jedi>=0.16", + "dill", + "jedi", # "pytest>=8.4.2", - "uv>=0.8.14", + "uv", + "trackio", + "mlflow", + "fsspec<=2025.10.0", ] [project.optional-dependencies] @@ -104,7 +123,7 @@ local_evals = [ "ray==2.44.1", # LLM Inference - "transformers==4.56.1", + "transformers>=4.56.1", "vllm==0.7.2", # OpenAI 
API diff --git a/rapidfireai/__init__.py b/rapidfireai/__init__.py index 05184e91..743042cb 100644 --- a/rapidfireai/__init__.py +++ b/rapidfireai/__init__.py @@ -7,13 +7,6 @@ __author__ = "RapidFire AI Inc." __email__ = "support@rapidfire.ai" -# Core imports - always available -# from rapidfireai.experiment import Experiment - -# Optional evals imports - gracefully handle missing dependencies -# get_dispatcher_url = None -# get_dispatcher_headers = None -# get_colab_auth_token = None try: from rapidfireai.experiment import Experiment @@ -47,7 +40,4 @@ def __repr__(self): "Experiment", "__version__", "__version_info__", - # "get_dispatcher_url", - # "get_dispatcher_headers", - # "get_colab_auth_token", ] diff --git a/rapidfireai/automl/model_config.py b/rapidfireai/automl/model_config.py index 6ae9214e..dcbbd659 100644 --- a/rapidfireai/automl/model_config.py +++ b/rapidfireai/automl/model_config.py @@ -1,7 +1,5 @@ """Model configuration for AutoML training and evaluation.""" - -# from __future__ import annotations - +from __future__ import annotations import copy import inspect from abc import ABC, abstractmethod @@ -11,6 +9,7 @@ from rapidfireai.automl.datatypes import List, Range + # Fit mode dependencies (peft, trl) try: from peft import LoraConfig @@ -140,8 +139,8 @@ class RFModelConfig: formatting_func: Callable | List | None = None compute_metrics: Callable | List | None = None peft_config: RFLoraConfig | List | None = None - # training_args: RFSFTConfig | RFDPOConfig | RFGRPOConfig | None = None - training_args = None + training_args: RFSFTConfig | RFDPOConfig | RFGRPOConfig | None = None + # training_args = None model_type: str | None = "causal_lm" model_kwargs: dict[str, Any] | None = None ref_model_name: str | None = None diff --git a/rapidfireai/cli.py b/rapidfireai/cli.py index 42887bc8..d0d35db0 100644 --- a/rapidfireai/cli.py +++ b/rapidfireai/cli.py @@ -397,10 +397,12 @@ def main(): parser.add_argument("--version", action="version", 
version=f"RapidFire AI {__version__}") parser.add_argument( - "--tracking-backend", - choices=["mlflow", "tensorboard", "both"], - default=os.getenv("RF_TRACKING_BACKEND", "mlflow" if not ColabConfig.ON_COLAB else "tensorboard"), - help="Tracking backend to use for metrics (default: mlflow)", + "--tracking-backends", + choices=["mlflow", "tensorboard", "trackio"], + default=["mlflow"] if not ColabConfig.ON_COLAB else ["tensorboard"], + help="Tracking backend to use for metrics (default: mlflow on Non-Google Colab and tensorboard on Google Colab)", + nargs="*", + action="extend" ) parser.add_argument( @@ -421,6 +423,8 @@ def main(): help="Copy test notebooks to the tutorial_notebooks directory", ) + parser.add_argument("--force", "-f", action="store_true", help="Force action without confirmation") + parser.add_argument("--evals", action="store_true", help="Initialize with evaluation dependencies") parser.add_argument("--log-lines", type=int, default=10, help="Number of lines to log to the console") @@ -429,14 +433,26 @@ def main(): # Set environment variables from CLI args - if args.tracking_backend: - os.environ["RF_TRACKING_BACKEND"] = args.tracking_backend + if args.tracking_backends: + os.environ["RF_MLFLOW_ENABLED"] = "false" + os.environ["RF_TENSORBOARD_ENABLED"] = "false" + os.environ["RF_TRACKIO_ENABLED"] = "false" + if "mlflow" in args.tracking_backends: + os.environ["RF_MLFLOW_ENABLED"] = "true" + if "tensorboard" in args.tracking_backends: + os.environ["RF_TENSORBOARD_ENABLED"] = "true" + if "trackio" in args.tracking_backends: + os.environ["RF_TRACKIO_ENABLED"] = "true" if args.tensorboard_log_dir: os.environ["RF_TENSORBOARD_LOG_DIR"] = args.tensorboard_log_dir if args.colab: os.environ["RF_COLAB_MODE"] = "true" elif ColabConfig.ON_COLAB and os.getenv("RF_COLAB_MODE") is None: os.environ["RF_COLAB_MODE"] = "true" + + # Handle force command separately + if args.force: + os.environ["RF_FORCE"] = "true" # Handle doctor command separately if args.command == 
"doctor": diff --git a/rapidfireai/evals/db/rf_db.py b/rapidfireai/evals/db/rf_db.py index 43854c60..a9d2bbc6 100644 --- a/rapidfireai/evals/db/rf_db.py +++ b/rapidfireai/evals/db/rf_db.py @@ -43,12 +43,22 @@ def _initialize_schema(self): self.db.conn.executescript(schema_sql) self.db.conn.commit() - # Migration: Add mlflow_run_id column to pipelines table if it doesn't exist + # Migration: Add metric_run_id to pipelines table if they don't exist try: cursor = self.db.conn.execute("PRAGMA table_info(pipelines)") columns = [row[1] for row in cursor.fetchall()] - if "mlflow_run_id" not in columns: - self.db.conn.execute("ALTER TABLE pipelines ADD COLUMN mlflow_run_id TEXT") + if "metric_run_id" not in columns: + self.db.conn.execute("ALTER TABLE pipelines ADD COLUMN metric_run_id TEXT") + self.db.conn.commit() + except Exception: + pass + + # Migration: Add metric_experiment_id to experiments table if they don't exist + try: + cursor = self.db.conn.execute("PRAGMA table_info(experiments)") + columns = [row[1] for row in cursor.fetchall()] + if "metric_experiment_id" not in columns: + self.db.conn.execute("ALTER TABLE experiments ADD COLUMN metric_experiment_id TEXT") self.db.conn.commit() except Exception: pass @@ -67,7 +77,7 @@ def create_experiment( num_actors: int, num_cpus: int = None, num_gpus: int = None, - mlflow_experiment_id: str = None, + metric_experiment_id: str = None, status: ExperimentStatus = ExperimentStatus.RUNNING, num_shards: int = 0, ) -> int: @@ -79,7 +89,7 @@ def create_experiment( num_actors: Number of query processing actors num_cpus: Number of CPUs allocated num_gpus: Number of GPUs allocated - mlflow_experiment_id: Optional MLflow experiment ID + metric_experiment_id: Optional MetricLogger experiment ID status: Initial status (default: ExperimentStatus.RUNNING) num_shards: Number of shards for the dataset (default: 0) @@ -89,7 +99,7 @@ def create_experiment( query = """ INSERT INTO experiments ( experiment_name, num_actors, num_shards, 
num_cpus, num_gpus, - mlflow_experiment_id, status, error + metric_experiment_id, status, error ) VALUES (?, ?, ?, ?, ?, ?, ?, '') """ self.db.execute( @@ -100,7 +110,7 @@ def create_experiment( num_shards, num_cpus, num_gpus, - mlflow_experiment_id, + metric_experiment_id, status.value, ), commit=True, @@ -241,7 +251,7 @@ def get_experiment(self, experiment_id: int) -> dict[str, Any] | None: """ query = """ SELECT experiment_id, experiment_name, num_actors, num_cpus, num_gpus, - mlflow_experiment_id, status, num_shards, error, created_at + metric_experiment_id, status, num_shards, error, created_at FROM experiments WHERE experiment_id = ? """ @@ -254,7 +264,7 @@ def get_experiment(self, experiment_id: int) -> dict[str, Any] | None: "num_actors": row[2], "num_cpus": row[3], "num_gpus": row[4], - "mlflow_experiment_id": row[5], + "metric_experiment_id": row[5], "status": row[6], "num_shards": row[7], "error": row[8], @@ -295,7 +305,7 @@ def get_running_experiment(self) -> dict[str, Any] | None: Dictionary with all experiment fields, or None if no running experiment """ query = """ - SELECT experiment_id, experiment_name, mlflow_experiment_id, num_shards, + SELECT experiment_id, experiment_name, metric_experiment_id, num_shards, num_actors, num_cpus, num_gpus, status, error, created_at FROM experiments WHERE status = ? 
@@ -308,7 +318,7 @@ def get_running_experiment(self) -> dict[str, Any] | None: return { "experiment_id": row[0], "experiment_name": row[1], - "mlflow_experiment_id": row[2], + "metric_experiment_id": row[2], "num_shards": row[3], "num_actors": row[4], "num_cpus": row[5], @@ -509,7 +519,7 @@ def create_pipeline( INSERT INTO pipelines ( context_id, pipeline_type, pipeline_config, pipeline_config_json, status, error, - current_shard_id, shards_completed, total_samples_processed, mlflow_run_id + current_shard_id, shards_completed, total_samples_processed, metric_run_id ) VALUES (?, ?, ?, ?, ?, '', '', 0, 0, NULL) """ self.db.execute( @@ -538,7 +548,7 @@ def set_pipeline_progress(self, pipeline_id: int) -> dict[str, Any] | None: query = """ SELECT pipeline_id, context_id, pipeline_type, pipeline_config, pipeline_config_json, status, current_shard_id, - shards_completed, total_samples_processed, mlflow_run_id, error, created_at + shards_completed, total_samples_processed, metric_run_id, error, created_at FROM pipelines WHERE pipeline_id = ? """ @@ -561,7 +571,7 @@ def set_pipeline_progress(self, pipeline_id: int) -> dict[str, Any] | None: "current_shard_id": row[6], "shards_completed": row[7], "total_samples_processed": row[8], - "mlflow_run_id": row[9], + "metric_run_id": row[9], "error": row[10], "created_at": row[11], } @@ -580,7 +590,7 @@ def get_pipeline(self, pipeline_id: int) -> dict[str, Any] | None: query = """ SELECT pipeline_id, context_id, pipeline_type, pipeline_config, pipeline_config_json, status, current_shard_id, - shards_completed, total_samples_processed, mlflow_run_id, error, created_at + shards_completed, total_samples_processed, metric_run_id, error, created_at FROM pipelines WHERE pipeline_id = ? 
""" @@ -603,7 +613,7 @@ def get_pipeline(self, pipeline_id: int) -> dict[str, Any] | None: "current_shard_id": row[6], "shards_completed": row[7], "total_samples_processed": row[8], - "mlflow_run_id": row[9], + "metric_run_id": row[9], "error": row[10], "created_at": row[11], } @@ -673,7 +683,7 @@ def get_all_pipelines(self) -> list[dict[str, Any]]: query = """ SELECT pipeline_id, context_id, pipeline_type, pipeline_config, pipeline_config_json, status, current_shard_id, - shards_completed, total_samples_processed, mlflow_run_id, error, created_at + shards_completed, total_samples_processed, metric_run_id, error, created_at FROM pipelines ORDER BY pipeline_id DESC """ @@ -698,7 +708,7 @@ def get_all_pipelines(self) -> list[dict[str, Any]]: "current_shard_id": row[6], "shards_completed": row[7], "total_samples_processed": row[8], - "mlflow_run_id": row[9], + "metric_run_id": row[9], "error": row[10], "created_at": row[11], } @@ -765,16 +775,16 @@ def set_pipeline_error(self, pipeline_id: int, error: str): query = "UPDATE pipelines SET error = ? WHERE pipeline_id = ?" self.db.execute(query, (error, pipeline_id), commit=True) - def set_pipeline_mlflow_run_id(self, pipeline_id: int, mlflow_run_id: str): + def set_pipeline_metric_run_id(self, pipeline_id: int, metric_run_id: str): """ - Set MLflow run ID for a pipeline. + Set MetricLogger run ID for a pipeline. Args: pipeline_id: ID of the pipeline - mlflow_run_id: MLflow run ID + metric_run_id: MetricLogger run ID """ - query = "UPDATE pipelines SET mlflow_run_id = ? WHERE pipeline_id = ?" - self.db.execute(query, (mlflow_run_id, pipeline_id), commit=True) + query = "UPDATE pipelines SET metric_run_id = ? WHERE pipeline_id = ?" 
+ self.db.execute(query, (metric_run_id, pipeline_id), commit=True) # ============================================================================ # ACTOR_TASKS TABLE METHODS diff --git a/rapidfireai/evals/db/tables.sql b/rapidfireai/evals/db/tables.sql index fed096cd..ea9f215f 100644 --- a/rapidfireai/evals/db/tables.sql +++ b/rapidfireai/evals/db/tables.sql @@ -9,7 +9,7 @@ CREATE TABLE IF NOT EXISTS experiments ( experiment_id INTEGER PRIMARY KEY AUTOINCREMENT, experiment_name TEXT NOT NULL, - mlflow_experiment_id TEXT, + metric_experiment_id TEXT, num_shards INTEGER DEFAULT 0, num_actors INTEGER NOT NULL, num_cpus INTEGER, @@ -50,7 +50,7 @@ CREATE TABLE IF NOT EXISTS pipelines ( current_shard_id INTEGER DEFAULT 0, -- Next shard to process shards_completed INTEGER DEFAULT 0, -- Number of shards completed total_samples_processed INTEGER DEFAULT 0, - mlflow_run_id TEXT, -- MLflow run ID for this pipeline + metric_run_id TEXT, -- MetricLogger run ID for this pipeline error TEXT DEFAULT '', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, diff --git a/rapidfireai/evals/scheduling/controller.py b/rapidfireai/evals/scheduling/controller.py index 994bdcf5..d1722f50 100644 --- a/rapidfireai/evals/scheduling/controller.py +++ b/rapidfireai/evals/scheduling/controller.py @@ -41,7 +41,7 @@ def __init__( self, experiment_name: str, experiment_path: str = RF_EXPERIMENT_PATH, - mlflow_manager=None, + metric_manager=None, ): """ Initialize the controller. 
@@ -49,14 +49,14 @@ def __init__( Args: experiment_name: Name of the experiment experiment_path: Path to experiment logs/artifacts - mlflow_manager: Optional MLflowManager instance for logging metrics + metric_manager: Optional MetricLogger instance for logging metrics """ self.aggregator = Aggregator() self.dataloader = DataLoader() self.scheduler = Scheduler(strategy="round_robin") self.experiment_name = experiment_name self.experiment_path = experiment_path - self.mlflow_manager = mlflow_manager + self.metric_manager = metric_manager # Initialize logger logging_manager = RFLogger(experiment_name=self.experiment_name, experiment_path=self.experiment_path) @@ -501,37 +501,38 @@ def _register_pipelines( status=PipelineStatus.NEW, ) - # Create MLflow run for this pipeline if MLflow is enabled - if self.mlflow_manager: + # Create MetricLogger run for this pipeline + metric_run_id = None + if self.metric_manager: try: pipeline_name = pipeline_config.get("pipeline_name", f"Pipeline {pipeline_id}") - mlflow_run_id = self.mlflow_manager.create_run(f"{pipeline_name}_{pipeline_id}") - db.set_pipeline_mlflow_run_id(pipeline_id, mlflow_run_id) + metric_run_id = self.metric_manager.create_run(f"{pipeline_name}_{pipeline_id}") + db.set_pipeline_metric_run_id(pipeline_id, metric_run_id) pipeline = pipeline_config.get("pipeline") if pipeline: if hasattr(pipeline, "model_config") and pipeline.model_config: model_name = pipeline.model_config.get("model", "unknown") - self.mlflow_manager.log_param(mlflow_run_id, "model", model_name) + self.metric_manager.log_param(metric_run_id, "model", model_name) if hasattr(pipeline, "rag") and pipeline.rag: if hasattr(pipeline.rag, "search_type"): - self.mlflow_manager.log_param(mlflow_run_id, "rag_search_type", str(pipeline.rag.search_type)) + self.metric_manager.log_param(metric_run_id, "rag_search_type", str(pipeline.rag.search_type)) if hasattr(pipeline.rag, "search_kwargs") and pipeline.rag.search_kwargs: k = 
pipeline.rag.search_kwargs.get("k") if k is not None: - self.mlflow_manager.log_param(mlflow_run_id, "rag_k", str(k)) + self.metric_manager.log_param(metric_run_id, "rag_k", str(k)) # Extract sampling params if hasattr(pipeline, "sampling_params") and pipeline.sampling_params: import json sampling_str = json.dumps(pipeline.sampling_params) if isinstance(pipeline.sampling_params, dict) else str(pipeline.sampling_params) - self.mlflow_manager.log_param(mlflow_run_id, "sampling_params", sampling_str) + self.metric_manager.log_param(metric_run_id, "sampling_params", sampling_str) - self.logger.debug(f"Created MLflow run {mlflow_run_id} for pipeline {pipeline_id}") + self.logger.debug(f"Created Metrics run {metric_run_id} for pipeline {pipeline_id}") except Exception as e: - self.logger.warning(f"Failed to create MLflow run for pipeline {pipeline_id}: {e}") - + self.logger.warning(f"Failed to create Metrics run for pipeline {pipeline_id}: {e}") + pipeline_ids.append(pipeline_id) pipeline_id_to_config[pipeline_id] = pipeline_config @@ -673,12 +674,12 @@ def _compute_final_metrics_for_pipelines( metrics=display_metrics ) - # Log final metrics to MLflow - if self.mlflow_manager: + # Log final metrics to MetricLogger + if self.metric_manager: try: pipeline = db.get_pipeline(pipeline_id) - mlflow_run_id = pipeline.get("mlflow_run_id") if pipeline else None - if mlflow_run_id: + metric_run_id = pipeline.get("metric_run_id") if pipeline else None + if metric_run_id: total_samples = pipeline.get("total_samples_processed", 0) if total_dataset_size and total_dataset_size > 0: percentage_processed = (total_samples / total_dataset_size * 100) @@ -702,30 +703,30 @@ def _compute_final_metrics_for_pipelines( # Log main metric value if isinstance(metric_value, (int, float)): try: - self.mlflow_manager.log_metric(mlflow_run_id, metric_name, float(metric_value), step=step) + self.metric_manager.log_metric(metric_run_id, metric_name, float(metric_value), step=step) except Exception as e: 
- self.logger.debug(f"Failed to log final metric {metric_name} to MLflow: {e}") + self.logger.debug(f"Failed to log final metric {metric_name} to MetricLogger: {e}") # Log lower_bound if available if lower_bound is not None and isinstance(lower_bound, (int, float)): try: - self.mlflow_manager.log_metric(mlflow_run_id, f"{metric_name}_lower_bound", float(lower_bound), step=step) + self.metric_manager.log_metric(metric_run_id, f"{metric_name}_lower_bound", float(lower_bound), step=step) except Exception as e: - self.logger.debug(f"Failed to log final metric {metric_name}_lower_bound to MLflow: {e}") + self.logger.debug(f"Failed to log final metric {metric_name}_lower_bound to MetricLogger: {e}") # Log upper_bound if available if upper_bound is not None and isinstance(upper_bound, (int, float)): try: - self.mlflow_manager.log_metric(mlflow_run_id, f"{metric_name}_upper_bound", float(upper_bound), step=step) + self.metric_manager.log_metric(metric_run_id, f"{metric_name}_upper_bound", float(upper_bound), step=step) except Exception as e: - self.logger.debug(f"Failed to log final metric {metric_name}_upper_bound to MLflow: {e}") + self.logger.debug(f"Failed to log final metric {metric_name}_upper_bound to MetricLogger: {e}") try: - self.mlflow_manager.end_run(mlflow_run_id) + self.metric_manager.end_run(metric_run_id) except Exception as e: - self.logger.debug(f"Failed to end MLflow run {mlflow_run_id}: {e}") + self.logger.debug(f"Failed to end MetricLogger run {metric_run_id}: {e}") except Exception as e: - self.logger.debug(f"Failed to log final metrics to MLflow for pipeline {pipeline_id}: {e}") + self.logger.debug(f"Failed to log final metrics to MetricLogger for pipeline {pipeline_id}: {e}") self.logger.info(f"Pipeline {pipeline_id} ({pipeline_name}) completed successfully") @@ -812,7 +813,7 @@ def run_multi_pipeline_inference( # Set up aggregators for each pipeline pipeline_aggregators = {} pipeline_results = {} # {pipeline_id: {"results": {}, "metrics": {}}} - 
total_dataset_size = len(dataset) # Store for MLflow percentage calculation + total_dataset_size = len(dataset) # Store for MetricLogger percentage calculation for pipeline_id in pipeline_ids: aggregator = Aggregator() @@ -1082,58 +1083,58 @@ def run_multi_pipeline_inference( throughput = samples_processed / elapsed_time display_metrics["Throughput"] = {"value": throughput} - if self.mlflow_manager: + pipeline = db.get_pipeline(pipeline_id) + metric_run_id = pipeline.get("metric_run_id") if pipeline else None + + if self.metric_manager and metric_run_id: try: - pipeline = db.get_pipeline(pipeline_id) - mlflow_run_id = pipeline.get("mlflow_run_id") if pipeline else None - if mlflow_run_id: - actual_samples_processed = pipeline.get("total_samples_processed", samples_processed) - percentage_processed = (actual_samples_processed / total_dataset_size * 100) if total_dataset_size > 0 else 0 - step = int(percentage_processed) + actual_samples_processed = pipeline.get("total_samples_processed", samples_processed) + percentage_processed = (actual_samples_processed / total_dataset_size * 100) if total_dataset_size > 0 else 0 + step = int(percentage_processed) + + for metric_name, metric_data in metrics_with_ci.items(): + if metric_name in ["run_id", "model_name", "Samples Processed", "Processing Time", "Samples Per Second"]: + continue + + if isinstance(metric_data, dict): + metric_value = metric_data.get("value", 0) + lower_bound = metric_data.get("lower_bound") + upper_bound = metric_data.get("upper_bound") + else: + metric_value = metric_data + lower_bound = None + upper_bound = None + + # Log main metric value + if isinstance(metric_value, (int, float)): + try: + self.metric_manager.log_metric(metric_run_id, metric_name, float(metric_value), step=step) + except Exception as e: + self.logger.debug(f"Failed to log metric {metric_name} to MetricLogger: {e}") - for metric_name, metric_data in metrics_with_ci.items(): - if metric_name in ["run_id", "model_name", "Samples 
Processed", "Processing Time", "Samples Per Second"]: - continue - - if isinstance(metric_data, dict): - metric_value = metric_data.get("value", 0) - lower_bound = metric_data.get("lower_bound") - upper_bound = metric_data.get("upper_bound") - else: - metric_value = metric_data - lower_bound = None - upper_bound = None - - # Log main metric value - if isinstance(metric_value, (int, float)): - try: - self.mlflow_manager.log_metric(mlflow_run_id, metric_name, float(metric_value), step=step) - except Exception as e: - self.logger.debug(f"Failed to log metric {metric_name} to MLflow: {e}") - - # Log lower_bound if available - if lower_bound is not None and isinstance(lower_bound, (int, float)): - try: - self.mlflow_manager.log_metric(mlflow_run_id, f"{metric_name}_lower_bound", float(lower_bound), step=step) - except Exception as e: - self.logger.debug(f"Failed to log metric {metric_name}_lower_bound to MLflow: {e}") - - # Log upper_bound if available - if upper_bound is not None and isinstance(upper_bound, (int, float)): - try: - self.mlflow_manager.log_metric(mlflow_run_id, f"{metric_name}_upper_bound", float(upper_bound), step=step) - except Exception as e: - self.logger.debug(f"Failed to log metric {metric_name}_upper_bound to MLflow: {e}") + # Log lower_bound if available + if lower_bound is not None and isinstance(lower_bound, (int, float)): + try: + self.metric_manager.log_metric(metric_run_id, f"{metric_name}_lower_bound", float(lower_bound), step=step) + except Exception as e: + self.logger.debug(f"Failed to log metric {metric_name}_lower_bound to MetricLogger: {e}") - if "Throughput" in display_metrics: - throughput_value = display_metrics["Throughput"]["value"] - if isinstance(throughput_value, (int, float)): - try: - self.mlflow_manager.log_metric(mlflow_run_id, "Throughput", float(throughput_value), step=step) - except Exception as e: - self.logger.debug(f"Failed to log Throughput to MLflow: {e}") + # Log upper_bound if available + if upper_bound is not 
None and isinstance(upper_bound, (int, float)): + try: + self.metric_manager.log_metric(metric_run_id, f"{metric_name}_upper_bound", float(upper_bound), step=step) + except Exception as e: + self.logger.debug(f"Failed to log metric {metric_name}_upper_bound to MetricLogger: {e}") + + if "Throughput" in display_metrics: + throughput_value = display_metrics["Throughput"]["value"] + if isinstance(throughput_value, (int, float)): + try: + self.metric_manager.log_metric(metric_run_id, "Throughput", float(throughput_value), step=step) + except Exception as e: + self.logger.debug(f"Failed to log Throughput to MetricLogger: {e}") except Exception as e: - self.logger.debug(f"Failed to log metrics to MLflow: {e}") + self.logger.debug(f"Failed to log metrics to MetricLogger: {e}") except Exception as e: self.logger.debug(f"Could not compute live metrics: {e}") diff --git a/rapidfireai/evals/utils/experiment_utils.py b/rapidfireai/evals/utils/experiment_utils.py index bc301e9f..5b33c9c8 100644 --- a/rapidfireai/evals/utils/experiment_utils.py +++ b/rapidfireai/evals/utils/experiment_utils.py @@ -22,6 +22,7 @@ def _disable_ml_warnings_display(self) -> None: """Disable warnings""" # Suppress warnings warnings.filterwarnings("ignore", message=".*torch.cuda.amp.autocast.*") + warnings.filterwarnings("ignore", message=".*torch.amp.autocast.*") warnings.filterwarnings("ignore", message=".*generation flags are not valid.*") warnings.filterwarnings("ignore", message=".*decoder-only architecture.*") warnings.filterwarnings("ignore", message=".*attention mask is not set.*") diff --git a/rapidfireai/evals/utils/mlflow_manager.py b/rapidfireai/evals/utils/mlflow_manager.py deleted file mode 100644 index 49ea63c0..00000000 --- a/rapidfireai/evals/utils/mlflow_manager.py +++ /dev/null @@ -1,120 +0,0 @@ -"""This module contains the MLflowManager class which is responsible for managing the MLflow runs.""" - -import mlflow -from mlflow.tracking import MlflowClient - - -class MLflowManager: - 
def __init__(self, tracking_uri: str): - """ - Initialize MLflow Manager with tracking URI. - - Args: - tracking_uri: MLflow tracking server URI - """ - self.client = MlflowClient(tracking_uri=tracking_uri) - self.experiment_id = None - - def create_experiment(self, experiment_name: str) -> str: - """Create a new experiment and set it as active.""" - self.experiment_id = self.client.create_experiment(experiment_name) - # IMPORTANT: Set this as the active experiment in MLflow context - mlflow.set_experiment(experiment_name) - return self.experiment_id - - def get_experiment(self, experiment_name: str) -> str: - """Get existing experiment by name and set it as active.""" - experiment = self.client.get_experiment_by_name(experiment_name) - if experiment is None: - raise ValueError(f"Experiment '{experiment_name}' not found") - self.experiment_id = experiment.experiment_id - return self.experiment_id - - def create_run(self, run_name: str) -> str: - """Create a new run and return mlflow_run_id.""" - if self.experiment_id is None: - raise ValueError("No experiment set. Call create_experiment() or get_experiment() first.") - run = self.client.create_run(self.experiment_id, run_name=run_name) - return run.info.run_id - - def log_param(self, mlflow_run_id: str, key: str, value: str) -> None: - """Log parameters to a specific run.""" - self.client.log_param(mlflow_run_id, key, value) - - def log_metric(self, mlflow_run_id: str, key: str, value: float, step: int = None) -> None: - """Log a metric to a specific run.""" - self.client.log_metric(mlflow_run_id, key, value, step=step) - - def get_run_metrics(self, mlflow_run_id: str) -> dict[str, list[tuple[int, float]]]: - """ - Get all metrics for a specific run. 
- """ - try: - run = self.client.get_run(mlflow_run_id) - if run is None: - return {} - - run_data = run.data - metric_dict = {} - for metric_key in run_data.metrics.keys(): - try: - metric_history = self.client.get_metric_history(mlflow_run_id, metric_key) - metric_dict[metric_key] = [(metric.step, metric.value) for metric in metric_history] - except Exception as e: - print(f"Error getting metric history for {metric_key}: {e}") - continue - return metric_dict - except Exception as e: - print(f"Error getting metrics for run {mlflow_run_id}: {e}") - return {} - - def end_run(self, mlflow_run_id: str) -> None: - """End a specific run.""" - # Check if run exists before terminating - run = self.client.get_run(mlflow_run_id) - if run is not None: - # First terminate the run on the server - self.client.set_terminated(mlflow_run_id) - - # Then clear the local MLflow context if this is the active run - try: - current_run = mlflow.active_run() - # Make sure we end the run on the correct worker - if current_run and current_run.info.run_id == mlflow_run_id: - mlflow.end_run() - else: - print(f"Run {mlflow_run_id} is not the active run, no local context to clear") - except Exception as e: - print(f"Error clearing local MLflow context: {e}") - else: - print(f"MLflow run {mlflow_run_id} not found, cannot terminate") - - def delete_run(self, mlflow_run_id: str) -> None: - """Delete a specific run.""" - # Check if run exists before deleting - run = self.client.get_run(mlflow_run_id) - if run is not None: - self.client.delete_run(mlflow_run_id) - else: - raise ValueError(f"Run '{mlflow_run_id}' not found") - - def clear_context(self) -> None: - """Clear the MLflow context by ending any active run.""" - try: - current_run = mlflow.active_run() - if current_run: - run_id = current_run.info.run_id - - # Try to end the run properly using the client first - try: - self.client.end_run(run_id) - except Exception: - # Fallback to global mlflow.end_run() - mlflow.end_run() - print(f"Run 
{run_id} ended using global mlflow.end_run") - - print("MLflow context cleared successfully") - else: - print("No active MLflow run to clear") - except Exception as e: - print(f"Error clearing MLflow context: {e}") diff --git a/rapidfireai/experiment.py b/rapidfireai/experiment.py index 3fee1ca0..1c0e1ca0 100644 --- a/rapidfireai/experiment.py +++ b/rapidfireai/experiment.py @@ -12,8 +12,15 @@ from pathlib import Path import pandas as pd -from rapidfireai.utils.constants import ColabConfig, RayConfig, RF_EXPERIMENT_PATH, RF_LOG_FILENAME, RF_TRAINING_LOG_FILENAME, RF_LOG_PATH -from rapidfireai.utils.ping import ping_server +from rapidfireai.utils.constants import ( + ColabConfig, + RayConfig, + RF_EXPERIMENT_PATH, + RF_LOG_FILENAME, + RF_TRAINING_LOG_FILENAME, + RF_LOG_PATH, + RF_MLFLOW_ENABLED +) class Experiment: @@ -118,11 +125,11 @@ def _init_evals_mode(self) -> None: from rapidfireai.evals.dispatcher import start_dispatcher_thread from rapidfireai.evals.scheduling.controller import Controller from rapidfireai.utils.colab import get_colab_auth_token - from rapidfireai.utils.constants import DispatcherConfig, MLFlowConfig + from rapidfireai.utils.constants import DispatcherConfig from rapidfireai.evals.utils.constants import get_dispatcher_url from rapidfireai.evals.utils.experiment_utils import ExperimentUtils from rapidfireai.evals.utils.logger import RFLogger - from rapidfireai.evals.utils.mlflow_manager import MLflowManager + from rapidfireai.utils.metric_rfmetric_manager import RFMetricLogger from rapidfireai.evals.utils.notebook_ui import NotebookUI # Store ray reference for later use @@ -190,26 +197,27 @@ def _init_evals_mode(self) -> None: # Create database reference self.db = RFDatabase() - # Initialize MLflow (optional, gracefully disabled if server not available) - if ping_server(MLFlowConfig.HOST, MLFlowConfig.PORT, 2): - # Server is reachable, proceed with MLflow - self.mlflow_manager = MLflowManager(MLFlowConfig.URL) - mlflow_experiment_id = 
self.mlflow_manager.create_experiment(self.experiment_name) + try: + metric_loggers = RFMetricLogger.get_default_metric_loggers(experiment_name=self.experiment_name) + self.metric_loggers = RFMetricLogger(metric_loggers, logger=self.logger) + metric_experiment_id = self.metric_loggers.create_experiment(self.experiment_name) self.db.db.execute( - "UPDATE experiments SET mlflow_experiment_id = ? WHERE experiment_id = ?", - (mlflow_experiment_id, self.experiment_id), + "UPDATE experiments SET metric_experiment_id = ? WHERE experiment_id = ?", + (metric_experiment_id, self.experiment_id), ) self.db.db.conn.commit() - self.logger.info(f"Initialized MLflow experiment: {mlflow_experiment_id}") - else: - self.logger.info(f"MLflow server not available at {MLFlowConfig.URL}. MLflow logging will be disabled.") - self.mlflow_manager = None + self.logger.info(f"Initialized MetricLogger experiment: {metric_experiment_id}") + except Exception as e: + self.logger.warning(f"Failed to initialize MetricLogger: {e}. MetricLogger logging will be disabled.") + self.metric_loggers = None + + # Initialize the controller self.controller = Controller( experiment_name=self.experiment_name, experiment_path=self.experiment_path, - mlflow_manager=self.mlflow_manager if hasattr(self, "mlflow_manager") else None, + metric_manager=self.metric_loggers, ) # Start dispatcher in background thread for interactive control @@ -415,7 +423,7 @@ def run_evals( def get_results(self) -> pd.DataFrame: """ - Get the MLflow training metrics for all runs in the experiment. + Get the training metrics for all runs in the experiment. 
Returns: DataFrame with training metrics @@ -426,37 +434,40 @@ def get_results(self) -> pd.DataFrame: if self.mode != "fit": raise ValueError("get_results() is only available in 'fit' mode") - from rapidfireai.utils.constants import MLFlowConfig - ExperimentException = self._ExperimentException try: runs_info_df = self.experiment_utils.get_runs_info() - # Check if there are any mlflow_run_ids before importing MLflow - has_mlflow_runs = ( - runs_info_df.get("mlflow_run_id") is not None and runs_info_df["mlflow_run_id"].notna().any() + # Check if there are any metric_run_ids before importing metrics + has_metric_runs = ( + runs_info_df.get("metric_run_id") is not None and runs_info_df["metric_run_id"].notna().any() ) - if not has_mlflow_runs: - # No MLflow runs to fetch, return empty DataFrame + if not has_metric_runs or RF_MLFLOW_ENABLED != "true": + # No metric runs to fetch, return empty DataFrame return pd.DataFrame(columns=["run_id", "step"]) - # Lazy import - only import when we actually have MLflow runs to fetch - from rapidfireai.fit.utils.mlflow_manager import MLflowManager + # Lazy import - only import when we actually have metric runs to fetch + from rapidfireai.utils.metric_rfmetric_manager import RFMetricLogger + try: + metric_loggers = RFMetricLogger.get_default_metric_loggers(experiment_name=self.experiment_name) + self.metric_loggers = RFMetricLogger(metric_loggers, logger=self.logger) - mlflow_manager = MLflowManager(MLFlowConfig.URL) + except Exception as e: + self.logger.warning(f"Failed to initialize MetricLogger: {e}.") + return pd.DataFrame(columns=["run_id", "step"]) metrics_data = [] for _, run_row in runs_info_df.iterrows(): run_id = run_row["run_id"] - mlflow_run_id = run_row.get("mlflow_run_id") + metric_run_id = run_row.get("metric_run_id") - if not mlflow_run_id: + if not metric_run_id: continue - run_metrics = mlflow_manager.get_run_metrics(mlflow_run_id) + run_metrics = self.metric_loggers.get_run_metrics(metric_run_id) step_metrics = 
{} for metric_name, metric_values in run_metrics.items(): diff --git a/rapidfireai/fit/backend/controller.py b/rapidfireai/fit/backend/controller.py index f390ce9a..e9148907 100644 --- a/rapidfireai/fit/backend/controller.py +++ b/rapidfireai/fit/backend/controller.py @@ -13,14 +13,13 @@ from torch.utils.data import Dataset from rapidfireai.automl import AutoMLAlgorithm -from rapidfireai.utils.constants import MLFlowConfig from rapidfireai.utils.os_utils import mkdir_p +from rapidfireai.automl import AutoMLAlgorithm from rapidfireai.fit.backend.chunks import DatasetChunks from rapidfireai.fit.backend.scheduler import Scheduler from rapidfireai.fit.db.rf_db import RfDb from rapidfireai.automl import get_flattened_config_leaf, get_runs from rapidfireai.fit.utils.constants import ( - TENSORBOARD_LOG_DIR, ControllerTask, ExperimentTask, RunEndedBy, @@ -28,12 +27,11 @@ RunStatus, TaskStatus, WorkerTask, - get_tracking_backend, ) from rapidfireai.fit.utils.datapaths import DataPath from rapidfireai.fit.utils.exceptions import ControllerException, NoGPUsFoundException from rapidfireai.fit.utils.logging import RFLogger -from rapidfireai.fit.utils.metric_logger import create_metric_logger +from rapidfireai.utils.metric_rfmetric_manager import RFMetricLogger from rapidfireai.fit.utils.serialize import encode_payload from rapidfireai.fit.utils.shm_manager import SharedMemoryManager from rapidfireai.fit.utils.worker_manager import WorkerManager @@ -77,21 +75,14 @@ def __init__(self, experiment_id: int, experiment_name: str) -> None: # create worker manager self.worker_manager: WorkerManager = WorkerManager(self.num_workers, registry, process_lock) - # create metric logger - # Initialize DataPath temporarily to get experiment path for tensorboard logs - experiment_path = self.db.get_experiments_path(self.experiment_name) - DataPath.initialize(self.experiment_name, experiment_path) - tensorboard_log_dir = TENSORBOARD_LOG_DIR or str(DataPath.experiments_path / "tensorboard_logs") 
- - self.metric_logger = create_metric_logger( - backend=get_tracking_backend(), - mlflow_tracking_uri=MLFlowConfig.URL, - tensorboard_log_dir=tensorboard_log_dir, + default_metric_loggers = RFMetricLogger.get_default_metric_loggers(experiment_name=self.experiment_name) + self.metric_logger = RFMetricLogger( + default_metric_loggers, + logger=self.logger, ) - # Get experiment if using MLflow - if hasattr(self.metric_logger, "get_experiment"): - self.metric_logger.get_experiment(self.experiment_name) - + if self.metric_logger is None: + raise ControllerException("MetricLogger is not initialized. Please check the metric logger configuration.") + self.metric_logger.get_experiment(self.experiment_name) self.logger.debug("Controller initialized") def _create_models( @@ -150,28 +141,32 @@ def _create_models( raise ControllerException(f"Failed to create required Run DataPath directories: {e}") from e # create new tracking run + metric_run_id = None try: - # create new tracking run and get the mlflow_run_id - mlflow_run_id = self.metric_logger.create_run(str(run_id)) - + # create new tracking run and get the metric_run_id + metric_run_id = self.metric_logger.create_run(str(run_id)) # populate tracking backend with model config info for key, value in flattened_config.items(): - self.metric_logger.log_param(mlflow_run_id, key, value) + self.metric_logger.log_param(metric_run_id, key, value) if warm_started_from: - self.metric_logger.log_param(mlflow_run_id, "warm-start", str(warm_started_from)) + self.metric_logger.log_param(metric_run_id, "warm-start", str(warm_started_from)) if cloned_from: - self.metric_logger.log_param(mlflow_run_id, "parent-run", str(cloned_from)) - self.logger.debug(f"Populated MLFlow with model config info for run {run_id}.") + self.metric_logger.log_param(metric_run_id, "parent-run", str(cloned_from)) + self.logger.debug(f"Populated MetricLogger with model config info for run {run_id}.") self.db.set_run_details( run_id=run_id, - 
mlflow_run_id=mlflow_run_id, + metric_run_id=metric_run_id, flattened_config=flattened_config, ) except Exception as e: # Catch any metric logger exceptions (MLflow, TensorBoard, etc.) msg = f"Error creating new tracking run for run {run_id} - {e}." print(msg) - self.metric_logger.end_run(mlflow_run_id) + if metric_run_id: + try: + self.metric_logger.end_run(metric_run_id) + except Exception: + pass self.logger.error(msg, exc_info=True) total_runs = len(runs) @@ -232,9 +227,9 @@ def _process_interactive_control( # TODO: commented out to prevent clone of deleted runs issue (see Issue # 22) # self._clear_run_from_shm(run_id) - # delete run from MLFlow - mlflow_run_id = self.db.get_run(run_id)["mlflow_run_id"] - self.metric_logger.delete_run(mlflow_run_id) + # delete run from MetricLogger + metric_run_id = self.db.get_run(run_id)["metric_run_id"] + self.metric_logger.delete_run(metric_run_id) # mark run as deleted self.db.set_run_details( run_id=run_id, diff --git a/rapidfireai/fit/backend/worker.py b/rapidfireai/fit/backend/worker.py index c40f41d8..36efcb2e 100644 --- a/rapidfireai/fit/backend/worker.py +++ b/rapidfireai/fit/backend/worker.py @@ -16,7 +16,6 @@ import torch -from rapidfireai.utils.constants import MLFlowConfig from rapidfireai.fit.backend.chunks import DatasetChunks from rapidfireai.fit.db.rf_db import RfDb from rapidfireai.fit.ml.checkpoint_utils import ( @@ -26,18 +25,16 @@ ) from rapidfireai.fit.ml.trainer import create_trainer_instance from rapidfireai.fit.utils.constants import ( - TENSORBOARD_LOG_DIR, USE_SHARED_MEMORY, RunStatus, SHMObjectType, TaskStatus, WorkerTask, - get_tracking_backend, ) from rapidfireai.fit.utils.datapaths import DataPath from rapidfireai.fit.utils.exceptions import WorkerException from rapidfireai.fit.utils.logging import RFLogger, TrainingLogger -from rapidfireai.fit.utils.metric_logger import create_metric_logger +from rapidfireai.utils.metric_rfmetric_manager import RFMetricLogger from 
rapidfireai.fit.utils.serialize import decode_db_payload from rapidfireai.fit.utils.shm_manager import SharedMemoryManager from rapidfireai.fit.utils.trainer_config import TrainerConfig @@ -84,15 +81,11 @@ def __init__( DataPath.initialize(self.experiment_name, self.db.get_experiments_path(self.experiment_name)) # create metric logger - tensorboard_log_dir = TENSORBOARD_LOG_DIR or str(DataPath.experiments_path / "tensorboard_logs") - self.metric_logger = create_metric_logger( - backend=get_tracking_backend(), - mlflow_tracking_uri=MLFlowConfig.URL, - tensorboard_log_dir=tensorboard_log_dir, - ) - # Get experiment if using MLflow - if hasattr(self.metric_logger, "get_experiment"): - self.metric_logger.get_experiment(self.experiment_name) + default_metric_loggers = RFMetricLogger.get_default_metric_loggers(experiment_name=self.experiment_name) + self.metric_logger = RFMetricLogger(default_metric_loggers, logger=self.logger) + if self.metric_logger is None: + raise WorkerException("MetricLogger is not initialized. 
Please check the metric logger configuration.") + self.metric_logger.get_experiment(self.experiment_name) # load datasets self.train_dataset, self.eval_dataset, self.num_chunks = self.load_datasets() @@ -122,7 +115,8 @@ def run_fit( # get run details run_details = self.db.get_run(run_id) config_leaf = run_details["config_leaf"] - mlflow_run_id = run_details["mlflow_run_id"] + metric_run_id = run_details["metric_run_id"] + # set seed # torch.manual_seed(run_details["seed"]) @@ -144,7 +138,7 @@ def run_fit( trainer_config = TrainerConfig( worker_id=self.worker_id, run_id=run_id, - mlflow_run_id=mlflow_run_id, + metric_run_id=metric_run_id, config_leaf=config_leaf, total_steps=run_details["total_steps"], completed_steps=run_details["completed_steps"], @@ -166,7 +160,7 @@ def run_fit( stdout_buffer = StringIO() stderr_buffer = StringIO() with redirect_stdout(stdout_buffer), redirect_stderr(stderr_buffer): - trainer_instance, base_model_name = create_trainer_instance( + trainer_instance, _ = create_trainer_instance( trainer_config, self.shm_manager, USE_SHARED_MEMORY, self.metric_logger, chunk_id ) diff --git a/rapidfireai/fit/db/CLAUDE.md b/rapidfireai/fit/db/CLAUDE.md index 8b5a02af..f4551bb8 100644 --- a/rapidfireai/fit/db/CLAUDE.md +++ b/rapidfireai/fit/db/CLAUDE.md @@ -95,7 +95,7 @@ The db module provides the persistence layer for RapidFire using SQLite. 
It stor - `run_id` (PK): Unique run identifier - `experiment_id` (FK): Parent experiment - `run_name`: Generated name (e.g., "run_1") -- `mlflow_run_id`: MLflow tracking ID +- `metric_run_id`: Metric tracking ID - `status`: RunStatus enum (NEW, ONGOING, COMPLETED, FAILED, STOPPED, KILLED) - `source`: RunSource enum (USER, CLONE_MODIFY) - `ended_by`: RunEndedBy enum (COMPLETED, FAILED, KILLED, STOPPED) @@ -170,7 +170,7 @@ self.db.execute(query, (new_status, run_id), commit=True) run_id = db.create_run( experiment_id=1, run_name="run_1", - mlflow_run_id="abc123", + metric_run_id="abc123", config_leaf=encode_payload(config_dict), source=RunSource.USER, seed=42, diff --git a/rapidfireai/fit/db/rf_db.py b/rapidfireai/fit/db/rf_db.py index 30664eb3..b8cb308e 100644 --- a/rapidfireai/fit/db/rf_db.py +++ b/rapidfireai/fit/db/rf_db.py @@ -46,7 +46,23 @@ def create_tables(self): with open(tables_file, encoding="utf-8") as f: sql_content = f.read() _ = self.db.conn.executescript(sql_content) - + else: + try: + cursor = self.db.conn.execute("PRAGMA table_info(runs)") + columns = [column[1] for column in cursor.fetchall()] + if "metric_run_id" not in columns: + self.db.conn.execute("ALTER TABLE runs ADD COLUMN metric_run_id TEXT") + self.db.conn.commit() + except sqlite3.Error: + pass + try: + cursor = self.db.conn.execute("PRAGMA table_info(experiments)") + columns = [column[1] for column in cursor.fetchall()] + if "metric_experiment_id" not in columns: + self.db.conn.execute("ALTER TABLE experiments ADD COLUMN metric_experiment_id TEXT") + self.db.conn.commit() + except sqlite3.Error: + pass except FileNotFoundError as e: raise DBException(f"tables.sql file not found at {tables_file}") from e except sqlite3.Error as e: @@ -113,12 +129,12 @@ def reset_experiment_states(self) -> None: def create_experiment( self, experiment_name: str, - mlflow_experiment_id: str | None, + metric_experiment_id: str | None, config_options: dict[str, Any], ) -> int: """Create a new experiment""" 
query = """ - INSERT INTO experiments (experiment_name, mlflow_experiment_id, config_options, + INSERT INTO experiments (experiment_name, metric_experiment_id, config_options, status, current_task, error) VALUES (?, ?, ?, ?, ?, ?) RETURNING experiment_id @@ -128,7 +144,7 @@ def create_experiment( query, ( experiment_name, - mlflow_experiment_id, + metric_experiment_id, encode_payload(config_options), ExperimentStatus.RUNNING.value, ExperimentTask.IDLE.value, @@ -148,7 +164,7 @@ def create_experiment( def get_running_experiment(self) -> dict[str, Any]: """Get an experiment's details by its ID""" query = """ - SELECT experiment_id, experiment_name, status, error, mlflow_experiment_id, config_options + SELECT experiment_id, experiment_name, status, error, metric_experiment_id, config_options FROM experiments WHERE status = ? ORDER BY experiment_id DESC @@ -163,7 +179,7 @@ def get_running_experiment(self) -> dict[str, Any]: "experiment_name": experiment_details[1], "status": ExperimentStatus(experiment_details[2]), "error": experiment_details[3], - "mlflow_experiment_id": experiment_details[4], + "metric_experiment_id": experiment_details[4], "config_options": decode_db_payload(experiment_details[5]), } return experiment_details @@ -271,7 +287,7 @@ def create_run( self, config_leaf: dict[str, Any], status: RunStatus, - mlflow_run_id: str | None = None, + metric_run_id: str | None = None, flattened_config: dict[str, Any] | None = None, completed_steps: int = 0, total_steps: int = 0, @@ -286,7 +302,7 @@ def create_run( ) -> int: """Create a new run""" query = """ - INSERT INTO runs (status, mlflow_run_id, flattened_config, config_leaf, + INSERT INTO runs (status, metric_run_id, flattened_config, config_leaf, completed_steps, total_steps, num_chunks_visited_curr_epoch, num_epochs_completed, chunk_offset, error, source, ended_by, warm_started_from, cloned_from) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
@@ -295,7 +311,7 @@ def create_run( query, ( status.value, - mlflow_run_id, + metric_run_id, json.dumps(flattened_config) if flattened_config else "{}", encode_payload(config_leaf) if config_leaf else "{}", completed_steps, @@ -320,7 +336,7 @@ def set_run_details( self, run_id: int, status: RunStatus | None = None, - mlflow_run_id: str | None = None, + metric_run_id: str | None = None, flattened_config: dict[str, Any] | None = None, config_leaf: dict[str, Any] | None = None, completed_steps: int | None = None, @@ -338,7 +354,7 @@ def set_run_details( # Initialize a dictionary to hold the column-value pairs columns = { "status": status.value if status else None, - "mlflow_run_id": mlflow_run_id, + "metric_run_id": metric_run_id, "flattened_config": json.dumps(flattened_config) if flattened_config else None, "config_leaf": encode_payload(config_leaf) if config_leaf else None, "completed_steps": completed_steps, @@ -376,7 +392,7 @@ def set_run_details( def get_run(self, run_id: int) -> dict[str, Any]: """Get a run's details""" query = """ - SELECT status, mlflow_run_id, flattened_config, config_leaf, completed_steps, total_steps, + SELECT status, metric_run_id, flattened_config, config_leaf, completed_steps, total_steps, num_chunks_visited_curr_epoch, num_epochs_completed, chunk_offset, error, source, ended_by, warm_started_from, cloned_from FROM runs @@ -388,7 +404,7 @@ def get_run(self, run_id: int) -> dict[str, Any]: run_details = run_details[0] formatted_details = { "status": RunStatus(run_details[0]), - "mlflow_run_id": run_details[1], + "metric_run_id": run_details[1], "flattened_config": json.loads(run_details[2]), "config_leaf": decode_db_payload(run_details[3]) if run_details[3] and run_details[3] != "{}" else {}, "completed_steps": run_details[4], @@ -413,7 +429,7 @@ def get_runs_by_status(self, statuses: list[RunStatus]) -> dict[int, dict[str, A # Create placeholders for SQL IN clause placeholders = ",".join(["?"] * len(statuses)) query = f""" - SELECT 
run_id, status, mlflow_run_id, flattened_config, config_leaf, completed_steps, total_steps, + SELECT run_id, status, metric_run_id, flattened_config, config_leaf, completed_steps, total_steps, num_chunks_visited_curr_epoch, num_epochs_completed, chunk_offset, error, source, ended_by, warm_started_from, cloned_from FROM runs @@ -427,7 +443,7 @@ def get_runs_by_status(self, statuses: list[RunStatus]) -> dict[int, dict[str, A for run in run_details: formatted_details[run[0]] = { "status": RunStatus(run[1]), - "mlflow_run_id": run[2], + "metric_run_id": run[2], "flattened_config": json.loads(run[3]), "config_leaf": decode_db_payload(run[4]) if run[4] and run[4] != "{}" else {}, "completed_steps": run[5], @@ -446,7 +462,7 @@ def get_runs_by_status(self, statuses: list[RunStatus]) -> dict[int, dict[str, A def get_all_runs(self) -> dict[int, dict[str, Any]]: """Get all runs for UI display (ignore all complex fields)""" query = """ - SELECT run_id, status, mlflow_run_id, flattened_config, config_leaf, completed_steps, total_steps, + SELECT run_id, status, metric_run_id, flattened_config, config_leaf, completed_steps, total_steps, num_chunks_visited_curr_epoch, num_epochs_completed, chunk_offset, error, source, ended_by, warm_started_from, cloned_from FROM runs @@ -458,7 +474,7 @@ def get_all_runs(self) -> dict[int, dict[str, Any]]: for run in run_details: formatted_details[run[0]] = { "status": RunStatus(run[1]), - "mlflow_run_id": run[2], + "metric_run_id": run[2], "flattened_config": json.loads(run[3]), "config_leaf": decode_db_payload(run[4]) if run[4] and run[4] != "{}" else {}, "completed_steps": run[5], diff --git a/rapidfireai/fit/db/tables.sql b/rapidfireai/fit/db/tables.sql index 7ab8e41f..ad21379d 100644 --- a/rapidfireai/fit/db/tables.sql +++ b/rapidfireai/fit/db/tables.sql @@ -2,7 +2,7 @@ CREATE TABLE IF NOT EXISTS experiments ( experiment_id INTEGER PRIMARY KEY AUTOINCREMENT, experiment_name TEXT NOT NULL, - mlflow_experiment_id TEXT, + metric_experiment_id 
TEXT, config_options TEXT NOT NULL, status TEXT NOT NULL, current_task TEXT NOT NULL, @@ -13,7 +13,7 @@ CREATE TABLE IF NOT EXISTS experiments ( CREATE TABLE IF NOT EXISTS runs ( run_id INTEGER PRIMARY KEY AUTOINCREMENT, status TEXT NOT NULL, - mlflow_run_id TEXT, + metric_run_id TEXT, flattened_config TEXT DEFAULT '{}', config_leaf TEXT DEFAULT '{}', completed_steps INTEGER DEFAULT 0, diff --git a/rapidfireai/fit/dispatcher/dispatcher.py b/rapidfireai/fit/dispatcher/dispatcher.py index ba1a29cc..87989adb 100644 --- a/rapidfireai/fit/dispatcher/dispatcher.py +++ b/rapidfireai/fit/dispatcher/dispatcher.py @@ -121,7 +121,7 @@ def get_all_runs(self) -> tuple[Response, int]: { "run_id": run_id, "status": result["status"].value, - "mlflow_run_id": result["mlflow_run_id"], + "metric_run_id": result["metric_run_id"], "config": result["config_leaf"], "flattened_config": result["flattened_config"], "completed_steps": result["completed_steps"], @@ -161,7 +161,7 @@ def get_run(self) -> tuple[Response, int]: safe_result = { "run_id": data["run_id"], "status": result["status"].value, - "mlflow_run_id": result["mlflow_run_id"], + "metric_run_id": result["metric_run_id"], "config": result["config_leaf"], "flattened_config": result["flattened_config"], "completed_steps": result["completed_steps"], diff --git a/rapidfireai/fit/ml/CLAUDE.md b/rapidfireai/fit/ml/CLAUDE.md index d607baec..f61c7d92 100644 --- a/rapidfireai/fit/ml/CLAUDE.md +++ b/rapidfireai/fit/ml/CLAUDE.md @@ -129,8 +129,8 @@ checkpoint = { - Batches generation for efficiency - Supports custom generation configs (temperature, top_p, max_tokens) -**MLflowLoggingCallback**: -- Logs training metrics (loss, learning rate, grad norm) to MLflow +**MetricLoggingCallback**: +- Logs training metrics (loss, learning rate, grad norm) to Metric Logger - Handles step offset for resumed runs (continued step numbering) - Filters out None values and non-numeric metrics - Logs at appropriate intervals based on `logging_steps` @@ 
-144,7 +144,7 @@ checkpoint = { **Usage Pattern**: ```python callbacks = [ - MLflowLoggingCallback(mlflow_manager, mlflow_run_id, completed_steps), + MetricLoggingCallback(metric_manager, metric_run_id, completed_steps), GenerationMetricsCallback(tokenizer, eval_dataset, generation_config, compute_metrics), LogLevelCallback(), ] diff --git a/rapidfireai/fit/ml/callbacks.py b/rapidfireai/fit/ml/callbacks.py index 836af9c4..55feb254 100644 --- a/rapidfireai/fit/ml/callbacks.py +++ b/rapidfireai/fit/ml/callbacks.py @@ -16,7 +16,7 @@ def __init__( compute_metrics: Callable = None, batch_size: int = 8, metric_logger=None, - mlflow_run_id: str = None, + metric_run_id: str = None, completed_steps: int = 0, ): self.tokenizer = tokenizer @@ -32,7 +32,7 @@ def __init__( "eos_token_id": tokenizer.eos_token_id, } self.metric_logger = metric_logger - self.mlflow_run_id = mlflow_run_id + self.metric_run_id = metric_run_id self.completed_steps = completed_steps def on_evaluate( @@ -58,12 +58,13 @@ def on_evaluate( state.log_history.append(metrics) for key, value in metrics.items(): + step = self.completed_steps + state.global_step if self.metric_logger: self.metric_logger.log_metric( - self.mlflow_run_id, + self.metric_run_id, key, value, - step=self.completed_steps + state.global_step, + step=step, ) def _prepare_data(self, eval_dataset: Dataset) -> tuple: @@ -158,7 +159,7 @@ def _compute_generation_metrics(self, model, step: int) -> dict[str, float]: with torch.no_grad(): for i in tqdm(range(0, len(indices), self.batch_size), desc="Generating for metrics"): input_ids_batch = input_ids[i : i + self.batch_size] - with torch.inference_mode(), torch.cuda.amp.autocast(): + with torch.inference_mode(), torch.amp.autocast("cuda"): outputs_batch = model.generate(input_ids_batch, **self.generation_config) generated_texts = self.tokenizer.batch_decode( outputs_batch[:, input_ids_batch.shape[1] :], @@ -183,20 +184,20 @@ def _compute_generation_metrics(self, model, step: int) -> dict[str, 
float]: return metrics -class MLflowLoggingCallback(TrainerCallback): +class MetricLoggingCallback(TrainerCallback): """Callback for logging metrics to tracking backend during training""" def __init__( self, metric_logger, - mlflow_run_id: str, + metric_run_id: str, excluded_keys: list = None, completed_steps: int = 0, chunk_id: int = 0, num_epochs_completed: int = 0, ): self.metric_logger = metric_logger - self.mlflow_run_id = mlflow_run_id + self.metric_run_id = metric_run_id self.completed_steps = completed_steps self.excluded_keys = excluded_keys or [ "step", @@ -215,30 +216,33 @@ def on_log( ): """Called when the trainer logs metrics""" if logs is not None: + step = self.completed_steps + state.global_step for key, value in logs.items(): if isinstance(value, (int, float)) and key not in self.excluded_keys: try: - self.metric_logger.log_metric( - self.mlflow_run_id, - key, - value, - step=self.completed_steps + state.global_step, - ) + if self.metric_logger: + self.metric_logger.log_metric( + self.metric_run_id, + key, + value, + step=step, + ) except Exception as e: print(f"Warning: Failed to log metric {key} to tracking backend: {e}") if "eval_loss" not in logs and "train_runtime" not in logs: - self.metric_logger.log_metric( - self.mlflow_run_id, - "chunk number", - self.chunk_id, - step=self.completed_steps + state.global_step, - ) - self.metric_logger.log_metric( - self.mlflow_run_id, - "num_epochs_completed", - self.num_epochs_completed, - step=self.completed_steps + state.global_step, - ) + if self.metric_logger: + self.metric_logger.log_metric( + self.metric_run_id, + "chunk number", + self.chunk_id, + step=step, + ) + self.metric_logger.log_metric( + self.metric_run_id, + "num_epochs_completed", + self.num_epochs_completed, + step=step, + ) class LogLevelCallback(TrainerCallback): diff --git a/rapidfireai/fit/ml/trainer.py b/rapidfireai/fit/ml/trainer.py index 3511f35f..e74cf5c7 100644 --- a/rapidfireai/fit/ml/trainer.py +++ 
b/rapidfireai/fit/ml/trainer.py @@ -7,7 +7,7 @@ from trl import DPOConfig, DPOTrainer, GRPOConfig, GRPOTrainer, SFTConfig, SFTTrainer from rapidfireai.utils.constants import RF_TRAINER_OUTPUT -from rapidfireai.fit.ml.callbacks import GenerationMetricsCallback, LogLevelCallback, MLflowLoggingCallback +from rapidfireai.fit.ml.callbacks import GenerationMetricsCallback, LogLevelCallback, MetricLoggingCallback from rapidfireai.fit.ml.checkpoint_utils import ( ensure_gradient_compatibility, load_checkpoint_from_disk, @@ -301,15 +301,15 @@ def _setup_callbacks( """Setup callbacks for the trainer.""" callbacks = [] - if metric_logger is not None and trainer_config.mlflow_run_id is not None: - mlflow_callback = MLflowLoggingCallback( + if metric_logger is not None and trainer_config.metric_run_id is not None: + metric_callback = MetricLoggingCallback( metric_logger=metric_logger, - mlflow_run_id=trainer_config.mlflow_run_id, + metric_run_id=trainer_config.metric_run_id, completed_steps=trainer_config.completed_steps, chunk_id=chunk_id, num_epochs_completed=trainer_config.num_epochs_completed, ) - callbacks.append(mlflow_callback) + callbacks.append(metric_callback) if compute_metrics is not None and additional_trainer_kwargs.get("generation_config") is not None: compute_metrics_function = compute_metrics @@ -325,7 +325,7 @@ def _setup_callbacks( compute_metrics=compute_metrics_function, batch_size=training_args.get("per_device_eval_batch_size"), metric_logger=metric_logger, - mlflow_run_id=trainer_config.mlflow_run_id, + metric_run_id=trainer_config.metric_run_id, completed_steps=trainer_config.completed_steps, ) callbacks.append(generation_callback) diff --git a/rapidfireai/fit/utils/CLAUDE.md b/rapidfireai/fit/utils/CLAUDE.md index 3c28a49f..20309789 100644 --- a/rapidfireai/fit/utils/CLAUDE.md +++ b/rapidfireai/fit/utils/CLAUDE.md @@ -127,18 +127,18 @@ logger.error("Failed to schedule", run_id=5) - `{log_dir}/worker_0.log` - `{log_dir}/dispatcher.log` -### 
mlflow_manager.py -**Purpose**: Wrapper around MLflow tracking API +### metric_mlflow_manager.py +**Purpose**: Wrapper around metrics tracking API **Key Responsibilities**: -- Creates and retrieves MLflow experiments +- Creates and retrieves metric experiments - Logs metrics, parameters, and artifacts -- Creates MLflow runs -- Handles MLflow server communication +- Creates metric runs +- Handles metric server communication **Key Methods**: -- `get_experiment(name)`: Get or create MLflow experiment -- `create_run(experiment_id)`: Create MLflow run, return run_id +- `get_experiment(name)`: Get or create metric experiment +- `create_run(experiment_id)`: Create metric run, return run_id - `log_metric(run_id, key, value, step)`: Log metric - `log_param(run_id, key, value)`: Log parameter - `log_artifact(run_id, artifact_path)`: Log artifact file @@ -146,9 +146,9 @@ logger.error("Failed to schedule", run_id=5) **Usage**: ```python -from rapidfireai.fit.utils.mlflow_manager import MLflowManager +from rapidfireai.utils.metric_mlflow_manager import MLflowMetricLogger -mlflow = MLflowManager("http://localhost:8852") +mlflow = MLflowMetricLogger("http://localhost:8852") experiment = mlflow.get_experiment("my_experiment") run_id = mlflow.create_run(experiment.experiment_id) mlflow.log_metric(run_id, "loss", 0.5, step=100) @@ -360,7 +360,7 @@ else: - `experiment_name`: Experiment name - `chunk_id`: Current chunk being trained - `epoch`: Current epoch -- `mlflow_run_id`: MLflow run ID +- `metric_run_id`: Metric logger run ID **Usage**: ```python @@ -373,7 +373,7 @@ config = TrainerConfig( experiment_name="my_exp", chunk_id=2, epoch=0, - mlflow_run_id="abc123" + metric_run_id="abc123" ) trainer = create_trainer_instance(config, shm_manager) diff --git a/rapidfireai/fit/utils/constants.py b/rapidfireai/fit/utils/constants.py index d3dbcfc3..71beae02 100644 --- a/rapidfireai/fit/utils/constants.py +++ b/rapidfireai/fit/utils/constants.py @@ -1,25 +1,8 @@ import os from enum import Enum
-from rapidfireai.utils.constants import RF_TRACKING_BACKEND, RF_TENSORBOARD_LOG_DIR, RF_TRAINING_LOG_FILENAME, RF_DB_PATH - -# Tracking Backend Configuration -def get_tracking_backend() -> str: - """ - Get the tracking backend from environment variable at runtime. - - Returns: - str: The tracking backend ('mlflow', 'tensorboard', or 'both') - - Note: This reads from os.environ at runtime to allow setting the env var - after module import (important for notebook environments like Colab). - """ - backend = RF_TRACKING_BACKEND - return backend - +from rapidfireai.utils.constants import RF_TRAINING_LOG_FILENAME, RF_DB_PATH # Backwards compatibility: Keep constant but it will be stale if env var changes after import -TRACKING_BACKEND = get_tracking_backend() # Options: 'mlflow', 'tensorboard', 'both' -TENSORBOARD_LOG_DIR = RF_TENSORBOARD_LOG_DIR # Default set by experiment path # Shared Memory Constants SHM_WARN_THRESHOLD = 80 diff --git a/rapidfireai/fit/utils/experiment_utils.py b/rapidfireai/fit/utils/experiment_utils.py index 1939b0e4..818c037b 100644 --- a/rapidfireai/fit/utils/experiment_utils.py +++ b/rapidfireai/fit/utils/experiment_utils.py @@ -14,9 +14,10 @@ from tqdm import tqdm from transformers import logging as transformers_logging -from rapidfireai.utils.constants import MLFlowConfig +from rapidfireai.utils.constants import MLFlowConfig, RF_MLFLOW_ENABLED +from rapidfireai.utils.metric_rfmetric_manager import RFMetricLogger from rapidfireai.fit.db.rf_db import RfDb -from rapidfireai.fit.utils.constants import ExperimentStatus, ExperimentTask, get_tracking_backend +from rapidfireai.fit.utils.constants import ExperimentStatus, ExperimentTask from rapidfireai.fit.utils.datapaths import DataPath from rapidfireai.fit.utils.exceptions import DBException, ExperimentException from rapidfireai.fit.utils.logging import RFLogger @@ -45,6 +46,8 @@ def _disable_ml_warnings_display(self) -> None: # Suppress the FutureWarning warnings.filterwarnings("ignore", 
message=".*torch.cuda.amp.autocast.*") + warnings.filterwarnings("ignore", message=".*torch.amp.autocast.*") + warnings.filterwarnings("ignore", message=".*fan_in_fan_out is set to False.*") warnings.filterwarnings("ignore", message=".*generation flags are not valid.*") warnings.filterwarnings("ignore", message=".*decoder-only architecture.*") warnings.filterwarnings("ignore", message=".*attention mask is not set.*") @@ -85,8 +88,7 @@ def create_experiment(self, given_name: str, experiments_path: str) -> tuple[int # Clear any existing MLflow context before starting new experiment # Only if using MLflow backend - tracking_backend = get_tracking_backend() - if tracking_backend in ["mlflow", "both"]: + if RF_MLFLOW_ENABLED=="true": import mlflow # Lazy import to avoid connection attempts in tensorboard-only mode try: @@ -127,15 +129,15 @@ def create_experiment(self, given_name: str, experiments_path: str) -> tuple[int ) else: self.end_experiment(internal=True) - experiment_id, experiment_name, mlflow_experiment_id = self._create_experiment_internal( + experiment_id, experiment_name, metric_experiment_id = self._create_experiment_internal( given_name, experiments_path, ) - if mlflow_experiment_id: + if metric_experiment_id: msg = ( f"The previously running experiment {running_experiment['experiment_name']} was forcibly ended." 
f" Created a new experiment '{experiment_name}' with Experiment ID: {experiment_id}" - f" and MLflow Experiment ID: {mlflow_experiment_id} at {experiments_path}/{experiment_name}" + f" and Metric Experiment ID: {metric_experiment_id} at {experiments_path}/{experiment_name}" ) else: msg = ( @@ -147,15 +149,15 @@ def create_experiment(self, given_name: str, experiments_path: str) -> tuple[int log_messages.append(msg) # check if experiment name already exists elif given_name in self.db.get_all_experiment_names(): - experiment_id, experiment_name, mlflow_experiment_id = self._create_experiment_internal( + experiment_id, experiment_name, metric_experiment_id = self._create_experiment_internal( given_name, experiments_path, ) - if mlflow_experiment_id: + if metric_experiment_id: msg = ( "An experiment with the same name already exists." f" Created a new experiment '{experiment_name}' with Experiment ID: {experiment_id}" - f" and MLflow Experiment ID: {mlflow_experiment_id} at {experiments_path}/{experiment_name}" + f" and Metric Experiment ID: {metric_experiment_id} at {experiments_path}/{experiment_name}" ) else: msg = ( @@ -166,14 +168,14 @@ def create_experiment(self, given_name: str, experiments_path: str) -> tuple[int print(msg) log_messages.append(msg) else: - experiment_id, experiment_name, mlflow_experiment_id = self._create_experiment_internal( + experiment_id, experiment_name, metric_experiment_id = self._create_experiment_internal( given_name, experiments_path, ) - if mlflow_experiment_id: + if metric_experiment_id: msg = ( f"Experiment {experiment_name} created with Experiment ID: {experiment_id}" - f" and MLflow Experiment ID: {mlflow_experiment_id} at {experiments_path}/{experiment_name}" + f" and Metric Experiment ID: {metric_experiment_id} at {experiments_path}/{experiment_name}" ) else: msg = ( @@ -213,25 +215,23 @@ def end_experiment(self, internal: bool = False) -> None: self.db.reset_all_tables() # Clear MLflow context only if using MLflow backend - 
tracking_backend = get_tracking_backend() - if tracking_backend in ["mlflow", "both"]: + if RF_MLFLOW_ENABLED=="true": import mlflow # Lazy import to avoid connection attempts in tensorboard-only mode - from rapidfireai.fit.utils.mlflow_manager import MLflowManager - try: if mlflow.active_run(): print("Ending active MLflow run before ending experiment") mlflow.end_run() - # Also clear context through MLflowManager if available + # Also clear context through RFMetricLogger if available try: - mlflow_manager = MLflowManager(MLFlowConfig.URL) - mlflow_manager.clear_context() + metric_logger_config = RFMetricLogger.get_default_metric_loggers(experiment_name=experiment_name) + metric_logger = RFMetricLogger(metric_logger_config, logger=logger) + metric_logger.clear_context() except Exception as e2: - print(f"Error clearing MLflow context through MLflowManager: {e2}") + print(f"Error clearing Metric context through RFMetricLogger: {e2}") except Exception as e: - print(f"Error clearing MLflow context: {e}") + print(f"Error clearing Metric context: {e}") # print experiment ended message msg = f"Experiment {experiment_name} ended" @@ -348,35 +348,35 @@ def _create_experiment_internal(self, given_name: str, experiments_path: str) -> """Create new experiment - if given_name already exists - increment suffix and create new experiment if given_name is new - create new experiment with given name - Returns: experiment_id, experiment_name, mlflow_experiment_id (or None if tensorboard-only) + Returns: experiment_id, experiment_name, metric_experiment_id (or None) """ try: given_name = given_name if given_name else "rf-exp" experiment_name = self._generate_unique_experiment_name(given_name, self.db.get_all_experiment_names()) - # Create MLflow experiment only if using MLflow backend - mlflow_experiment_id = None - tracking_backend = get_tracking_backend() - if tracking_backend in ["mlflow", "both"]: + # Create Metric experiment only if MLflow is enabled + metric_experiment_id = None + if 
RF_MLFLOW_ENABLED=="true": import mlflow # Lazy import to avoid connection attempts in tensorboard-only mode - from rapidfireai.fit.utils.mlflow_manager import MLflowManager - try: - mlflow_manager = MLflowManager(MLFlowConfig.URL) - mlflow_experiment_id = mlflow_manager.create_experiment(experiment_name) + try: + # create logger + logger = RFLogger().create_logger(experiment_name) + metric_logger_config = RFMetricLogger.get_default_metric_loggers(experiment_name=experiment_name) + metric_logger = RFMetricLogger(metric_logger_config, logger=logger) + metric_experiment_id = metric_logger.create_experiment(experiment_name) mlflow.tracing.disable_notebook_display() except Exception as e: # Catch MLflow-specific exceptions (mlflow.exceptions.RestException, etc.) - raise ExperimentException(f"Error creating MLFlow experiment: {e}") from e + raise ExperimentException(f"Error creating Metric experiment: {e}") from e # write new experiment details to database experiment_id = self.db.create_experiment( experiment_name, - mlflow_experiment_id, # Will be None for tensorboard-only + metric_experiment_id, # Will be None when MLflow is disabled config_options={"experiments_path": experiments_path}, ) - return experiment_id, experiment_name, mlflow_experiment_id + return experiment_id, experiment_name, metric_experiment_id except ExperimentException: # Re-raise ExperimentExceptions (including MLflow errors from above) raise diff --git a/rapidfireai/fit/utils/logging.py b/rapidfireai/fit/utils/logging.py index cd84887e..303760b7 100644 --- a/rapidfireai/fit/utils/logging.py +++ b/rapidfireai/fit/utils/logging.py @@ -19,10 +19,10 @@ class BaseRFLogger(ABC): def __init__(self, level: str = "DEBUG"): try: db = RfDb() - experiment_name = 
db.get_running_experiment()["experiment_name"] + except Exception: + experiment_name = "no_active_experiment" + log_file_path = self.get_log_file_path(experiment_name) with BaseRFLogger._lock: # Reset loggers if experiment changed diff --git a/rapidfireai/fit/utils/metric_logger.py b/rapidfireai/fit/utils/metric_logger.py deleted file mode 100644 index 53b9def6..00000000 --- a/rapidfireai/fit/utils/metric_logger.py +++ /dev/null @@ -1,394 +0,0 @@ -""" -Metric Logger abstraction layer for RapidFire AI. - -This module provides a unified interface for logging metrics to different backends -(MLflow, TensorBoard, or both). This abstraction allows minimal changes to core ML code -while supporting multiple tracking systems. -""" - -import os -from abc import ABC, abstractmethod -from pathlib import Path -from typing import Optional - -from rapidfireai.utils.os_utils import mkdir_p - -# Note: MLflowManager is imported lazily in MLflowMetricLogger to avoid -# connection attempts when using tensorboard-only mode - - -class MetricLogger(ABC): - """ - Abstract base class for metric logging. - - Provides a unified interface for logging metrics, parameters, and managing runs - across different tracking backends (MLflow, TensorBoard, etc.). - """ - - @abstractmethod - def create_run(self, run_name: str) -> str: - """ - Create a new run and return run_id. - - Args: - run_name: Name for the run - - Returns: - Run ID string - """ - pass - - @abstractmethod - def log_param(self, run_id: str, key: str, value: str) -> None: - """ - Log a parameter to a specific run. - - Args: - run_id: Run identifier - key: Parameter name - value: Parameter value - """ - pass - - @abstractmethod - def log_metric(self, run_id: str, key: str, value: float, step: Optional[int] = None) -> None: - """ - Log a metric to a specific run. 
- - Args: - run_id: Run identifier - key: Metric name - value: Metric value - step: Optional step number for the metric - """ - pass - - @abstractmethod - def end_run(self, run_id: str) -> None: - """ - End a specific run. - - Args: - run_id: Run identifier - """ - pass - - @abstractmethod - def get_run_metrics(self, run_id: str) -> dict: - """ - Get all metrics for a specific run. - - Args: - run_id: Run identifier - - Returns: - Dictionary of metrics - """ - pass - - def delete_run(self, run_id: str) -> None: - """ - Delete a specific run (optional, not all backends support this). - - Args: - run_id: Run identifier - """ - pass - - def clear_context(self) -> None: - """Clear the tracking context (optional, not all backends need this).""" - pass - - -class MLflowMetricLogger(MetricLogger): - """ - MLflow implementation of MetricLogger. - - Wraps the existing MLflowManager to provide the MetricLogger interface. - """ - - def __init__(self, tracking_uri: str): - """ - Initialize MLflow metric logger. - - Args: - tracking_uri: MLflow tracking server URI - """ - # Lazy import to avoid connection attempts in tensorboard-only mode - from rapidfireai.fit.utils.mlflow_manager import MLflowManager - self.mlflow_manager = MLflowManager(tracking_uri) - - def get_experiment(self, experiment_name: str) -> str: - """ - Get existing experiment by name. 
- - Args: - experiment_name: Name of the experiment - - Returns: - Experiment ID - """ - return self.mlflow_manager.get_experiment(experiment_name) - - def create_run(self, run_name: str) -> str: - """Create a new MLflow run.""" - return self.mlflow_manager.create_run(run_name) - - def log_param(self, run_id: str, key: str, value: str) -> None: - """Log a parameter to MLflow.""" - self.mlflow_manager.log_param(run_id, key, value) - - def log_metric(self, run_id: str, key: str, value: float, step: Optional[int] = None) -> None: - """Log a metric to MLflow.""" - self.mlflow_manager.log_metric(run_id, key, value, step=step) - - def end_run(self, run_id: str) -> None: - """End an MLflow run.""" - self.mlflow_manager.end_run(run_id) - - def get_run_metrics(self, run_id: str) -> dict: - """Get metrics from MLflow.""" - return self.mlflow_manager.get_run_metrics(run_id) - - def delete_run(self, run_id: str) -> None: - """Delete an MLflow run.""" - self.mlflow_manager.delete_run(run_id) - - def clear_context(self) -> None: - """Clear MLflow context.""" - self.mlflow_manager.clear_context() - - -class TensorBoardMetricLogger(MetricLogger): - """ - TensorBoard implementation of MetricLogger. - - Uses torch.utils.tensorboard.SummaryWriter to log metrics to TensorBoard. - """ - - def __init__(self, log_dir: str): - """ - Initialize TensorBoard metric logger. - - Args: - log_dir: Directory for TensorBoard logs - """ - from torch.utils.tensorboard import SummaryWriter - - self.log_dir = Path(log_dir) - try: - mkdir_p(self.log_dir, notify=False) - except (PermissionError, OSError) as e: - print(f"Error creating directory: {e}") - raise - self.writers = {} # Map run_id -> SummaryWriter - - def create_run(self, run_name: str) -> str: - """ - Create a new TensorBoard run. - - For TensorBoard, we use run_name as the run_id and create a subdirectory - in the log directory. 
- """ - from torch.utils.tensorboard import SummaryWriter - - run_log_dir = os.path.join(self.log_dir, run_name) - try: - mkdir_p(run_log_dir, notify=False) - except (PermissionError, OSError) as e: - print(f"Error creating directory: {e}") - raise - - # Create SummaryWriter for this run - writer = SummaryWriter(log_dir=run_log_dir) - self.writers[run_name] = writer - - return run_name - - def log_param(self, run_id: str, key: str, value: str) -> None: - """ - Log a parameter to TensorBoard. - - TensorBoard doesn't have native parameter logging, so we log as text. - """ - if run_id not in self.writers: - self.create_run(run_id) - - writer = self.writers[run_id] - writer.add_text(f"params/{key}", str(value), global_step=0) - writer.flush() - - def log_metric(self, run_id: str, key: str, value: float, step: Optional[int] = None) -> None: - """ - Log a metric to TensorBoard. - - Args: - run_id: Run identifier - key: Metric name - value: Metric value - step: Step number (required for TensorBoard time series) - """ - if run_id not in self.writers: - self.create_run(run_id) - - writer = self.writers[run_id] - # Use step=0 if not provided (fallback) - writer.add_scalar(key, value, global_step=step if step is not None else 0) - # Flush immediately to ensure real-time updates - writer.flush() - - def end_run(self, run_id: str) -> None: - """End a TensorBoard run by closing the writer.""" - if run_id in self.writers: - self.writers[run_id].close() - del self.writers[run_id] - - def get_run_metrics(self, run_id: str) -> dict: - """ - Get metrics from TensorBoard. - - Note: TensorBoard doesn't provide easy API access to logged metrics. - This returns an empty dict. For viewing metrics, use TensorBoard UI. - """ - return {} - - def delete_run(self, run_id: str) -> None: - """ - Delete a TensorBoard run by moving its directory outside the log tree (soft delete). 
- - This is a soft delete - the data is moved to a sibling '{log_dir}_deleted' directory - outside TensorBoard's scan path, so it won't appear in the UI. Data can be manually - recovered if needed by moving it back to the log_dir. - - Args: - run_id: Run identifier (directory name) - """ - import shutil - import time - - # Close and remove writer if active - if run_id in self.writers: - self.writers[run_id].close() - del self.writers[run_id] - - # Move the run directory to sibling deleted folder (outside log_dir tree) - run_log_dir = os.path.join(self.log_dir, run_id) - if os.path.exists(run_log_dir) and os.path.isdir(run_log_dir): - # Create deleted directory as sibling, not child, of log_dir - deleted_dir = os.path.join(self.log_dir.parent, f"{self.log_dir.name}_deleted") - try: - mkdir_p(deleted_dir, notify=False) - except (PermissionError, OSError) as e: - print(f"Error creating directory: {e}") - raise - - # Add timestamp to avoid name collisions - timestamp = int(time.time()) - destination = os.path.join(deleted_dir, f"{run_id}_{timestamp}") - - shutil.move(run_log_dir, destination) - - def __del__(self): - """Clean up all writers on deletion.""" - for writer in self.writers.values(): - writer.close() - - -class DualMetricLogger(MetricLogger): - """ - Dual implementation that logs to both MLflow and TensorBoard. - - This allows users to benefit from both tracking systems simultaneously: - - MLflow for experiment comparison and model registry - - TensorBoard for real-time training visualization (especially useful in Colab) - """ - - def __init__(self, mlflow_tracking_uri: str, tensorboard_log_dir: str): - """ - Initialize dual metric logger. 
- - Args: - mlflow_tracking_uri: MLflow tracking server URI - tensorboard_log_dir: Directory for TensorBoard logs - """ - self.mlflow_logger = MLflowMetricLogger(mlflow_tracking_uri) - self.tensorboard_logger = TensorBoardMetricLogger(tensorboard_log_dir) - - def get_experiment(self, experiment_name: str) -> str: - """Get experiment from MLflow (TensorBoard doesn't have experiments).""" - return self.mlflow_logger.get_experiment(experiment_name) - - def create_run(self, run_name: str) -> str: - """Create run in both MLflow and TensorBoard.""" - mlflow_run_id = self.mlflow_logger.create_run(run_name) - self.tensorboard_logger.create_run(run_name) - # Return MLflow run_id as the canonical ID - return mlflow_run_id - - def log_param(self, run_id: str, key: str, value: str) -> None: - """Log parameter to both backends.""" - self.mlflow_logger.log_param(run_id, key, value) - self.tensorboard_logger.log_param(run_id, key, value) - - def log_metric(self, run_id: str, key: str, value: float, step: Optional[int] = None) -> None: - """Log metric to both backends.""" - self.mlflow_logger.log_metric(run_id, key, value, step=step) - self.tensorboard_logger.log_metric(run_id, key, value, step=step) - - def end_run(self, run_id: str) -> None: - """End run in both backends.""" - self.mlflow_logger.end_run(run_id) - self.tensorboard_logger.end_run(run_id) - - def get_run_metrics(self, run_id: str) -> dict: - """Get metrics from MLflow (primary source).""" - return self.mlflow_logger.get_run_metrics(run_id) - - def delete_run(self, run_id: str) -> None: - """Delete run from both MLflow and TensorBoard.""" - self.mlflow_logger.delete_run(run_id) - self.tensorboard_logger.delete_run(run_id) - - def clear_context(self) -> None: - """Clear context in both backends.""" - self.mlflow_logger.clear_context() - - -def create_metric_logger( - backend: str, - mlflow_tracking_uri: Optional[str] = None, - tensorboard_log_dir: Optional[str] = None, -) -> MetricLogger: - """ - Factory function to 
create the appropriate metric logger. - - Args: - backend: Tracking backend to use ('mlflow', 'tensorboard', or 'both') - mlflow_tracking_uri: MLflow tracking server URI (required if backend includes MLflow) - tensorboard_log_dir: TensorBoard log directory (required if backend includes TensorBoard) - - Returns: - MetricLogger instance - - Raises: - ValueError: If backend is invalid or required parameters are missing - """ - backend = backend.lower() - - if backend == "mlflow": - if not mlflow_tracking_uri: - raise ValueError("mlflow_tracking_uri required for MLflow backend") - return MLflowMetricLogger(mlflow_tracking_uri) - - elif backend == "tensorboard": - if not tensorboard_log_dir: - raise ValueError("tensorboard_log_dir required for TensorBoard backend") - return TensorBoardMetricLogger(tensorboard_log_dir) - - elif backend == "both": - if not mlflow_tracking_uri or not tensorboard_log_dir: - raise ValueError("Both mlflow_tracking_uri and tensorboard_log_dir required for dual backend") - return DualMetricLogger(mlflow_tracking_uri, tensorboard_log_dir) - - else: - raise ValueError(f"Invalid backend: {backend}. 
Must be 'mlflow', 'tensorboard', or 'both'") diff --git a/rapidfireai/fit/utils/trainer_config.py b/rapidfireai/fit/utils/trainer_config.py index c57e3630..d129f49a 100644 --- a/rapidfireai/fit/utils/trainer_config.py +++ b/rapidfireai/fit/utils/trainer_config.py @@ -13,7 +13,7 @@ class TrainerConfig: worker_id: int run_id: int - mlflow_run_id: str + metric_run_id: str config_leaf: dict[str, Any] total_steps: int completed_steps: int diff --git a/rapidfireai/utils/constants.py b/rapidfireai/utils/constants.py index 18b62ff5..936ebbe3 100644 --- a/rapidfireai/utils/constants.py +++ b/rapidfireai/utils/constants.py @@ -13,7 +13,7 @@ RF_LOG_PATH = os.getenv("RF_LOG_PATH", os.path.join(RF_HOME, "logs")) RF_EXPERIMENT_PATH = os.getenv("RF_EXPERIMENT_PATH", os.path.join(RF_HOME, "rapidfire_experiments")) RF_DB_PATH = os.getenv("RF_DB_PATH", os.path.expanduser(os.path.join(RF_HOME, "db"))) -RF_TENSORBOARD_LOG_DIR = os.getenv("RF_TENSORBOARD_LOG_DIR", None) +RF_TENSORBOARD_LOG_DIR = os.getenv("RF_TENSORBOARD_LOG_DIR", f"{RF_EXPERIMENT_PATH}/tensorboard_logs") RF_TRAINING_LOG_FILENAME = os.getenv("RF_TRAINING_LOG_FILENAME", "training.log") RF_TRAINER_OUTPUT = os.getenv("RF_TRAINER_OUTPUT", os.path.join(RF_HOME, "trainer_output")) @@ -88,4 +88,6 @@ class ColabConfig: def __str__(self): return f"ColabConfig(ON_COLAB={self.ON_COLAB}, RF_COLAB_MODE={self.RF_COLAB_MODE})" -RF_TRACKING_BACKEND = os.getenv("RF_TRACKING_BACKEND", "mlflow" if not ColabConfig.ON_COLAB else "tensorboard") +RF_MLFLOW_ENABLED = os.getenv("RF_MLFLOW_ENABLED", "true" if not ColabConfig.ON_COLAB else "false") +RF_TENSORBOARD_ENABLED = os.getenv("RF_TENSORBOARD_ENABLED", "false" if not ColabConfig.ON_COLAB else "true") +RF_TRACKIO_ENABLED = os.getenv("RF_TRACKIO_ENABLED", "false") \ No newline at end of file diff --git a/rapidfireai/utils/doctor.py b/rapidfireai/utils/doctor.py index 6215f9c6..7fa20fbd 100644 --- a/rapidfireai/utils/doctor.py +++ b/rapidfireai/utils/doctor.py @@ -21,7 +21,9 @@ RF_DB_PATH, 
RF_TENSORBOARD_LOG_DIR, RF_TRAINING_LOG_FILENAME, - RF_TRACKING_BACKEND, + RF_MLFLOW_ENABLED, + RF_TENSORBOARD_ENABLED, + RF_TRACKIO_ENABLED, RF_LOG_FILENAME, ) @@ -202,7 +204,9 @@ def get_doctor_info(log_lines: int = 10): print(f"RF_TENSORBOARD_LOG_DIR: {RF_TENSORBOARD_LOG_DIR}") print(f"RF_LOG_FILENAME: {RF_LOG_FILENAME}") print(f"RF_TRAINING_LOG_FILENAME: {RF_TRAINING_LOG_FILENAME}") - print(f"RF_TRACKING_BACKEND: {RF_TRACKING_BACKEND}") + print(f"RF_MLFLOW_ENABLED: {RF_MLFLOW_ENABLED}") + print(f"RF_TENSORBOARD_ENABLED: {RF_TENSORBOARD_ENABLED}") + print(f"RF_TRACKIO_ENABLED: {RF_TRACKIO_ENABLED}") print(f"JupyterConfig: {JupyterConfig()}") print(f"DispatcherConfig: {DispatcherConfig()}") print(f"MLFlowConfig: {MLFlowConfig()}") diff --git a/rapidfireai/utils/metric_logger.py b/rapidfireai/utils/metric_logger.py new file mode 100644 index 00000000..3a167e16 --- /dev/null +++ b/rapidfireai/utils/metric_logger.py @@ -0,0 +1,122 @@ +""" +Metric Logger abstraction layer for RapidFire AI. + +This module provides a unified interface for logging metrics to different backends +(MLflow, TensorBoard, TrackIO, or combinations). This abstraction allows minimal changes to core ML code +while supporting multiple tracking systems. +""" + +from abc import ABC, abstractmethod +from typing import Optional, TypedDict, Any +from enum import Enum + + +class MetricLogger(ABC): + """ + Abstract base class for metric logging. + + Provides a unified interface for logging metrics, parameters, and managing runs + across different tracking backends (MLflow, TensorBoard, etc.). + """ + + @abstractmethod + def create_experiment(self, experiment_name: str) -> str: + """ + Create a new experiment and return experiment_id. + """ + pass + + @abstractmethod + def get_experiment(self, experiment_name: str) -> str: + """ + Get existing experiment by name and set it as active. + """ + pass + + @abstractmethod + def create_run(self, run_name: str) -> str: + """ + Create a new run and return run_id. 
+ + Args: + run_name: Name for the run + + Returns: + Run ID string + """ + pass + + @abstractmethod + def log_param(self, run_id: str, key: str, value: str) -> None: + """ + Log a parameter to a specific run. + + Args: + run_id: Run identifier + key: Parameter name + value: Parameter value + """ + pass + + @abstractmethod + def log_metric(self, run_id: str, key: str, value: float, step: Optional[int] = None) -> None: + """ + Log a metric to a specific run. + + Args: + run_id: Run identifier + key: Metric name + value: Metric value + step: Optional step number for the metric + """ + pass + + @abstractmethod + def get_run_metrics(self, run_id: str) -> dict: + """ + Get all metrics for a specific run. + + Args: + run_id: Run identifier + + Returns: + Dictionary of metrics + """ + pass + + @abstractmethod + def end_run(self, run_id: str) -> None: + """ + End a specific run. + + Args: + run_id: Run identifier + """ + pass + + @abstractmethod + def delete_run(self, run_id: str) -> None: + """ + Delete a specific run (optional, not all backends support this). 
+ + Args: + run_id: Run identifier + """ + pass + + @abstractmethod + def clear_context(self) -> None: + """Clear the tracking context (optional, not all backends need this).""" + pass + +class MetricLoggerType(Enum): + """Enum for MetricLogger types.""" + MLFLOW = "mlflow" + TENSORBOARD = "tensorboard" + TRACKIO = "trackio" + MULTIPLE = "multiple" + +class MetricLoggerConfig(TypedDict): + """Config for MetricLogger.""" + type: MetricLoggerType + config: Any diff --git a/rapidfireai/fit/utils/mlflow_manager.py b/rapidfireai/utils/metric_mlflow_manager.py similarity index 64% rename from rapidfireai/fit/utils/mlflow_manager.py rename to rapidfireai/utils/metric_mlflow_manager.py index a0407334..af762399 100644 --- a/rapidfireai/fit/utils/mlflow_manager.py +++ b/rapidfireai/utils/metric_mlflow_manager.py @@ -3,18 +3,31 @@ import os import mlflow from mlflow.tracking import MlflowClient +from typing import Any +from rapidfireai.utils.metric_logger import MetricLogger, MetricLoggerType +from rapidfireai.utils.ping import ping_server +from rapidfireai.utils.constants import MLFlowConfig +from rapidfireai.evals.utils.logger import RFLogger -class MLflowManager: - def __init__(self, tracking_uri: str): +class MLflowMetricLogger(MetricLogger): + def __init__(self, tracking_uri: str, logger: RFLogger = None, init_kwargs: dict[str, Any] = None): """ Initialize MLflow Manager with tracking URI. Args: tracking_uri: MLflow tracking server URI + init_kwargs: Initialization kwargs for MLflow """ - mlflow.set_tracking_uri(tracking_uri) - self.client = MlflowClient(tracking_uri=tracking_uri) + self.type = MetricLoggerType.MLFLOW + self.client = None + self.logger = logger if logger is not None else RFLogger() + self.init_kwargs = init_kwargs # Not currently used + if not ping_server(MLFlowConfig.HOST, MLFlowConfig.PORT, 2): + raise ConnectionRefusedError(f"MLflow server not available at {MLFlowConfig.URL}. 
MLflow logging will be disabled.") + else: + mlflow.set_tracking_uri(tracking_uri) + self.client = MlflowClient(tracking_uri=tracking_uri) self.experiment_id = None def create_experiment(self, experiment_name: str) -> str: @@ -39,20 +52,20 @@ def create_run(self, run_name: str) -> str: run = self.client.create_run(self.experiment_id, run_name=run_name) return run.info.run_id - def log_param(self, mlflow_run_id: str, key: str, value: str) -> None: + def log_param(self, run_id: str, key: str, value: str) -> None: """Log parameters to a specific run.""" - self.client.log_param(mlflow_run_id, key, value) + self.client.log_param(run_id, key, value) - def log_metric(self, mlflow_run_id: str, key: str, value: float, step: int = None) -> None: + def log_metric(self, run_id: str, key: str, value: float, step: int = None) -> None: """Log a metric to a specific run.""" - self.client.log_metric(mlflow_run_id, key, value, step=step) + self.client.log_metric(run_id, key, value, step=step) - def get_run_metrics(self, mlflow_run_id: str) -> dict[str, list[tuple[int, float]]]: + def get_run_metrics(self, run_id: str) -> dict[str, list[tuple[int, float]]]: """ Get all metrics for a specific run. 
""" try: - run = self.client.get_run(mlflow_run_id) + run = self.client.get_run(run_id) if run is None: return {} @@ -60,45 +73,45 @@ def get_run_metrics(self, mlflow_run_id: str) -> dict[str, list[tuple[int, float metric_dict = {} for metric_key in run_data.metrics.keys(): try: - metric_history = self.client.get_metric_history(mlflow_run_id, metric_key) + metric_history = self.client.get_metric_history(run_id, metric_key) metric_dict[metric_key] = [(metric.step, metric.value) for metric in metric_history] except Exception as e: print(f"Error getting metric history for {metric_key}: {e}") continue return metric_dict except Exception as e: - print(f"Error getting metrics for run {mlflow_run_id}: {e}") + print(f"Error getting metrics for run {run_id}: {e}") return {} - def end_run(self, mlflow_run_id: str) -> None: + def end_run(self, run_id: str) -> None: """End a specific run.""" # Check if run exists before terminating - run = self.client.get_run(mlflow_run_id) + run = self.client.get_run(run_id) if run is not None: # First terminate the run on the server - self.client.set_terminated(mlflow_run_id) + self.client.set_terminated(run_id) # Then clear the local MLflow context if this is the active run try: current_run = mlflow.active_run() # Make sure we end the run on the correct worker - if current_run and current_run.info.run_id == mlflow_run_id: + if current_run and current_run.info.run_id == run_id: mlflow.end_run() else: - print(f"Run {mlflow_run_id} is not the active run, no local context to clear") + print(f"Run {run_id} is not the active run, no local context to clear") except Exception as e: print(f"Error clearing local MLflow context: {e}") else: - print(f"MLflow run {mlflow_run_id} not found, cannot terminate") + print(f"MLflow run {run_id} not found, cannot terminate") - def delete_run(self, mlflow_run_id: str) -> None: + def delete_run(self, run_id: str) -> None: """Delete a specific run.""" # Check if run exists before deleting - run = 
self.client.get_run(mlflow_run_id) + run = self.client.get_run(run_id) if run is not None: - self.client.delete_run(mlflow_run_id) + self.client.delete_run(run_id) else: - raise ValueError(f"Run '{mlflow_run_id}' not found") + raise ValueError(f"Run '{run_id}' not found") def clear_context(self) -> None: """Clear the MLflow context by ending any active run.""" diff --git a/rapidfireai/utils/metric_rfmetric_manager.py b/rapidfireai/utils/metric_rfmetric_manager.py new file mode 100644 index 00000000..c3f7aee2 --- /dev/null +++ b/rapidfireai/utils/metric_rfmetric_manager.py @@ -0,0 +1,186 @@ +""" +This module contains the RFMetricLogger class which is responsible for managing the metric loggers. +""" + +from typing import Optional + +from rapidfireai.utils.metric_logger import MetricLogger, MetricLoggerConfig, MetricLoggerType +from rapidfireai.utils.metric_mlflow_manager import MLflowMetricLogger +from rapidfireai.utils.metric_tensorboard_manager import TensorBoardMetricLogger +from rapidfireai.utils.metric_trackio_manager import TrackIOMetricLogger +from rapidfireai.evals.utils.logger import RFLogger +from rapidfireai.utils.constants import ( + MLFlowConfig, + RF_MLFLOW_ENABLED, + RF_TENSORBOARD_ENABLED, + RF_TRACKIO_ENABLED, + RF_TENSORBOARD_LOG_DIR +) + +class RFMetricLogger(MetricLogger): + """ + Implementation of MetricLogger that logs to multiple backends. Currently no + more than one of each type is supported. + + This allows users to benefit from multiple tracking systems simultaneously: + - MLflow for experiment comparison and model registry + - TensorBoard for real-time training visualization (especially useful in Colab) + - TrackIO for local-first experiment tracking + """ + + def __init__(self, metric_loggers: dict[str, MetricLoggerConfig], logger: RFLogger = None): + """ + Initialize RFMetricLogger. 
+ + Args: + metric_loggers: Dictionary of metric loggers to use: + - "name": {"type": MetricLoggerType.MLFLOW, "config": {"tracking_uri": "http://localhost:8852"}} + - "name": {"type": MetricLoggerType.TENSORBOARD, "config": {"log_dir": "logs/tensorboard"}} + - "name": {"type": MetricLoggerType.TRACKIO, "config": {"tracking_uri": None}} + """ + self.type = MetricLoggerType.MULTIPLE + self.logger = logger if logger is not None else RFLogger() + if not isinstance(metric_loggers, dict): + raise ValueError("metric_loggers must be a dictionary") + if len(metric_loggers) == 0: + raise ValueError("metric_loggers must contain at least one metric logger") + self.metric_loggers = {} + for metric_logger_name, metric_logger_config in metric_loggers.items(): + if metric_logger_config.get("type") not in MetricLoggerType: + raise ValueError(f"metric_logger_config for {metric_logger_name} must be a valid MetricLoggerType") + if metric_logger_config.get("type") == MetricLoggerType.MLFLOW: + self.metric_loggers[metric_logger_name] = MLflowMetricLogger(metric_logger_config["config"]["tracking_uri"], logger=self.logger) + self.logger.info(f"Initialized MLflowMetricLogger: {metric_logger_name}") + elif metric_logger_config.get("type") == MetricLoggerType.TENSORBOARD: + self.metric_loggers[metric_logger_name] = TensorBoardMetricLogger(metric_logger_config["config"]["log_dir"], logger=self.logger) + self.logger.info(f"Initialized TensorBoardMetricLogger: {metric_logger_name}") + elif metric_logger_config.get("type") == MetricLoggerType.TRACKIO: + self.metric_loggers[metric_logger_name] = TrackIOMetricLogger( + experiment_name=metric_logger_config["config"]["experiment_name"], + logger=self.logger, + init_kwargs=metric_logger_config["config"].get("init_kwargs") + ) + self.logger.info(f"Initialized TrackIOMetricLogger: {metric_logger_name}") + else: + raise ValueError(f"metric_logger_config for {metric_logger_name} must be a valid MetricLoggerType") + + def add_logger(self, 
metric_logger_name: str, metric_logger_config: MetricLoggerConfig) -> None: + """Add a metric logger to the dictionary.""" + if metric_logger_config.get("type") not in MetricLoggerType: + raise ValueError(f"metric_logger_config for {metric_logger_name} must be a valid MetricLoggerType") + if metric_logger_config.get("type") == MetricLoggerType.MLFLOW: + self.metric_loggers[metric_logger_name] = MLflowMetricLogger(metric_logger_config["config"]["tracking_uri"]) + elif metric_logger_config.get("type") == MetricLoggerType.TENSORBOARD: + self.metric_loggers[metric_logger_name] = TensorBoardMetricLogger(metric_logger_config["config"]["log_dir"]) + elif metric_logger_config.get("type") == MetricLoggerType.TRACKIO: + self.metric_loggers[metric_logger_name] = TrackIOMetricLogger( + experiment_name=metric_logger_config["config"]["experiment_name"], + init_kwargs=metric_logger_config["config"].get("init_kwargs") + ) + else: + raise ValueError(f"metric_logger_config for {metric_logger_name} must be a valid MetricLoggerType") + + def create_experiment(self, experiment_name: str) -> str: + """Create experiment in MetricLogger.""" + for metric_logger in self.metric_loggers.values(): + if metric_logger.type == MetricLoggerType.MLFLOW: + self.logger.info(f"Creating MLflow experiment: {experiment_name}") + return metric_logger.create_experiment(experiment_name) + return experiment_name + + def get_experiment(self, experiment_name: str) -> str: + """Get experiment from MetricLogger(TensorBoard doesn't have experiments).""" + for metric_logger in self.metric_loggers.values(): + if metric_logger.type == MetricLoggerType.MLFLOW: + return metric_logger.get_experiment(experiment_name) + return experiment_name + + def create_run(self, run_name: str) -> str: + """Create run in MetricLogger.""" + mlflow_run = None + this_run = None + for metric_logger in self.metric_loggers.values(): + this_run = metric_logger.create_run(run_name) + if metric_logger.type == MetricLoggerType.MLFLOW: + 
mlflow_run = this_run + if mlflow_run is not None: + self.logger.info(f"Created MLflow run: {mlflow_run}") + return mlflow_run + return run_name + + def log_param(self, run_id: str, key: str, value: str) -> None: + """Log parameter to MetricLogger.""" + for metric_logger_name, metric_logger in self.metric_loggers.items(): + if hasattr(metric_logger, "log_param"): + metric_logger.log_param(run_id, key, value) + else: + raise ValueError(f"metric_logger for {metric_logger_name} does not support log_param") + + def log_metric(self, run_id: str, key: str, value: float, step: Optional[int] = None) -> None: + """Log metric to MetricLogger.""" + for metric_logger_name, metric_logger in self.metric_loggers.items(): + if hasattr(metric_logger, "log_metric"): + metric_logger.log_metric(run_id, key, value, step=step) + else: + raise ValueError(f"metric_logger for {metric_logger_name} does not support log_metric") + + def get_run_metrics(self, run_id: str) -> dict: + """Get metrics from MetricLogger.""" + for metric_logger in self.metric_loggers.values(): + if metric_logger.type == MetricLoggerType.MLFLOW: + return metric_logger.get_run_metrics(run_id) + return {} + + def end_run(self, run_id: str) -> None: + """End run in MetricLogger.""" + for metric_logger_name, metric_logger in self.metric_loggers.items(): + self.logger.info(f"Ending run: {run_id} in {metric_logger_name}") + if hasattr(metric_logger, "end_run"): + metric_logger.end_run(run_id) + else: + raise ValueError(f"metric_logger for {metric_logger_name} does not support end_run") + + def delete_run(self, run_id: str) -> None: + """Delete run from MetricLogger.""" + for metric_logger_name, metric_logger in self.metric_loggers.items(): + if hasattr(metric_logger, "delete_run"): + metric_logger.delete_run(run_id) + else: + raise ValueError(f"metric_logger for {metric_logger_name} does not support delete_run") + return None + + def clear_context(self) -> None: + """Clear context in MetricLogger.""" + for 
metric_logger_name, metric_logger in self.metric_loggers.items(): + if hasattr(metric_logger, "clear_context"): + metric_logger.clear_context() + else: + raise ValueError(f"metric_logger for {metric_logger_name} does not support clear_context") + return None + + @classmethod + def get_default_metric_loggers(cls, experiment_name: str) -> dict[str, MetricLoggerConfig]: + """Get default metric loggers.""" + metric_loggers = {} + if RF_MLFLOW_ENABLED == "true": + metric_loggers["rf_mlflow"] = { + "type": MetricLoggerType.MLFLOW, + "config": { + "tracking_uri": MLFlowConfig.URL, + }, + } + if RF_TENSORBOARD_ENABLED == "true": + metric_loggers["rf_tensorboard"] = { + "type": MetricLoggerType.TENSORBOARD, + "config": { + "log_dir": RF_TENSORBOARD_LOG_DIR, + }, + } + if RF_TRACKIO_ENABLED == "true": + metric_loggers["rf_trackio"] = { + "type": MetricLoggerType.TRACKIO, + "config": { + "experiment_name": experiment_name, + }, + } + return metric_loggers diff --git a/rapidfireai/utils/metric_tensorboard_manager.py b/rapidfireai/utils/metric_tensorboard_manager.py new file mode 100644 index 00000000..2e0585ef --- /dev/null +++ b/rapidfireai/utils/metric_tensorboard_manager.py @@ -0,0 +1,177 @@ +""" +TensorBoard implementation of MetricLogger. + +Uses torch.utils.tensorboard.SummaryWriter to log metrics to TensorBoard. +""" + +from rapidfireai.utils.metric_logger import MetricLogger, MetricLoggerType +from pathlib import Path +from typing import Optional, Any +import os +from rapidfireai.utils.os_utils import mkdir_p +from rapidfireai.evals.utils.logger import RFLogger + +class TensorBoardMetricLogger(MetricLogger): + """ + TensorBoard implementation of MetricLogger. + + Uses torch.utils.tensorboard.SummaryWriter to log metrics to TensorBoard. + """ + + def __init__(self, log_dir: str, logger: RFLogger = None, init_kwargs: dict[str, Any] = None): + """ + Initialize TensorBoard metric logger. 
+ + Args: + log_dir: Directory for TensorBoard logs + init_kwargs: Initialization kwargs for TensorBoard + """ + from torch.utils.tensorboard import SummaryWriter + + self.type = MetricLoggerType.TENSORBOARD + self.log_dir = Path(log_dir) + self.logger = logger if logger is not None else RFLogger() + self.init_kwargs = init_kwargs # Not currently used + try: + mkdir_p(self.log_dir, notify=False) + except (PermissionError, OSError) as e: + print(f"Error creating directory: {e}") + raise + self.writers = {} # Map run_id -> SummaryWriter + + def create_experiment(self, experiment_name: str) -> str: + """ + Create a new TensorBoard experiment. + """ + return experiment_name + + def get_experiment(self, experiment_name: str) -> str: + """ + Get existing TensorBoard experiment by name and set it as active. + """ + return experiment_name + + def create_run(self, run_name: str) -> str: + """ + Create a new TensorBoard run. + + For TensorBoard, we use run_name as the run_id and create a subdirectory + in the log directory. + """ + from torch.utils.tensorboard import SummaryWriter + + run_log_dir = os.path.join(self.log_dir, run_name) + try: + mkdir_p(run_log_dir, notify=False) + except (PermissionError, OSError) as e: + print(f"Error creating directory: {e}") + raise + + # Create SummaryWriter for this run + writer = SummaryWriter(log_dir=run_log_dir) + self.writers[run_name] = writer + + return run_name + + def log_param(self, run_id: str, key: str, value: str) -> None: + """ + Log a parameter to TensorBoard. + + TensorBoard doesn't have native parameter logging, so we log as text. + """ + if run_id not in self.writers: + self.create_run(run_id) + + writer = self.writers[run_id] + writer.add_text(f"params/{key}", str(value), global_step=0) + writer.flush() + + def log_metric(self, run_id: str, key: str, value: float, step: Optional[int] = None) -> None: + """ + Log a metric to TensorBoard. 
+ + Args: + run_id: Run identifier + key: Metric name + value: Metric value + step: Step number (required for TensorBoard time series) + """ + if run_id not in self.writers: + self.create_run(run_id) + + writer = self.writers[run_id] + # Use step=0 if not provided (fallback) + writer.add_scalar(key, value, global_step=step if step is not None else 0) + # Flush immediately to ensure real-time updates + writer.flush() + + def get_run_metrics(self, run_id: str) -> dict: + """ + Get metrics from TensorBoard. + + Note: TensorBoard doesn't provide easy API access to logged metrics. + This returns an empty dict. For viewing metrics, use TensorBoard UI. + """ + return {} + + def end_run(self, run_id: str) -> None: + """End a TensorBoard run by closing the writer.""" + if run_id in self.writers: + self.writers[run_id].close() + del self.writers[run_id] + + + def delete_run(self, run_id: str) -> None: + """ + Delete a TensorBoard run by moving its directory outside the log tree (soft delete). + + This is a soft delete - the data is moved to a sibling '{log_dir}_deleted' directory + outside TensorBoard's scan path, so it won't appear in the UI. Data can be manually + recovered if needed by moving it back to the log_dir. 
+ + Args: + run_id: Run identifier (directory name) + """ + import shutil + import time + + # Close and remove writer if active + if run_id in self.writers: + self.writers[run_id].close() + del self.writers[run_id] + + # Move the run directory to sibling deleted folder (outside log_dir tree) + run_log_dir = os.path.join(self.log_dir, run_id) + if os.path.exists(run_log_dir) and os.path.isdir(run_log_dir): + # Create deleted directory as sibling, not child, of log_dir + deleted_dir = os.path.join(self.log_dir.parent, f"{self.log_dir.name}_deleted") + try: + mkdir_p(deleted_dir, notify=False) + except (PermissionError, OSError) as e: + print(f"Error creating directory: {e}") + raise + + # Add timestamp to avoid name collisions + timestamp = int(time.time()) + destination = os.path.join(deleted_dir, f"{run_id}_{timestamp}") + + shutil.move(run_log_dir, destination) + + def __del__(self): + """Clean up all writers on deletion.""" + for writer in self.writers.values(): + writer.close() + + def clear_context(self) -> None: + """Clear the TensorBoard context.""" + if self.writers: + # Iterate over a snapshot since `end_run()` mutates `self.writers`. + for run_id in list(self.writers): + self.end_run(run_id) + else: + print("No active TensorBoard runs to clear") + + + + + diff --git a/rapidfireai/utils/metric_trackio_manager.py b/rapidfireai/utils/metric_trackio_manager.py new file mode 100644 index 00000000..559b77ab --- /dev/null +++ b/rapidfireai/utils/metric_trackio_manager.py @@ -0,0 +1,136 @@ +"""This module contains the TrackIOManager class which is responsible for managing the TrackIO runs.""" + +import trackio +from typing import Any +from rapidfireai.utils.metric_logger import MetricLogger, MetricLoggerType +from rapidfireai.evals.utils.logger import RFLogger + + +class TrackIOMetricLogger(MetricLogger): + def __init__(self, experiment_name: str, logger: RFLogger = None, init_kwargs: dict[str, Any] = None): + """ + Initialize TrackIO Manager. 
+ + Args: + init_kwargs: Initialization kwargs for TrackIO + """ + self.init_kwargs = init_kwargs + self.type = MetricLoggerType.TRACKIO + if self.init_kwargs is None: + self.init_kwargs = {"embed": False} + if not isinstance(self.init_kwargs, dict): + raise ValueError("init_kwargs must be a dictionary") + self.experiment_name = experiment_name + self.logger = logger if logger is not None else RFLogger() + self.active_runs = {} # Map run_id -> run_name + self.run_params = {} # Map run_id -> dict of params to log on init + + self._initialized = False + + def _ensure_initialized(self) -> None: + """Ensure TrackIO is initialized with the experiment.""" + if not self._initialized and self.experiment_name: + trackio.init(project=self.experiment_name, **self.init_kwargs) + self._initialized = True + + def create_experiment(self, experiment_name: str) -> str: + """Create a new experiment and set it as active.""" + self.experiment_name = experiment_name + self._ensure_initialized() + return experiment_name + + def get_experiment(self, experiment_name: str) -> str: + """Get existing experiment by name and set it as active.""" + self.experiment_name = experiment_name + self._ensure_initialized() + return experiment_name + + def create_run(self, run_name: str) -> str: + """Create a new run and return run_name as there is no run_id in TrackIO""" + self._ensure_initialized() + + # TrackIO uses run names directly, so we use run_name as the run_id + # Try to finish any existing run first + try: + trackio.finish() + except Exception: + pass # No active run to finish + + # Initialize a new run with the run name + try: + trackio.init(project=self.experiment_name, name=run_name, **self.init_kwargs) + except Exception: + # If init doesn't accept name, try without it + trackio.init(project=self.experiment_name, **self.init_kwargs) + + self.active_runs[run_name] = run_name + # Log any pending params for this run + if run_name in self.run_params: + trackio.log(self.run_params[run_name]) 
+ del self.run_params[run_name] + + return run_name + + def log_param(self, run_id: str, key: str, value: str) -> None: + """Log parameters to a specific run.""" + # TrackIO logs params via the log() method + # Try to log immediately, or store for later if run not active + try: + self._ensure_initialized() + trackio.log({key: value}) + except Exception: + # Run not active, store for later when run is created + if run_id not in self.run_params: + self.run_params[run_id] = {} + self.run_params[run_id][key] = value + + def log_metric(self, _: str, key: str, value: float, step: int = None) -> None: + """Log a metric to a specific run.""" + # TrackIO uses log() with step in the dict + log_dict = {key: value} + if step is not None: + log_dict["step"] = step + self._ensure_initialized() + trackio.log(log_dict) + + def get_run_metrics(self, run_id: str) -> dict[str, list[tuple[int, float]]]: + """ + Get all metrics for a specific run. + + Note: TrackIO stores metrics locally. This method returns an empty dict + as TrackIO doesn't provide a direct API to retrieve historical metrics. + Metrics can be viewed using `trackio.show()`. + """ + # TrackIO doesn't provide a direct API to retrieve metrics programmatically + # Metrics are stored locally and can be viewed via trackio.show() + return {} + + def end_run(self, run_id: str) -> None: + """End a specific run.""" + try: + trackio.finish() + if run_id in self.active_runs: + del self.active_runs[run_id] + except Exception as e: + print(f"Error ending TrackIO run {run_id}: {e}") + + def delete_run(self, run_id: str) -> None: + """Delete a specific run.""" + try: + # TrackIO stores runs locally, deletion would require file system operations + # For now, we just remove from tracking + if run_id in self.active_runs: + del self.active_runs[run_id] + # Note: TrackIO doesn't have a delete_run API, runs are stored as local files + print(f"Note: TrackIO runs are stored locally. 
To delete run '{run_id}', remove its files manually.") + except Exception as e: + raise ValueError(f"Run '{run_id}' not found: {e}") + + def clear_context(self) -> None: + """Clear the TrackIO context by ending any active run.""" + try: + trackio.finish() + print("TrackIO context cleared successfully") + except Exception: + print("No active TrackIO run to clear") + diff --git a/setup/evals/requirements-local.txt b/setup/evals/requirements-local.txt index c4bde857..4689ee8e 100644 --- a/setup/evals/requirements-local.txt +++ b/setup/evals/requirements-local.txt @@ -34,4 +34,4 @@ datasets jupyter grpcio faiss-gpu-cu12 -mlflow>=3.2.0 +# mlflow>=3.2.0 diff --git a/setup/fit/requirements-local.txt b/setup/fit/requirements-local.txt index 46dcda79..a033a13b 100644 --- a/setup/fit/requirements-local.txt +++ b/setup/fit/requirements-local.txt @@ -13,7 +13,7 @@ click<8.3.0 ray<=2.49.0 # LLM Inference -transformers==4.56.1 +transformers>=4.56.1 # OpenAI API openai==1.106.1 @@ -57,8 +57,8 @@ nltk>=3.9.1 evaluate>=0.4.5 rouge-score>=0.1.2 sentencepiece>=0.2.1 -mlflow>=3.2.0 +# mlflow>=3.2.0 requests>=2.32.0 # Relaxed for Colab (2.32.4) loguru>=0.7.3 ipython>=7.34.0 # Colab compatibility (7.34.0) -jupyter>=1.1.1 \ No newline at end of file +jupyter>=1.1.1 diff --git a/setup/fit/start.sh b/setup/fit/start.sh index 9a579a7a..2a8f1894 100755 --- a/setup/fit/start.sh +++ b/setup/fit/start.sh @@ -31,11 +31,15 @@ RF_TIMEOUT_TIME=${RF_TIMEOUT_TIME:=30} # Colab mode configuration if [ -z "${COLAB_GPU+x}" ]; then - RF_TRACKING_BACKEND=${RF_TRACKING_BACKEND:=mlflow} + RF_MLFLOW_ENABLED=${RF_MLFLOW_ENABLED:=true} + RF_TENSORBOARD_ENABLED=${RF_TENSORBOARD_ENABLED:=false} + RF_TRACKIO_ENABLED=${RF_TRACKIO_ENABLED:=false} RF_COLAB_MODE=${RF_COLAB_MODE:=false} else echo "Google Colab environment detected" - RF_TRACKING_BACKEND=${RF_TRACKING_BACKEND:=tensorboard} + RF_MLFLOW_ENABLED=${RF_MLFLOW_ENABLED:=false} + RF_TENSORBOARD_ENABLED=${RF_TENSORBOARD_ENABLED:=true} + 
RF_TRACKIO_ENABLED=${RF_TRACKIO_ENABLED:=false} RF_COLAB_MODE=${RF_COLAB_MODE:=true} fi @@ -114,9 +118,13 @@ setup_python_env() { # Function to cleanup processes on exit cleanup() { - # Confirm cleanup - read -p "Do you want to shutdown services and delete the PID file? (y/n) " -n 1 -r REPLY - echo + if [ "$RF_FORCE" != "true" ]; then + # Confirm cleanup + read -p "Do you want to shutdown services and delete the PID file? (y/n) " -n 1 -r REPLY + echo + else + REPLY=y + fi if [[ $REPLY =~ ^[Yy]$ ]]; then print_warning "Shutting down services..." else @@ -351,10 +359,6 @@ start_mlflow() { grep -A 5 -B 2 "Error\|Exception\|Traceback\|Failed\|ImportError\|ModuleNotFoundError" "$RF_LOG_PATH/mlflow.log" | head -20 fi else - if [[ "$RF_COLAB_MODE" == "true" ]] && [[ "$RF_TRACKING_BACKEND" == "tensorboard" ]]; then - print_status "⊗ Skipping MLflow (using TensorBoard-only tracking in Colab mode)" - return 0 - fi print_error "No mlflow.log file found" fi @@ -368,19 +372,6 @@ start_mlflow() { fi } -# Function to conditionally start MLflow based on mode -start_mlflow_if_needed() { - # In Colab mode with pure TensorBoard, skip MLflow - if [[ "$RF_COLAB_MODE" == "true" ]] && [[ "$RF_TRACKING_BACKEND" == "tensorboard" ]]; then - print_status "⊗ Skipping MLflow (using TensorBoard-only tracking in Colab mode)" - return 0 - fi - - # Otherwise start MLflow - start_mlflow - return $? -} - # Function to start API server start_api_server() { print_status "Starting API server with Gunicorn..." @@ -628,7 +619,7 @@ show_status() { fi fi fi - if [[ "$RF_TRACKING_BACKEND" == "mlflow" ]] || [[ "$RF_TRACKING_BACKEND" == "both" ]]; then + if [[ "$RF_MLFLOW_ENABLED" == "true" ]]; then if ping_port $RF_MLFLOW_HOST $RF_MLFLOW_PORT; then print_success "🚀 MLflow server is ready!" 
else @@ -661,7 +652,7 @@ show_status() { fi # Only check mlflow.log if MLflow is running - if [[ "$RF_COLAB_MODE" != "true" ]] || [[ "$RF_TRACKING_BACKEND" != "tensorboard" ]]; then + if [[ "$RF_MLFLOW_ENABLED" == "true" ]]; then if [[ -f "$RF_LOG_PATH/mlflow.log" ]]; then local size=$(du -h "$RF_LOG_PATH/mlflow.log" | cut -f1) print_status "- $RF_LOG_PATH/mlflow.log: $size" @@ -688,7 +679,7 @@ start_services() { # Calculate total services based on mode # MLflow runs unless tensorboard-only in Colab - if [[ "$RF_COLAB_MODE" != "true" ]] || [[ "$RF_TRACKING_BACKEND" != "tensorboard" ]]; then + if [[ "$RF_MLFLOW_ENABLED" == "true" ]]; then ((total_services++)) fi @@ -705,14 +696,12 @@ start_services() { print_status "Starting $total_services service(s)..." # Start MLflow server (conditionally) - if [[ "$RF_COLAB_MODE" != "true" ]] || [[ "$RF_TRACKING_BACKEND" != "tensorboard" ]]; then + if [[ "$RF_MLFLOW_ENABLED" == "true" ]]; then if start_mlflow; then ((services_started++)) else print_error "Failed to start MLflow server" fi - else - print_status "⊗ Skipping MLflow (using TensorBoard-only tracking in Colab mode)" fi # Start API server (always) diff --git a/tests/conftest.py b/tests/conftest.py index b0b73ebb..8fe67e34 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,9 +31,9 @@ def mock_mlflow_manager(): Returns: Mock: Mocked MLflowManager with all required methods """ - from rapidfireai.fit.utils.mlflow_manager import MLflowManager + from rapidfireai.utils.metric_mlflow_manager import MLflowMetricLogger - mock = Mock(spec=MLflowManager) + mock = Mock(spec=MLflowMetricLogger) mock.create_run.return_value = "test_run_id" mock.log_param.return_value = None mock.log_metric.return_value = None diff --git a/tests/test_metric_logger.py b/tests/test_metric_logger.py index ed9e7c7c..ce3f2247 100644 --- a/tests/test_metric_logger.py +++ b/tests/test_metric_logger.py @@ -400,14 +400,14 @@ def test_dual_logger_conforms_to_interface(self, temp_tensorboard_dir): 
class TestCallbacksIntegration: """Test suite for integration with callbacks.""" - def test_mlflow_logging_callback_accepts_metric_logger(self): - """Test that MLflowLoggingCallback accepts metric_logger parameter.""" - from rapidfireai.ml.callbacks import MLflowLoggingCallback + def test_metric_logging_callback_accepts_metric_logger(self): + """Test that MetricLoggingCallback accepts metric_logger parameter.""" + from rapidfireai.ml.callbacks import MetricLoggingCallback mock_logger = Mock(spec=MetricLogger) - callback = MLflowLoggingCallback( + callback = MetricLoggingCallback( metric_logger=mock_logger, - mlflow_run_id="run_1", + metric_run_id="run_1", completed_steps=0, chunk_id=0, num_epochs_completed=0 @@ -430,7 +430,7 @@ def test_generation_metrics_callback_accepts_metric_logger(self): tokenizer=tokenizer, eval_dataset=dataset, metric_logger=mock_logger, - mlflow_run_id="run_1", + metric_run_id="run_1", completed_steps=0 ) @@ -438,13 +438,13 @@ def test_generation_metrics_callback_accepts_metric_logger(self): def test_callback_calls_log_metric(self): """Test that callback calls metric_logger.log_metric().""" - from rapidfireai.ml.callbacks import MLflowLoggingCallback + from rapidfireai.ml.callbacks import MetricLoggingCallback from transformers import TrainerState, TrainerControl, TrainingArguments mock_logger = Mock(spec=MetricLogger) - callback = MLflowLoggingCallback( + callback = MetricLoggingCallback( metric_logger=mock_logger, - mlflow_run_id="run_1", + metric_run_id="run_1", completed_steps=0, chunk_id=0, num_epochs_completed=0 @@ -463,13 +463,13 @@ def test_callback_calls_log_metric(self): def test_callback_step_offset_works_correctly(self): """Test that callbacks apply completed_steps offset correctly.""" - from rapidfireai.ml.callbacks import MLflowLoggingCallback + from rapidfireai.ml.callbacks import MetricLoggingCallback from transformers import TrainerState, TrainerControl, TrainingArguments mock_logger = Mock(spec=MetricLogger) - callback = 
MLflowLoggingCallback( + callback = MetricLoggingCallback( metric_logger=mock_logger, - mlflow_run_id="run_1", + metric_run_id="run_1", completed_steps=100, # Resumed run chunk_id=0, num_epochs_completed=0