diff --git a/.gitignore b/.gitignore index 752eab1..4f53485 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,8 @@ __pycache__/ -scripts/notebooks/ \ No newline at end of file +scripts/notebooks/ +matrix.egg-info/ +build/ +.pyre/ +.pyre_configuration +.pyre_configuration.local +.watchmanconfig diff --git a/matrix/app_server/app_api.py b/matrix/app_server/app_api.py index adb7570..732ab50 100644 --- a/matrix/app_server/app_api.py +++ b/matrix/app_server/app_api.py @@ -32,6 +32,7 @@ from matrix.client.endpoint_cache import EndpointCache from matrix.common.cluster_info import ClusterInfo, get_head_http_host from matrix.utils.basics import convert_to_json_compatible, sanitize_app_name +from matrix.utils.logging import get_logger from matrix.utils.os import download_s3_dir, lock_file, run_and_stream, run_async from matrix.utils.ray import ( ACTOR_NAME_SPACE, @@ -42,7 +43,7 @@ kill_matrix_actors, ) -logger = logging.getLogger("ray.serve") +logger = get_logger("ray.serve") DEPLOYMENT_YAML = "deployment.yaml" DEPLOYMENT_SGLANG_YAML = "deployment_sglang.yaml" diff --git a/matrix/app_server/code/code_execution_app.py b/matrix/app_server/code/code_execution_app.py index cb12ea2..6a1e8e5 100644 --- a/matrix/app_server/code/code_execution_app.py +++ b/matrix/app_server/code/code_execution_app.py @@ -15,9 +15,10 @@ from starlette.responses import JSONResponse from matrix.app_server.code.sandbox_runner import SandboxRunner +from matrix.utils.logging import get_logger CODE_EXEC_TIMEOUT = 10 -logger = logging.getLogger("ray.serve") +logger = get_logger("ray.serve") @serve.deployment(ray_actor_options={"num_cpus": 1, "num_gpus": 0}) diff --git a/matrix/app_server/llm/azure_openai_proxy.py b/matrix/app_server/llm/azure_openai_proxy.py index d8205d5..d8b952d 100644 --- a/matrix/app_server/llm/azure_openai_proxy.py +++ b/matrix/app_server/llm/azure_openai_proxy.py @@ -23,7 +23,9 @@ ErrorResponse, ) -logger = logging.getLogger("ray.serve") +from matrix.utils.logging import get_logger + +logger = get_logger("ray.serve") app = FastAPI() @@ -114,7 +116,7 @@ def build_app(cli_args: Dict[str, str]) -> serve.Application: arg_strings.extend([f"--{key}"]) else: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) args = argparse.parse_args(args=arg_strings) diff --git a/matrix/app_server/llm/bedrock_proxy.py b/matrix/app_server/llm/bedrock_proxy.py index 0261e3f..25371c2 100644 --- a/matrix/app_server/llm/bedrock_proxy.py +++ b/matrix/app_server/llm/bedrock_proxy.py @@ -19,7 +19,9 @@ from starlette.requests import Request from vllm.entrypoints.openai.protocol import ChatCompletionRequest -logger = logging.getLogger("ray.serve") +from matrix.utils.logging import get_logger + +logger = get_logger("ray.serve") logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("openai._base_client").setLevel(logging.WARNING) @@ -124,11 +126,11 @@ def build_app(cli_args: Dict[str, str]) -> serve.Application: arg_strings.extend([f"--{key}"]) else: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) args = argparse.parse_args(args=arg_strings) - logger.log(logging.INFO, f"args: {args}") + logger.info(f"args: {args}") assert "claude" in args.model_name.lower(), "Only Claude model is supported" return BedrockDeployment.options( # type: ignore[attr-defined] diff --git a/matrix/app_server/llm/deploy_sglang_app.py b/matrix/app_server/llm/deploy_sglang_app.py index a37c0e0..1f38cc3 100644 --- a/matrix/app_server/llm/deploy_sglang_app.py +++ b/matrix/app_server/llm/deploy_sglang_app.py @@ -27,10 +27,11 @@ from matrix.common.cluster_info import ClusterInfo from matrix.utils.http import fetch_url, post_url +from matrix.utils.logging import get_logger from matrix.utils.os import run_and_stream, stop_process from matrix.utils.ray import ACTOR_NAME_SPACE, get_matrix_actors, get_ray_head_node -logger = logging.getLogger("ray.sglang") +logger = get_logger("ray.sglang") handler = logging.StreamHandler() formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") handler.setFormatter(formatter) diff --git a/matrix/app_server/llm/gemini_proxy.py b/matrix/app_server/llm/gemini_proxy.py index 6029115..6e8a93e 100644 --- a/matrix/app_server/llm/gemini_proxy.py +++ b/matrix/app_server/llm/gemini_proxy.py @@ -17,7 +17,9 @@ from starlette.requests import Request from vllm.entrypoints.openai.protocol import ChatCompletionRequest -logger = logging.getLogger("ray.serve") +from matrix.utils.logging import get_logger + +logger = get_logger("ray.serve") app = FastAPI() @@ -209,7 +211,7 @@ def build_app(cli_args: Dict[str, str]) -> serve.Application: arg_strings.extend([f"--{key}"]) else: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) args = argparse.parse_args(args=arg_strings) diff --git a/matrix/app_server/llm/metagen_proxy.py b/matrix/app_server/llm/metagen_proxy.py index 3de1a25..6ddc69e 100644 --- a/matrix/app_server/llm/metagen_proxy.py +++ b/matrix/app_server/llm/metagen_proxy.py @@ -24,8 +24,9 @@ ) from matrix.utils.http import post_url +from matrix.utils.logging import get_logger -logger = logging.getLogger("ray.serve") +logger = get_logger("ray.serve") app = FastAPI() @@ -124,7 +125,7 @@ def build_app(cli_args: Dict[str, str]) -> serve.Application: arg_strings.extend([f"--{key}"]) else: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) args = argparse.parse_args(args=arg_strings) diff --git a/matrix/app_server/llm/ray_serve_fastgen.py b/matrix/app_server/llm/ray_serve_fastgen.py index 9caaa8f..26eedc8 100644 --- a/matrix/app_server/llm/ray_serve_fastgen.py +++ b/matrix/app_server/llm/ray_serve_fastgen.py @@ -31,7 +31,9 @@ from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy from torch.distributed.device_mesh import DeviceMesh, init_device_mesh -logger = logging.getLogger("ray.serve") +from matrix.utils.logging import get_logger + +logger = get_logger("ray.serve") app = FastAPI() @@ -350,7 +352,7 @@ def parse_args(cli_args: Dict[str, str]): arg_strings.extend([f"--{key}"]) else: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) parsed_args = parser.parse_args(args=arg_strings) return parsed_args diff --git a/matrix/app_server/llm/ray_serve_vllm.py b/matrix/app_server/llm/ray_serve_vllm.py index 2ecb200..c095315 100644 --- a/matrix/app_server/llm/ray_serve_vllm.py +++ b/matrix/app_server/llm/ray_serve_vllm.py @@ -41,6 +41,7 @@ ) from matrix.app_server.llm import openai_pb2 +from matrix.utils.logging import get_logger try: from vllm.entrypoints.openai.serving_engine import ( # type: ignore[attr-defined] @@ -76,7 +77,7 @@ "enable_tools", ] -logger = logging.getLogger("ray.serve") +logger = get_logger("ray.serve") app = FastAPI() @@ -534,7 +535,7 @@ def parse_vllm_args(cli_args: Dict[str, str]): arg_strings.extend([f"--{key}"]) else: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) parsed_args = parser.parse_args(args=arg_strings) return parsed_args, deploy_args diff --git a/matrix/app_server/llm/sagemaker_proxy.py b/matrix/app_server/llm/sagemaker_proxy.py index 52bafee..647b532 100644 --- a/matrix/app_server/llm/sagemaker_proxy.py +++ b/matrix/app_server/llm/sagemaker_proxy.py @@ -18,7 +18,9 @@ from starlette.requests import Request from vllm.entrypoints.openai.protocol import ChatCompletionRequest -logger = logging.getLogger("ray.serve") +from matrix.utils.logging import get_logger + +logger = get_logger("ray.serve") logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("openai._base_client").setLevel(logging.WARNING) @@ -122,11 +124,11 @@ def build_app(cli_args: Dict[str, str]) -> serve.Application: arg_strings.extend([f"--{key}"]) else: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) args = argparse.parse_args(args=arg_strings) - logger.log(logging.INFO, f"args: {args}") + logger.info(f"args: {args}") return SageMakerDeployment.options( # type: ignore[attr-defined] placement_group_bundles=pg_resources, diff --git a/matrix/app_server/vision/optical_flow.py b/matrix/app_server/vision/optical_flow.py index 8519b92..2b84756 100644 --- a/matrix/app_server/vision/optical_flow.py +++ b/matrix/app_server/vision/optical_flow.py @@ -21,8 +21,9 @@ from torch.utils.data import DataLoader, Dataset from matrix.app_server.vision.utils import SamplingMode, TorchCodecVideoDataset +from matrix.utils.logging import get_logger -logger = logging.getLogger("ray.serve") +logger = get_logger("ray.serve") logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("openai._base_client").setLevel(logging.WARNING) @@ -207,7 +208,7 @@ def build_app(cli_args: Dict[str, str]) -> serve.Application: for key, value in cli_args.items(): if value is not None: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) args = argparse.parse_args(arg_strings) diff --git a/matrix/app_server/vision/perception_encoder.py b/matrix/app_server/vision/perception_encoder.py index 929051b..55bd2cd 100644 --- a/matrix/app_server/vision/perception_encoder.py +++ b/matrix/app_server/vision/perception_encoder.py @@ -29,8 +29,9 @@ execute_with_retry, get_image_transform, ) +from matrix.utils.logging import get_logger -logger = logging.getLogger("ray.serve") +logger = get_logger("ray.serve") logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("openai._base_client").setLevel(logging.WARNING) @@ -289,7 +290,7 @@ def build_app(cli_args: Dict[str, str]) -> serve.Application: for key, value in cli_args.items(): if value is not None: arg_strings.extend([f"--{key}", str(value)]) - logger.info(arg_strings) + logger.info(",".join(arg_strings)) args = argparse.parse_args(arg_strings) diff --git a/matrix/app_server/vision/utils.py b/matrix/app_server/vision/utils.py index dcbad4c..c5ce20c 100644 --- a/matrix/app_server/vision/utils.py +++ b/matrix/app_server/vision/utils.py @@ -20,7 +20,9 @@ from torch.utils.data import Dataset from torchcodec.decoders import VideoDecoder -logger = logging.getLogger("matrix.app_server.vision.utils") +from matrix.utils.logging import get_logger + +logger = get_logger("matrix.app_server.vision.utils") logging.getLogger("httpx").setLevel(logging.WARNING) diff --git a/matrix/client/client_utils.py b/matrix/client/client_utils.py index ebb0452..ea77798 100644 --- a/matrix/client/client_utils.py +++ b/matrix/client/client_utils.py @@ -13,8 +13,9 @@ import typing as tp from matrix.client.endpoint_cache import EndpointCache +from matrix.utils.logging import get_logger -logger = logging.getLogger("matrix.client.utils") +logger = get_logger("matrix.client.utils") logging.getLogger("httpx").setLevel(logging.WARNING) diff --git a/matrix/client/container_client.py b/matrix/client/container_client.py index 01b9a75..29fe344 100644 --- a/matrix/client/container_client.py +++ b/matrix/client/container_client.py @@ -11,8 +11,9 @@ import aiohttp from matrix.utils.http import post_url +from matrix.utils.logging import get_logger -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class ContainerClient: diff --git a/matrix/client/endpoint_cache.py b/matrix/client/endpoint_cache.py index ba115a7..0651a48 100644 --- a/matrix/client/endpoint_cache.py +++ b/matrix/client/endpoint_cache.py @@ -11,9 +11,10 @@ from matrix.common.cluster_info import ClusterInfo, get_head_http_host from matrix.utils.http import fetch_url +from matrix.utils.logging import get_logger from matrix.utils.ray import get_ray_dashboard_address -logger = logging.getLogger("endpoint_cache") +logger = get_logger("endpoint_cache") class EndpointCache: diff --git a/matrix/client/execute_code.py b/matrix/client/execute_code.py index 8d758a2..a5fd164 100644 --- a/matrix/client/execute_code.py +++ b/matrix/client/execute_code.py @@ -19,12 +19,13 @@ from matrix.client.client_utils import get_an_endpoint_url, save_to_jsonl from matrix.client.endpoint_cache import EndpointCache +from matrix.utils.logging import get_logger # Configure logging for execute_code.py logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) -logger = logging.getLogger("execute_code") +logger = get_logger("execute_code") # Optionally suppress noisy logs from imported modules if not already done logging.getLogger("httpx").setLevel(logging.WARNING) diff --git a/matrix/client/process_vision_data.py b/matrix/client/process_vision_data.py index 0bb85c2..6f2c956 100644 --- a/matrix/client/process_vision_data.py +++ b/matrix/client/process_vision_data.py @@ -23,11 +23,12 @@ save_to_jsonl, ) from matrix.client.endpoint_cache import EndpointCache +from matrix.utils.logging import get_logger logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) -logger = logging.getLogger("process_vision_data") +logger = get_logger("process_vision_data") logging.getLogger("httpx").setLevel(logging.WARNING) diff --git a/matrix/client/query_llm.py b/matrix/client/query_llm.py index b7f7ea7..5b31816 100644 --- a/matrix/client/query_llm.py +++ b/matrix/client/query_llm.py @@ -32,13 +32,14 @@ from matrix.app_server.llm import openai_pb2, openai_pb2_grpc from matrix.client.client_utils import get_an_endpoint_url, save_to_jsonl from matrix.client.endpoint_cache import EndpointCache +from matrix.utils.logging import get_logger from matrix.utils.os import batch_requests_async, run_async CHAR_PER_TOKEN = 3.61 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) -logger = logging.getLogger("query_llm") +logger = get_logger("query_llm") logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("openai._base_client").setLevel(logging.WARNING) diff --git a/matrix/cluster/ray_dashboard_job.py b/matrix/cluster/ray_dashboard_job.py index 995dafa..4f660ba 100644 --- a/matrix/cluster/ray_dashboard_job.py +++ b/matrix/cluster/ray_dashboard_job.py @@ -15,11 +15,13 @@ import ray +from matrix.utils.logging import get_logger + # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) -logger = logging.getLogger("RayDashboardJob") +logger = get_logger("RayDashboardJob") @ray.remote(max_restarts=10) diff --git a/matrix/data_pipeline/clustering/sample.py b/matrix/data_pipeline/clustering/sample.py index 0954354..59cf304 100644 --- a/matrix/data_pipeline/clustering/sample.py +++ b/matrix/data_pipeline/clustering/sample.py @@ -131,7 +131,7 @@ def select_sample_ids_from_cluster(cluster_df): else: embeddings = np.vstack(cluster_df["embedding"]) centroid = np.mean(embeddings, axis=0) - logger.info(embeddings.shape, centroid.shape) + logger.info(f"{embeddings.shape}, {centroid.shape}") distances_to_centroid = np.linalg.norm(embeddings - centroid, axis=1) # First pick the closest point to centroid diff --git a/matrix/data_pipeline/clustering/utils.py b/matrix/data_pipeline/clustering/utils.py index 868d856..a1c98aa 100644 --- a/matrix/data_pipeline/clustering/utils.py +++ b/matrix/data_pipeline/clustering/utils.py @@ -20,12 +20,13 @@ from sentence_transformers import SentenceTransformer from matrix.utils.basics import get_nested_value, get_user_message_from_llama3_prompt +from matrix.utils.logging import get_logger # Basic Logging Setup logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) # --- Model Saving/Loading --- diff --git a/matrix/data_pipeline/generate/vllm_generate.py b/matrix/data_pipeline/generate/vllm_generate.py index d60269d..effbddc 100644 --- a/matrix/data_pipeline/generate/vllm_generate.py +++ b/matrix/data_pipeline/generate/vllm_generate.py @@ -15,7 +15,9 @@ from fire import Fire from vllm import LLM, SamplingParams -logger = logging.getLogger(__name__) +from matrix.utils.logging import get_logger + +logger = get_logger(__name__) USER_MESSAGE = "" diff --git a/matrix/job/eval_utils.py b/matrix/job/eval_utils.py index c43c284..d667fc0 100644 --- a/matrix/job/eval_utils.py +++ b/matrix/job/eval_utils.py @@ -15,9 +15,11 @@ import numpy as np +from matrix.utils.logging import get_logger + # Set up logging logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) # Checkpoint evaluation constants and helper functions DEFAULT_CONFIG = [ diff --git a/matrix/job/job_api.py b/matrix/job/job_api.py index a6fae3b..0fe4505 100644 --- a/matrix/job/job_api.py +++ b/matrix/job/job_api.py @@ -25,9 +25,10 @@ undeploy_helper, ) from matrix.utils.basics import str_to_callable +from matrix.utils.logging import get_logger from matrix.utils.ray import Action, get_ray_address, get_ray_head_node -logger = logging.getLogger(__name__) +logger = get_logger(__name__) class JobApi: diff --git a/matrix/job/job_manager.py b/matrix/job/job_manager.py index 3298b76..8f053b1 100644 --- a/matrix/job/job_manager.py +++ b/matrix/job/job_manager.py @@ -24,11 +24,12 @@ import ray from matrix.job.job_utils import JobAlreadyExist, JobNotFound, generate_task_id +from matrix.utils.logging import get_logger from matrix.utils.ray import status_is_failure, status_is_pending, status_is_success # Configure logging logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) # --- Constants --- MAX_RETRIES = 3 diff --git a/matrix/matrix_server.py b/matrix/matrix_server.py index 90e4d02..01f1f5f 100644 --- a/matrix/matrix_server.py +++ b/matrix/matrix_server.py @@ -20,12 +20,13 @@ from matrix import Cli from matrix.job.eval_utils import * from matrix.job.job_api import JobApi +from matrix.utils.logging import get_logger from matrix.utils.os import download_s3_dir, find_free_ports, is_port_available from matrix.utils.ray import get_ray_address # Set up logging logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) +logger = get_logger(__name__) app = FastAPI(title="Job API Service", description="HTTP Service for Matrix Job API") diff --git a/matrix/utils/logging.py b/matrix/utils/logging.py new file mode 100644 index 0000000..fede783 --- /dev/null +++ b/matrix/utils/logging.py @@ -0,0 +1,141 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the license found in the +# LICENSE file in the root directory of this source tree. + +""" +Logging utility that supports both standard logging and Scuba/Hive logging via frogger2. +Automatically uses Scuba/Hive when running in Meta cloud environments. +""" + +import logging +import os +from enum import Enum, auto, unique + +# Try to import frogger2, fall back to None if not available +FROGGER_AVAILABLE: bool = False +OTEL_EXPORTER_OTLP_ENDPOINT: str | None = None +try: + from frogger2.applications.matrix import log # type: ignore + + matrix_logger = log + FROGGER_AVAILABLE = True + OTEL_EXPORTER_OTLP_ENDPOINT = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "") +except ImportError: + matrix_logger = None + FROGGER_AVAILABLE = False + OTEL_EXPORTER_OTLP_ENDPOINT = None + + +@unique +class MatrixLogLevel(Enum): + DEBUG = auto() + INFO = auto() + WARNING = auto() + ERROR = auto() + CRITICAL = auto() + + +class MatrixLogger: + """Logger that supports both standard logging and Scuba logging.""" + + def __init__(self, name: str | None = None, level: int = logging.INFO): + """ + Initialize a Matrix logger. + + Args: + name: Logger name (defaults to __name__) + level: Logging level (default: INFO) + """ + self.logger = logging.getLogger(name or __name__) + self.logger.setLevel(level) + self.use_frogger = FROGGER_AVAILABLE and OTEL_EXPORTER_OTLP_ENDPOINT + + @property + def handlers(self): + """Access underlying logger's handlers for configuration.""" + return self.logger.handlers + + def addHandler(self, handler): + """Add a handler to the underlying logger.""" + self.logger.addHandler(handler) + + def removeHandler(self, handler): + """Remove a handler from the underlying logger.""" + self.logger.removeHandler(handler) + + def setLevel(self, level): + """Set logging level.""" + self.logger.setLevel(level) + + def transmit(self, level: MatrixLogLevel, message: str, **kwargs): + """Log to Scuba if available.""" + try: + matrix_logger(log_level=level.name, log_message=message, **kwargs) + except Exception as exn: + self.logger.error(f"MatrixLogger failed to transmit payload: {exn}") + + def _log(self, level: MatrixLogLevel, message: str, **kwargs) -> None: + """Log info message to both standard logger and Hive/Scuba.""" + local_log = getattr(self.logger, level.name.lower()) + + if self.use_frogger: + # Extract structured fields for OpenTelemetry payload + extra = { + k: v + for k, v in kwargs.items() + if k in ["job_id", "duration_seconds", "num_samples", "status"] + } + local_log(message, extra=extra) + self.transmit(level=level, message=message, **extra) + else: + # Standard logging fallback + if kwargs: + # Format kwargs as `key=value` pairs + kwargs_str: str = " ".join(f"{k}={v}" for k, v in kwargs.items()) + formatted_message: str = f"{message} [{kwargs_str}]" + local_log(formatted_message, extra=kwargs) + else: + local_log(message) + + def debug(self, message: str, **kwargs) -> None: + self._log(level=MatrixLogLevel.DEBUG, message=message, **kwargs) + + def info(self, message: str, **kwargs) -> None: + self._log(level=MatrixLogLevel.INFO, message=message, **kwargs) + + def warning(self, message: str, **kwargs) -> None: + self._log(level=MatrixLogLevel.WARNING, message=message, **kwargs) + + def error(self, message: str, **kwargs) -> None: + self._log(level=MatrixLogLevel.ERROR, message=message, **kwargs) + + def critical(self, message: str, **kwargs) -> None: + self._log(level=MatrixLogLevel.CRITICAL, message=message, **kwargs) + + +def get_logger( + name: str | None = None, level: int = logging.INFO +) -> MatrixLogger | logging.Logger: + """ + Get a logger instance + + Args: + name: Logger name (defaults to calling module's `__name__`) + level: Logger level (default: `INFO`) + + Returns: + `MatrixLogger` or `logging.Logger` + """ + + if FROGGER_AVAILABLE: + return MatrixLogger(name=name, level=level) + else: + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + logger = logging.getLogger(name) + logger.setLevel(level) + return logger diff --git a/matrix/utils/os.py b/matrix/utils/os.py index 84f4c04..34e31ab 100644 --- a/matrix/utils/os.py +++ b/matrix/utils/os.py @@ -26,7 +26,9 @@ import psutil from tqdm import tqdm -logger = logging.getLogger(__name__) +from matrix.utils.logging import get_logger + +logger = get_logger(__name__) def kill_proc_tree(pid, including_parent=True): diff --git a/pyproject.toml b/pyproject.toml index 5a1a9cb..6bbe606 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -181,6 +181,9 @@ classifiers=[ "flash-attn==2.7.3", "fastgen @ git+https://github.com/dongwang218/fastgen.git" ] + meta = [ + "frogger2 @ git+ssh://git@github.com/fairinternal/frogger2.git" + ] [project.scripts] matrix="matrix.cli:main" diff --git a/tests/integration/utils/test_matrix_logging.py b/tests/integration/utils/test_matrix_logging.py new file mode 100644 index 0000000..d5a822b --- /dev/null +++ b/tests/integration/utils/test_matrix_logging.py @@ -0,0 +1,273 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +import logging +from unittest.mock import MagicMock, patch + +import pytest + +from matrix.utils.logging import MatrixLogger, MatrixLogLevel, get_logger + + +@pytest.fixture(scope="session", autouse=True) +def configure_logging(): + """Configure logging for frogger2 transmission.""" + # Set the frogger2 logger to INFO level so transmissions work + logging.getLogger("fair_matrix").setLevel(logging.INFO) + # Also ensure root logger doesn't filter it out + logging.basicConfig(level=logging.INFO) + + +class TestMatrixLogger: + """Test suite for MatrixLogger functionality""" + + def test_matrix_logger_initialization(self): + TEST_LOGGER: str = "test_logger" + logger = MatrixLogger(name=TEST_LOGGER, level=logging.INFO) + assert logger.logger.name == TEST_LOGGER + + def test_get_logger_returns_matrix_logger_with_forgger(self): + with patch("matrix.utils.logging.FROGGER_AVAILABLE", True): + with patch( + "matrix.utils.logging.OTEL_EXPORTER_OTLP_ENDPOINT", + "http://localhost:11000", + ): + logger = get_logger("test") + assert isinstance(logger, MatrixLogger) + + def test_get_logger_returns_standard_logger_without_frogger(self): + with patch("matrix.utils.logging.FROGGER_AVAILABLE", False): + logger = get_logger("test") + assert isinstance(logger, logging.Logger) + assert not isinstance(logger, MatrixLogger) + + def test_matrix_logger_info_with_frogger(self): + with patch("matrix.utils.logging.FROGGER_AVAILABLE", True): + with patch( + "matrix.utils.logging.OTEL_EXPORTER_OTLP_ENDPOINT", + "http://localhost:11000", + ): + with patch("matrix.utils.logging.matrix_logger") as mock_frogger: + logger = MatrixLogger(name="test") + logger.use_frogger = True + + # Mock the underlying standard logger + with patch.object(logger.logger, "info") as mock_info: + logger.info("Test message", job_id="123") + + # Verify standard logger was called + mock_info.assert_called_once() + + # Verify frogger transmit was called + mock_frogger.assert_called_once_with( + log_level=MatrixLogLevel.INFO.name, + log_message="Test message", + job_id="123", + ) + + def test_matrix_logger_info_without_frogger(self): + logger = MatrixLogger(name="test") + logger.use_frogger = False + + with patch.object(logger.logger, "info") as mock_info: + logger.info("Test message", status=True) + + # Verify standard logger was called with formatted message + mock_info.assert_called_once() + call_args = mock_info.call_args + assert "Test message" in call_args[0][0] + assert "status=True" in call_args[0][0] + + def test_matrix_logger_all_levels(self): + logger = MatrixLogger(name="test") + logger.use_frogger = False + + levels = ["debug", "info", "warning", "error", "critical"] + + for level in levels: + with patch.object(logger.logger, level) as mock_log: + log_method = getattr(logger, level) + log_method(f"Test {level} message") + mock_log.assert_called_once() + + def test_matrix_logger_handlers(self): + logger = MatrixLogger(name="test") + + # Test adding handler + handler = logging.StreamHandler() + logger.addHandler(handler) + assert handler in logger.handlers + + # Test removing handler + logger.removeHandler(handler) + assert handler not in logger.handlers + + def test_matrix_logger_set_level(self): + logger = MatrixLogger(name="test", level=logging.INFO) + assert logger.logger.level == logging.INFO + + logger.setLevel(logging.DEBUG) + assert logger.logger.level == logging.DEBUG + + def test_transmit_handles_exceptions(self): + with patch("matrix.utils.logging.FROGGER_AVAILABLE", True): + with patch( + "matrix.utils.logging.OTEL_EXPORTER_OTLP_ENDPOINT", + "http://localhost:4317", + ): + with patch( + "matrix.utils.logging.matrix_logger", + side_effect=Exception("Connection failed"), + ): + logger = MatrixLogger(name="test") + logger.use_frogger = True + + with patch.object(logger.logger, "error") as mock_error: + logger.transmit(MatrixLogLevel.INFO, "Test message") + + mock_error.assert_called_once() + assert "MatrixLogger failed to transmit payload" in str( + mock_error.call_args + ) + + +class TestMatrixLoggerIntegration: + def test_ray_dashboard_job_logging(self): + from matrix.cluster.ray_dashboard_job import logger + + assert hasattr(logger, "info") + assert hasattr(logger, "error") + assert hasattr(logger, "warning") + + with patch.object(logger, "info") as mock_info: + logger.info("Test log message") + mock_info.assert_called_once() + + def test_logger_with_structured_data(self): + logger = MatrixLogger(name="test") + logger.use_frogger = False + + with patch.object(logger.logger, "info") as mock_info: + logger.info( + "Job completed", + job_id="job_123", + duration_seconds=42.5, + num_samples=1000, + status=True, + ) + + # Verify all fields are in the logged message + call_args = mock_info.call_args[0][0] + assert "job_id=job_123" in call_args + assert "duration_seconds=42.5" in call_args + assert "num_samples=1000" in call_args + assert "status=True" in call_args + + +class TestMatrixLoggerWithFrogger: + """requires frogger2 to be installed""" + + from matrix.utils.logging import FROGGER_AVAILABLE, OTEL_EXPORTER_OTLP_ENDPOINT + + @pytest.mark.skipif( + not FROGGER_AVAILABLE or not OTEL_EXPORTER_OTLP_ENDPOINT, + reason="Requires frogger2 and an OTel gateway", + ) + def test_transmission(self): + from matrix.utils.logging import FROGGER_AVAILABLE + + assert FROGGER_AVAILABLE, "frogger2 must be available for this test" + logger = MatrixLogger(name="test_frogger") + assert logger.use_frogger, "Logger should be configured to use frogger2" + + success: bool = False + try: + logger.info( + "Integration test message", + job_id="test_job_123", + duration=1.5, + status=True, + ) + success = True + except Exception as exn: + pytest.fail(f"Transmission failed with error: {str(exn)}") + + assert success, "Transmission should complete without errors" + + @pytest.mark.skipif( + not FROGGER_AVAILABLE or not OTEL_EXPORTER_OTLP_ENDPOINT, + reason="Requires frogger2 and an OTel gateway", + ) + def test_frogger_transmission_with_all_log_levels(self): + """Test transmission at all log levels.""" + logger = MatrixLogger(name="test_levels") + + test_cases = [ + (logger.debug, MatrixLogLevel.DEBUG, "Debug message"), + (logger.info, MatrixLogLevel.INFO, "Info message"), + (logger.warning, MatrixLogLevel.WARNING, "Warning message"), + (logger.error, MatrixLogLevel.ERROR, "Error message"), + (logger.critical, MatrixLogLevel.CRITICAL, "Critical message"), + ] + + for log_method, level, message in test_cases: + try: + log_method(message, job_id=f"test_{level.name.lower()}") + except Exception as exn: + pytest.fail(f"Failed to transmit {level.name}: {exn}") + + @pytest.mark.skipif( + not FROGGER_AVAILABLE or not OTEL_EXPORTER_OTLP_ENDPOINT, + reason="Requires frogger2 and an OTel gateway", + ) + def test_frogger_transmission_with_structured_fields(self): + """Test that structured fields are properly transmitted.""" + logger = MatrixLogger(name="test_structured") + + # These are the fields that should be extracted for OTLP + structured_data = { + "job_id": "integration_test_456", + "duration_seconds": 123.45, + "num_samples": 5000, + "status": "completed", + } + + try: + logger.info("Structured logging test", **structured_data) + except Exception as e: + pytest.fail(f"Structured transmission failed: {e}") + + @pytest.mark.skipif( + not FROGGER_AVAILABLE or not OTEL_EXPORTER_OTLP_ENDPOINT, + reason="Requires frogger2 and an OTel gateway", + ) + def test_verify_local_and_remote_logging_both_occur(self): + """Verify that both local logging and remote transmission happen.""" + logger = MatrixLogger(name="test_dual") + + # Capture local logs + import io + + log_capture = io.StringIO() + handler = logging.StreamHandler(log_capture) + handler.setLevel(logging.INFO) + formatter = logging.Formatter("%(levelname)s - %(message)s") + handler.setFormatter(formatter) + + logger.addHandler(handler) + + # Log a message + test_message = "Dual logging verification test" + logger.info(test_message, job_id="dual_test_001") + + # Verify local log was written + log_output = log_capture.getvalue() + assert test_message in log_output, "Local log should contain the message" + assert len(log_output) > 0, "Local log should not be empty" + + # Note: We can't easily verify remote transmission without querying the OTLP endpoint, + # but if no exception was raised, transmission was attempted + logger.removeHandler(handler)