From 260463fa6ab3544dbda4f0b8b4448c8f2adf3e39 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Mon, 16 Feb 2026 12:36:57 +0100 Subject: [PATCH 1/4] Add configuration snapshot for telemetry that masks PII --- src/telemetry/__init__.py | 7 + src/telemetry/configuration_snapshot.py | 505 +++++++++++++ tests/unit/telemetry/__init__.py | 1 + tests/unit/telemetry/conftest.py | 388 ++++++++++ .../telemetry/test_configuration_snapshot.py | 703 ++++++++++++++++++ 5 files changed, 1604 insertions(+) create mode 100644 src/telemetry/__init__.py create mode 100644 src/telemetry/configuration_snapshot.py create mode 100644 tests/unit/telemetry/__init__.py create mode 100644 tests/unit/telemetry/conftest.py create mode 100644 tests/unit/telemetry/test_configuration_snapshot.py diff --git a/src/telemetry/__init__.py b/src/telemetry/__init__.py new file mode 100644 index 000000000..3db6e61de --- /dev/null +++ b/src/telemetry/__init__.py @@ -0,0 +1,7 @@ +"""Telemetry module for configuration snapshot collection. + +This module provides functionality for building configuration snapshots +with PII masking for telemetry purposes. Snapshots collect a specific +set of configuration entries from both lightspeed-stack and llama-stack +configurations, applying appropriate masking to prevent PII leakage. +""" diff --git a/src/telemetry/configuration_snapshot.py b/src/telemetry/configuration_snapshot.py new file mode 100644 index 000000000..bf32d3f75 --- /dev/null +++ b/src/telemetry/configuration_snapshot.py @@ -0,0 +1,505 @@ +"""Configuration snapshot with PII masking for telemetry. + +This module creates snapshots of configuration at startup, masking all PII +and using logical feature collection. It collects a specific allowlisted set +of configuration entries from both lightspeed-stack and llama-stack +configurations rather than automatically grabbing the whole configuration. + +The snapshot is built as a JSON-serializable dict ready for telemetry emission. +No integration with ingress is provided here — only methods to build the JSON. +""" + +import logging +from dataclasses import dataclass +from enum import Enum +from pathlib import PurePath +from typing import Any, Optional + +import yaml +from pydantic import SecretStr + +from models.config import Configuration + +logger = logging.getLogger(__name__) + +# Masking output constants +CONFIGURED = "configured" +NOT_CONFIGURED = "not_configured" +NOT_AVAILABLE = "not_available" + + +class MaskingType(Enum): + """Type of masking to apply to a configuration field. + + Attributes: + PASSTHROUGH: Value is returned as-is (booleans, numbers, identifiers). + SENSITIVE: Value is replaced with 'configured' or 'not_configured' + (credentials, URLs, file paths, hostnames). + """ + + PASSTHROUGH = "passthrough" + SENSITIVE = "sensitive" + + +@dataclass(frozen=True) +class FieldSpec: + """Specification for a single configuration field to collect. + + Attributes: + path: Dotted path to the field in the configuration object. + masking: Type of masking to apply to the field value. + """ + + path: str + masking: MaskingType + + +@dataclass(frozen=True) +class ListFieldSpec: + """Specification for a list field with per-item sub-fields to collect. + + Attributes: + path: Dotted path to the list field in the configuration object. + item_fields: Sub-field specifications to extract from each list item. + """ + + path: str + item_fields: tuple[FieldSpec, ...] + + +# ============================================================================= +# Field Registries +# ============================================================================= + +LIGHTSPEED_STACK_FIELDS: tuple[FieldSpec | ListFieldSpec, ...] = ( + # Operational + FieldSpec("name", MaskingType.PASSTHROUGH), + # Core Service Configuration + FieldSpec("service.workers", MaskingType.PASSTHROUGH), + FieldSpec("service.host", MaskingType.SENSITIVE), + FieldSpec("service.port", MaskingType.PASSTHROUGH), + FieldSpec("service.auth_enabled", MaskingType.PASSTHROUGH), + FieldSpec("service.color_log", MaskingType.PASSTHROUGH), + FieldSpec("service.access_log", MaskingType.PASSTHROUGH), + FieldSpec("service.tls_config.tls_certificate_path", MaskingType.SENSITIVE), + FieldSpec("service.tls_config.tls_key_path", MaskingType.SENSITIVE), + FieldSpec("service.tls_config.tls_key_password", MaskingType.SENSITIVE), + FieldSpec("service.cors.allow_origins", MaskingType.SENSITIVE), + FieldSpec("service.cors.allow_credentials", MaskingType.PASSTHROUGH), + FieldSpec("service.cors.allow_methods", MaskingType.PASSTHROUGH), + FieldSpec("service.cors.allow_headers", MaskingType.PASSTHROUGH), + # LLM Integration Architecture + FieldSpec("llama_stack.use_as_library_client", MaskingType.PASSTHROUGH), + FieldSpec("llama_stack.url", MaskingType.SENSITIVE), + FieldSpec("llama_stack.api_key", MaskingType.SENSITIVE), + FieldSpec("llama_stack.library_client_config_path", MaskingType.SENSITIVE), + FieldSpec("inference.default_model", MaskingType.PASSTHROUGH), + FieldSpec("inference.default_provider", MaskingType.PASSTHROUGH), + # Authentication & Authorization + FieldSpec("authentication.module", MaskingType.PASSTHROUGH), + FieldSpec("authentication.skip_tls_verification", MaskingType.PASSTHROUGH), + FieldSpec("authentication.k8s_cluster_api", MaskingType.SENSITIVE), + FieldSpec("authentication.k8s_ca_cert_path", MaskingType.SENSITIVE), + FieldSpec("authentication.jwk_config.url", MaskingType.SENSITIVE), + FieldSpec( + "authentication.jwk_config.jwt_configuration.user_id_claim", + MaskingType.PASSTHROUGH, + ), + FieldSpec( + "authentication.jwk_config.jwt_configuration.username_claim", + MaskingType.PASSTHROUGH, + ), + ListFieldSpec( + "authentication.jwk_config.jwt_configuration.role_rules", + item_fields=( + FieldSpec("jsonpath", MaskingType.PASSTHROUGH), + FieldSpec("operator", MaskingType.PASSTHROUGH), + FieldSpec("value", MaskingType.SENSITIVE), + FieldSpec("roles", MaskingType.PASSTHROUGH), + FieldSpec("negate", MaskingType.PASSTHROUGH), + ), + ), + ListFieldSpec( + "authorization.access_rules", + item_fields=( + FieldSpec("role", MaskingType.PASSTHROUGH), + FieldSpec("actions", MaskingType.PASSTHROUGH), + ), + ), + # User Data Collection Features + FieldSpec("user_data_collection.feedback_enabled", MaskingType.PASSTHROUGH), + FieldSpec("user_data_collection.feedback_storage", MaskingType.SENSITIVE), + FieldSpec("user_data_collection.transcripts_enabled", MaskingType.PASSTHROUGH), + FieldSpec("user_data_collection.transcripts_storage", MaskingType.SENSITIVE), + # AI/ML Capabilities Configuration + FieldSpec("customization.system_prompt", MaskingType.SENSITIVE), + FieldSpec("customization.system_prompt_path", MaskingType.SENSITIVE), + FieldSpec("customization.disable_query_system_prompt", MaskingType.PASSTHROUGH), + # Database & Storage Configuration + FieldSpec("database.sqlite.db_path", MaskingType.SENSITIVE), + FieldSpec("database.postgres.host", MaskingType.SENSITIVE), + FieldSpec("database.postgres.port", MaskingType.PASSTHROUGH), + FieldSpec("database.postgres.db", MaskingType.SENSITIVE), + FieldSpec("database.postgres.user", MaskingType.SENSITIVE), + FieldSpec("database.postgres.password", MaskingType.SENSITIVE), + FieldSpec("database.postgres.namespace", MaskingType.SENSITIVE), + FieldSpec("database.postgres.ssl_mode", MaskingType.PASSTHROUGH), + FieldSpec("database.postgres.gss_encmode", MaskingType.PASSTHROUGH), + FieldSpec("database.postgres.ca_cert_path", MaskingType.SENSITIVE), + # Integration & Connectivity + ListFieldSpec( + "mcp_servers", + item_fields=( + FieldSpec("name", MaskingType.PASSTHROUGH), + FieldSpec("provider_id", MaskingType.PASSTHROUGH), + FieldSpec("url", MaskingType.SENSITIVE), + ), + ), +) + +LLAMA_STACK_FIELDS: tuple[FieldSpec | ListFieldSpec, ...] = ( + # Operational Configuration + FieldSpec("version", MaskingType.PASSTHROUGH), + FieldSpec("image_name", MaskingType.PASSTHROUGH), + FieldSpec("container_image", MaskingType.PASSTHROUGH), + FieldSpec("external_providers_dir", MaskingType.SENSITIVE), + FieldSpec("server.host", MaskingType.SENSITIVE), + FieldSpec("server.port", MaskingType.PASSTHROUGH), + FieldSpec("server.auth", MaskingType.SENSITIVE), + FieldSpec("server.quota", MaskingType.SENSITIVE), + FieldSpec("server.tls_cafile", MaskingType.SENSITIVE), + FieldSpec("server.tls_certfile", MaskingType.SENSITIVE), + FieldSpec("server.tls_keyfile", MaskingType.SENSITIVE), + FieldSpec("logging", MaskingType.PASSTHROUGH), + # APIs + FieldSpec("apis", MaskingType.PASSTHROUGH), + # Models + ListFieldSpec( + "registered_resources.models", + item_fields=( + FieldSpec("model_id", MaskingType.PASSTHROUGH), + FieldSpec("provider_id", MaskingType.PASSTHROUGH), + FieldSpec("provider_model_id", MaskingType.PASSTHROUGH), + FieldSpec("model_type", MaskingType.PASSTHROUGH), + ), + ), + # Shields + ListFieldSpec( + "registered_resources.shields", + item_fields=( + FieldSpec("shield_id", MaskingType.PASSTHROUGH), + FieldSpec("provider_id", MaskingType.PASSTHROUGH), + ), + ), + # Vector stores + ListFieldSpec( + "registered_resources.vector_stores", + item_fields=( + FieldSpec("vector_store_id", MaskingType.PASSTHROUGH), + FieldSpec("provider_id", MaskingType.PASSTHROUGH), + ), + ), + # Providers — extract only provider_id and provider_type per entry + *( + ListFieldSpec( + f"providers.{provider_name}", + item_fields=( + FieldSpec("provider_id", MaskingType.PASSTHROUGH), + FieldSpec("provider_type", MaskingType.PASSTHROUGH), + ), + ) + for provider_name in ( + "inference", + "safety", + "vector_io", + "agents", + "tool_runtime", + "datasetio", + "post_training", + "eval", + "telemetry", + "scoring", + ) + ), + # Simple list fields — pass through as-is (typically enums/identifiers) + FieldSpec("benchmarks", MaskingType.PASSTHROUGH), + FieldSpec("scoring_fns", MaskingType.PASSTHROUGH), + FieldSpec("datasets", MaskingType.PASSTHROUGH), +) + + +# ============================================================================= +# Value Extraction and Masking +# ============================================================================= + + +def get_nested_value(obj: Any, path: str) -> Any: + """Navigate a nested object by dotted path. + + Supports both Pydantic models (via getattr) and dicts (via get). + Returns None if any intermediate value is None or missing. + + Parameters: + obj: The root object to traverse (Pydantic model or dict). + path: Dotted path to the target field (e.g., "service.tls_config.tls_key_path"). + + Returns: + The value at the specified path, or None if not found. + """ + current = obj + for part in path.split("."): + if current is None: + return None + if isinstance(current, dict): + current = current.get(part) + else: + current = getattr(current, part, None) + return current + + +def _serialize_passthrough(value: Any) -> Any: + """Convert a passthrough value to JSON-serializable form. + + Parameters: + value: The value to serialize. + + Returns: + A JSON-serializable representation of the value. + """ + if value is None: + return None + if isinstance(value, (bool, int, float, str)): + return value + if isinstance(value, Enum): + return value.value + if isinstance(value, (list, tuple)): + return [_serialize_passthrough(v) for v in value] + if isinstance(value, dict): + return {str(k): _serialize_passthrough(v) for k, v in value.items()} + # Safety: mask SecretStr and file paths; stringify everything else + return CONFIGURED if isinstance(value, (SecretStr, PurePath)) else str(value) + + +def mask_value(value: Any, masking: MaskingType) -> Any: + """Apply masking to a configuration value. + + Parameters: + value: The raw configuration value. + masking: The masking type to apply. + + Returns: + The masked or serialized value. + """ + if masking == MaskingType.SENSITIVE: + if value is None: + return NOT_CONFIGURED + return CONFIGURED + return _serialize_passthrough(value) + + +def _set_nested_value(target: dict[str, Any], path: str, value: Any) -> None: + """Set a value in a nested dict by dotted path, creating intermediates. + + Parameters: + target: The target dict to modify. + path: Dotted path where the value should be set. + value: The value to set. + """ + parts = path.split(".") + current = target + for part in parts[:-1]: + if part not in current: + current[part] = {} + current = current[part] + current[parts[-1]] = value + + +def _extract_field(source: Any, spec: FieldSpec) -> Any: + """Extract and mask a single field from a source object. + + Parameters: + source: The source object (Pydantic model or dict). + spec: The field specification. + + Returns: + The masked value of the field. + """ + value = get_nested_value(source, spec.path) + return mask_value(value, spec.masking) + + +def _extract_list_field(source: Any, spec: ListFieldSpec) -> list[dict[str, Any]] | str: + """Extract and mask a list field with per-item sub-fields. + + Parameters: + source: The source object (Pydantic model or dict). + spec: The list field specification. + + Returns: + A list of dicts with masked sub-fields, or NOT_CONFIGURED if the + list is None. + """ + items = get_nested_value(source, spec.path) + if items is None: + return NOT_CONFIGURED + if not isinstance(items, (list, tuple)): + return NOT_CONFIGURED + return [ + { + field_spec.path: mask_value( + get_nested_value(item, field_spec.path), + field_spec.masking, + ) + for field_spec in spec.item_fields + } + for item in items + ] + + +def _extract_snapshot_fields( + source: Any, + field_registry: tuple[FieldSpec | ListFieldSpec, ...], +) -> dict[str, Any]: + """Extract and mask fields from a source according to the field registry. + + Parameters: + source: The source object (Pydantic model or dict). + field_registry: Tuple of field specifications defining what to extract. + + Returns: + A nested dict containing the extracted and masked fields. + """ + snapshot: dict[str, Any] = {} + for spec in field_registry: + if isinstance(spec, ListFieldSpec): + value = _extract_list_field(source, spec) + else: + value = _extract_field(source, spec) + _set_nested_value(snapshot, spec.path, value) + return snapshot + + +# ============================================================================= +# Llama Stack Storage Field Extraction +# ============================================================================= + + +def _extract_store_info(ls_config: dict[str, Any], store_name: str) -> dict[str, Any]: + """Extract store type and db_path from llama-stack storage configuration. + + Resolves the store → backend → type/db_path chain in the llama-stack + storage config structure. + + Parameters: + ls_config: The parsed llama-stack configuration dict. + store_name: Name of the store to look up (e.g., "inference", "metadata"). + + Returns: + A dict with 'type' and 'db_path' keys, plus 'namespace' for metadata store. + """ + store = get_nested_value(ls_config, f"storage.stores.{store_name}") + if store is None or not isinstance(store, dict): + return {"type": NOT_CONFIGURED, "db_path": NOT_CONFIGURED} + + backend_name = store.get("backend") + if backend_name is None: + return {"type": NOT_CONFIGURED, "db_path": NOT_CONFIGURED} + + backends = get_nested_value(ls_config, "storage.backends") or {} + backend = backends.get(backend_name, {}) + + result: dict[str, Any] = { + "type": backend.get("type", NOT_CONFIGURED), + "db_path": CONFIGURED if backend.get("db_path") else NOT_CONFIGURED, + } + + if store_name == "metadata": + result["namespace"] = store.get("namespace", NOT_CONFIGURED) + + return result + + +# ============================================================================= +# Public API +# ============================================================================= + + +def build_lightspeed_stack_snapshot( + config: Configuration, +) -> dict[str, Any]: + """Build snapshot of lightspeed-stack configuration with PII masking. + + Extracts only the allowlisted fields from the Configuration object, + applying binary masking to sensitive values (credentials, URLs, file paths) + and passing through non-sensitive values (booleans, numbers, identifiers). + + Parameters: + config: The lightspeed-stack Configuration object. + + Returns: + A nested dict containing the masked configuration snapshot. + """ + return _extract_snapshot_fields(config, LIGHTSPEED_STACK_FIELDS) + + +def build_llama_stack_snapshot( + config_path: Optional[str] = None, +) -> dict[str, Any]: + """Build snapshot of llama-stack configuration with PII masking. + + In library mode, parses the llama-stack YAML config file and extracts + allowlisted fields with masking. In service mode (config_path is None), + returns a status indicating the config is not available locally. + + Parameters: + config_path: Path to the llama-stack YAML config file. If None + (service mode), llama-stack fields are marked as not available. + + Returns: + A nested dict containing the masked llama-stack configuration snapshot, + or a status dict if the config is not available. + """ + if config_path is None: + return {"status": NOT_AVAILABLE} + + try: + with open(config_path, "r", encoding="utf-8") as f: + ls_config = yaml.safe_load(f) + except (OSError, yaml.YAMLError) as e: + logger.warning("Failed to read llama-stack config for snapshot: %s", e) + return {"status": NOT_AVAILABLE} + + if not isinstance(ls_config, dict): + logger.warning("Llama-stack config is not a dict, skipping snapshot") + return {"status": NOT_AVAILABLE} + + snapshot = _extract_snapshot_fields(ls_config, LLAMA_STACK_FIELDS) + snapshot["inference_store"] = _extract_store_info(ls_config, "inference") + snapshot["metadata_store"] = _extract_store_info(ls_config, "metadata") + return snapshot + + +def build_configuration_snapshot( + config: Configuration, + llama_stack_config_path: Optional[str] = None, +) -> dict[str, Any]: + """Build a complete configuration snapshot with PII masking. + + Creates a snapshot containing both lightspeed-stack and llama-stack + configuration data with appropriate PII masking applied. Only collects + fields from an explicit allowlist — does not automatically grab the + whole configuration. + + Parameters: + config: The lightspeed-stack Configuration object. + llama_stack_config_path: Path to the llama-stack YAML config file. + If None (service mode), llama-stack section is marked not available. + + Returns: + A dict with 'lightspeed_stack' and 'llama_stack' keys containing + the respective masked snapshots, ready for JSON serialization. + """ + return { + "lightspeed_stack": build_lightspeed_stack_snapshot(config), + "llama_stack": build_llama_stack_snapshot(llama_stack_config_path), + } diff --git a/tests/unit/telemetry/__init__.py b/tests/unit/telemetry/__init__.py new file mode 100644 index 000000000..c7f63d51f --- /dev/null +++ b/tests/unit/telemetry/__init__.py @@ -0,0 +1 @@ +"""Unit tests for the telemetry module.""" diff --git a/tests/unit/telemetry/conftest.py b/tests/unit/telemetry/conftest.py new file mode 100644 index 000000000..e255c9f37 --- /dev/null +++ b/tests/unit/telemetry/conftest.py @@ -0,0 +1,388 @@ +"""Shared fixtures for telemetry unit tests.""" + +import tempfile +from pathlib import Path, PurePosixPath +from typing import Any, Generator + +import pytest +import yaml +from pydantic import SecretStr + +from models.config import ( + AccessRule, + Action, + AuthenticationConfiguration, + AuthorizationConfiguration, + Configuration, + CORSConfiguration, + Customization, + DatabaseConfiguration, + InferenceConfiguration, + JwkConfiguration, + JsonPathOperator, + JwtConfiguration, + JwtRoleRule, + LlamaStackConfiguration, + ModelContextProtocolServer, + PostgreSQLDatabaseConfiguration, + ServiceConfiguration, + SQLiteDatabaseConfiguration, + TLSConfiguration, + UserDataCollection, +) + +# ============================================================================= +# Known PII values used across tests +# ============================================================================= + +PII_HOST = "192.168.1.100" +PII_TLS_CERT = "/etc/ssl/certs/server.crt" +PII_TLS_KEY = "/etc/ssl/private/server.key" +PII_TLS_PASS = "/etc/ssl/private/key_password.txt" +PII_CORS_ORIGIN = "https://internal.corp.com" +PII_LLAMA_URL = "https://llama.internal.corp.com:8321" +PII_API_KEY = "sk-super-secret-api-key-12345" +PII_LIB_CONFIG = "/opt/llama-stack/run.yaml" +PII_K8S_API = "https://k8s.internal.corp.com:6443" +PII_K8S_CERT = "/var/run/secrets/ca.crt" +PII_JWK_URL = "https://auth.internal.corp.com/.well-known/jwks.json" +PII_ROLE_VALUE = "secret-org-id-99999" +PII_FEEDBACK_STORAGE = "/data/feedback" +PII_TRANSCRIPTS_STORAGE = "/data/transcripts" +PII_SYSTEM_PROMPT = "You are a secret internal assistant for ACME Corp project X." +PII_PROMPT_PATH = "/etc/lightspeed/system_prompt.txt" +PII_SQLITE_PATH = "/var/lib/lightspeed/db.sqlite" +PII_PG_HOST = "db.internal.corp.com" +PII_PG_DB = "lightspeed_prod" +PII_PG_USER = "admin_jsmith" +PII_PG_PASS = "P@ssw0rd!SuperSecret" +PII_PG_NAMESPACE = "production_ns" +PII_PG_CA_CERT = "/etc/ssl/postgres/ca.crt" +PII_MCP_URL = "https://mcp.internal.corp.com:9090" + +ALL_PII_VALUES = [ + PII_HOST, + PII_TLS_CERT, + PII_TLS_KEY, + PII_TLS_PASS, + PII_CORS_ORIGIN, + PII_LLAMA_URL, + PII_API_KEY, + PII_LIB_CONFIG, + PII_K8S_API, + PII_K8S_CERT, + PII_JWK_URL, + PII_ROLE_VALUE, + PII_FEEDBACK_STORAGE, + PII_TRANSCRIPTS_STORAGE, + PII_SYSTEM_PROMPT, + PII_PROMPT_PATH, + PII_SQLITE_PATH, + PII_PG_HOST, + PII_PG_DB, + PII_PG_USER, + PII_PG_PASS, + PII_PG_NAMESPACE, + PII_PG_CA_CERT, + PII_MCP_URL, +] + +SAMPLE_LLAMA_STACK_CONFIG: dict[str, Any] = { + "version": 2, + "image_name": "starter", + "container_image": None, + "external_providers_dir": "/opt/providers", + "apis": ["agents", "inference", "safety", "vector_io"], + "server": {"port": 8321}, + "providers": { + "inference": [ + { + "provider_id": "openai", + "provider_type": "remote::openai", + "config": {"api_key": "sk-openai-secret-key"}, + }, + ], + "safety": [ + { + "provider_id": "llama-guard", + "provider_type": "inline::llama-guard", + "config": {}, + }, + ], + "vector_io": [], + }, + "registered_resources": { + "models": [ + { + "model_id": "gpt-4o-mini", + "provider_id": "openai", + "provider_model_id": "gpt-4o-mini", + "model_type": "llm", + }, + { + "model_id": "granite-embedding-30m", + "provider_id": "sentence-transformers", + "provider_model_id": "all-MiniLM-L6-v2", + "model_type": "embedding", + }, + ], + "shields": [ + {"shield_id": "llama-guard", "provider_id": "llama-guard"}, + ], + "vector_stores": [], + }, + "storage": { + "backends": { + "kv_default": { + "type": "kv_sqlite", + "db_path": "/secret/path/kv_store.db", + }, + "sql_default": { + "type": "sql_sqlite", + "db_path": "/secret/path/sql_store.db", + }, + }, + "stores": { + "metadata": { + "namespace": "registry", + "backend": "kv_default", + }, + "inference": { + "table_name": "inference_store", + "backend": "sql_default", + }, + }, + }, + "benchmarks": [], + "scoring_fns": [], + "datasets": [], +} + + +LLAMA_STACK_PII_VALUES = [ + "sk-openai-secret-key", + "/secret/path/kv_store.db", + "/secret/path/sql_store.db", + "/opt/providers", +] + + +def build_fully_populated_config() -> Configuration: + """Build a Configuration with all fields populated using known PII values. + + Uses model_construct() to bypass file-existence validators. + + Returns: + A fully-populated Configuration for testing PII masking. + """ + return Configuration.model_construct( + name="test-service", + service=ServiceConfiguration.model_construct( + host=PII_HOST, + port=8080, + base_url=None, + workers=4, + auth_enabled=True, + color_log=True, + access_log=False, + root_path="", + tls_config=TLSConfiguration.model_construct( + tls_certificate_path=PurePosixPath(PII_TLS_CERT), + tls_key_path=PurePosixPath(PII_TLS_KEY), + tls_key_password=PurePosixPath(PII_TLS_PASS), + ), + cors=CORSConfiguration.model_construct( + allow_origins=[PII_CORS_ORIGIN, "https://admin.corp.com"], + allow_credentials=True, + allow_methods=["GET", "POST"], + allow_headers=["Authorization", "Content-Type"], + ), + ), + llama_stack=LlamaStackConfiguration.model_construct( + url=PII_LLAMA_URL, + api_key=SecretStr(PII_API_KEY), + use_as_library_client=False, + library_client_config_path=PII_LIB_CONFIG, + timeout=180, + ), + inference=InferenceConfiguration.model_construct( + default_model="gpt-4o-mini", + default_provider="openai", + ), + authentication=AuthenticationConfiguration.model_construct( + module="jwk_token", + skip_tls_verification=False, + skip_for_health_probes=False, + k8s_cluster_api=PII_K8S_API, + k8s_ca_cert_path=PurePosixPath(PII_K8S_CERT), + jwk_config=JwkConfiguration.model_construct( + url=PII_JWK_URL, + jwt_configuration=JwtConfiguration.model_construct( + user_id_claim="sub", + username_claim="preferred_username", + role_rules=[ + JwtRoleRule.model_construct( + jsonpath="$.org_id", + operator=JsonPathOperator.EQUALS, + value=PII_ROLE_VALUE, + roles=["admin"], + negate=False, + compiled_regex=None, + ), + ], + ), + ), + api_key_config=None, + rh_identity_config=None, + ), + authorization=AuthorizationConfiguration.model_construct( + access_rules=[ + AccessRule.model_construct( + role="admin", + actions=[Action.ADMIN], + ), + AccessRule.model_construct( + role="user", + actions=[Action.QUERY, Action.FEEDBACK], + ), + ], + ), + user_data_collection=UserDataCollection.model_construct( + feedback_enabled=True, + feedback_storage=PII_FEEDBACK_STORAGE, + transcripts_enabled=True, + transcripts_storage=PII_TRANSCRIPTS_STORAGE, + ), + customization=Customization.model_construct( + system_prompt=PII_SYSTEM_PROMPT, + system_prompt_path=PurePosixPath(PII_PROMPT_PATH), + disable_query_system_prompt=False, + profile_path=None, + custom_profile=None, + agent_card_path=None, + agent_card_config=None, + ), + database=DatabaseConfiguration.model_construct( + sqlite=SQLiteDatabaseConfiguration.model_construct( + db_path=PII_SQLITE_PATH, + ), + postgres=PostgreSQLDatabaseConfiguration.model_construct( + host=PII_PG_HOST, + port=5432, + db=PII_PG_DB, + user=PII_PG_USER, + password=SecretStr(PII_PG_PASS), + namespace=PII_PG_NAMESPACE, + ssl_mode="verify-full", + gss_encmode="prefer", + ca_cert_path=PurePosixPath(PII_PG_CA_CERT), + ), + ), + mcp_servers=[ + ModelContextProtocolServer.model_construct( + name="my-mcp-server", + provider_id="model-context-protocol", + url=PII_MCP_URL, + authorization_headers={}, + timeout=None, + ), + ], + conversation_cache=None, + byok_rag=[], + a2a_state=None, + quota_handlers=None, + azure_entra_id=None, + splunk=None, + deployment_environment="production", + solr=None, + ) + + +def build_minimal_config() -> Configuration: + """Build a minimal Configuration with mostly None/default optional fields. + + Returns: + A minimal Configuration for testing snapshot behavior with defaults. + """ + return Configuration.model_construct( + name="minimal", + service=ServiceConfiguration.model_construct( + host="localhost", + port=8080, + base_url=None, + workers=1, + auth_enabled=False, + color_log=True, + access_log=True, + root_path="", + tls_config=TLSConfiguration.model_construct( + tls_certificate_path=None, + tls_key_path=None, + tls_key_password=None, + ), + cors=CORSConfiguration.model_construct( + allow_origins=["*"], + allow_credentials=False, + allow_methods=["*"], + allow_headers=["*"], + ), + ), + llama_stack=LlamaStackConfiguration.model_construct( + url=None, + api_key=None, + use_as_library_client=True, + library_client_config_path=None, + timeout=180, + ), + inference=InferenceConfiguration.model_construct( + default_model=None, + default_provider=None, + ), + authentication=AuthenticationConfiguration.model_construct( + module="noop", + skip_tls_verification=False, + skip_for_health_probes=False, + k8s_cluster_api=None, + k8s_ca_cert_path=None, + jwk_config=None, + api_key_config=None, + rh_identity_config=None, + ), + authorization=None, + user_data_collection=UserDataCollection.model_construct( + feedback_enabled=False, + feedback_storage=None, + transcripts_enabled=False, + transcripts_storage=None, + ), + customization=None, + database=DatabaseConfiguration.model_construct( + sqlite=SQLiteDatabaseConfiguration.model_construct( + db_path="/tmp/lightspeed-stack.db", + ), + postgres=None, + ), + mcp_servers=[], + conversation_cache=None, + byok_rag=[], + a2a_state=None, + quota_handlers=None, + azure_entra_id=None, + splunk=None, + deployment_environment="development", + solr=None, + ) + + +@pytest.fixture(name="llama_stack_config_file") +def llama_stack_config_file_fixture() -> Generator[str, None, None]: + """Write SAMPLE_LLAMA_STACK_CONFIG to a temp YAML file and return its path. + + Yields: + str: Path to the temporary YAML file. + """ + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + yaml.dump(SAMPLE_LLAMA_STACK_CONFIG, f) + path = f.name + yield path + Path(path).unlink(missing_ok=True) diff --git a/tests/unit/telemetry/test_configuration_snapshot.py b/tests/unit/telemetry/test_configuration_snapshot.py new file mode 100644 index 000000000..149ef7352 --- /dev/null +++ b/tests/unit/telemetry/test_configuration_snapshot.py @@ -0,0 +1,703 @@ +"""Tests for configuration snapshot with PII masking.""" + +import json +import tempfile +from enum import Enum +from pathlib import Path, PurePosixPath +from typing import Any + +import yaml +from pydantic import SecretStr + +from models.config import Action, JsonPathOperator +from telemetry.configuration_snapshot import ( + CONFIGURED, + LIGHTSPEED_STACK_FIELDS, + LLAMA_STACK_FIELDS, + NOT_AVAILABLE, + NOT_CONFIGURED, + FieldSpec, + ListFieldSpec, + MaskingType, + _extract_field, + _extract_list_field, + _extract_store_info, + _serialize_passthrough, + _set_nested_value, + build_configuration_snapshot, + build_lightspeed_stack_snapshot, + build_llama_stack_snapshot, + get_nested_value, + mask_value, +) +from tests.unit.telemetry.conftest import ( + ALL_PII_VALUES, + LLAMA_STACK_PII_VALUES, + SAMPLE_LLAMA_STACK_CONFIG, + build_fully_populated_config, + build_minimal_config, +) + +# ============================================================================= +# Tests: get_nested_value +# ============================================================================= + + +class TestGetNestedValue: + """Tests for get_nested_value function.""" + + def test_dict_simple_key(self) -> None: + """Test simple key lookup in a dict.""" + assert get_nested_value({"a": 1}, "a") == 1 + + def test_dict_nested_key(self) -> None: + """Test nested key lookup in a dict.""" + assert get_nested_value({"a": {"b": {"c": 42}}}, "a.b.c") == 42 + + def test_dict_missing_key(self) -> None: + """Test missing key returns None.""" + assert get_nested_value({"a": 1}, "b") is None + + def test_dict_missing_intermediate(self) -> None: + """Test missing intermediate key returns None.""" + assert get_nested_value({"a": 1}, "a.b.c") is None + + def test_dict_none_intermediate(self) -> None: + """Test None intermediate returns None.""" + assert get_nested_value({"a": None}, "a.b") is None + + def test_none_root(self) -> None: + """Test None root returns None.""" + assert get_nested_value(None, "a.b") is None + + def test_pydantic_model(self) -> None: + """Test attribute access on a Pydantic model.""" + config = build_minimal_config() + assert get_nested_value(config, "service.port") == 8080 + + def test_pydantic_model_nested(self) -> None: + """Test deeply nested attribute access on Pydantic models.""" + config = build_fully_populated_config() + assert ( + get_nested_value( + config, + "authentication.jwk_config.jwt_configuration.user_id_claim", + ) + == "sub" + ) + + def test_pydantic_model_none_intermediate(self) -> None: + """Test None intermediate in Pydantic model returns None.""" + config = build_minimal_config() + assert get_nested_value(config, "authentication.jwk_config.url") is None + + +# ============================================================================= +# Tests: _serialize_passthrough +# ============================================================================= + + +class TestSerializePassthrough: + """Tests for _serialize_passthrough function.""" + + def test_none(self) -> None: + """Test None returns None.""" + assert _serialize_passthrough(None) is None + + def test_bool(self) -> None: + """Test bool passes through.""" + assert _serialize_passthrough(True) is True + assert _serialize_passthrough(False) is False + + def test_int(self) -> None: + """Test int passes through.""" + assert _serialize_passthrough(42) == 42 + + def test_float(self) -> None: + """Test float passes through.""" + assert _serialize_passthrough(3.14) == 3.14 + + def test_str(self) -> None: + """Test str passes through.""" + assert _serialize_passthrough("hello") == "hello" + + def test_enum(self) -> None: + """Test enum returns its value.""" + + class TestColor(Enum): + """Test enum for serialization.""" + + RED = "red" + + assert _serialize_passthrough(TestColor.RED) == "red" + + def test_action_enum(self) -> None: + """Test Action enum serialization.""" + assert _serialize_passthrough(Action.QUERY) == "query" + + def test_json_path_operator_enum(self) -> None: + """Test JsonPathOperator enum serialization.""" + assert _serialize_passthrough(JsonPathOperator.EQUALS) == "equals" + + def test_list(self) -> None: + """Test list with mixed types.""" + result = _serialize_passthrough([1, "a", True, Action.QUERY]) + assert result == [1, "a", True, "query"] + + def test_empty_list(self) -> None: + """Test empty list.""" + assert _serialize_passthrough([]) == [] + + def test_dict(self) -> None: + """Test dict serialization.""" + assert _serialize_passthrough({"a": 1, "b": "x"}) == {"a": 1, "b": "x"} + + def test_secret_str_safety(self) -> None: + """Test SecretStr is masked even in passthrough mode.""" + assert _serialize_passthrough(SecretStr("secret")) == CONFIGURED + + def test_path_safety(self) -> None: + """Test Path is masked even in passthrough mode.""" + assert _serialize_passthrough(PurePosixPath("/etc/secret")) == CONFIGURED + + +# ============================================================================= +# Tests: mask_value +# ============================================================================= + + +class TestMaskValue: + """Tests for mask_value function.""" + + def test_sensitive_with_value(self) -> None: + """Test sensitive masking with non-None value returns 'configured'.""" + assert mask_value("secret", MaskingType.SENSITIVE) == CONFIGURED + + def test_sensitive_with_none(self) -> None: + """Test sensitive masking with None returns 'not_configured'.""" + assert mask_value(None, MaskingType.SENSITIVE) == NOT_CONFIGURED + + def test_sensitive_with_secret_str(self) -> None: + """Test sensitive masking with SecretStr returns 'configured'.""" + assert mask_value(SecretStr("key"), MaskingType.SENSITIVE) == CONFIGURED + + def test_sensitive_with_path(self) -> None: + """Test sensitive masking with Path returns 'configured'.""" + assert ( + mask_value(PurePosixPath("/etc/cert"), MaskingType.SENSITIVE) == CONFIGURED + ) + + def test_sensitive_with_empty_string(self) -> None: + """Test sensitive masking with empty string returns 'configured'.""" + assert mask_value("", MaskingType.SENSITIVE) == CONFIGURED + + def test_passthrough_bool(self) -> None: + """Test passthrough returns bool as-is.""" + assert mask_value(True, MaskingType.PASSTHROUGH) is True + + def test_passthrough_int(self) -> None: + """Test passthrough returns int as-is.""" + assert mask_value(8080, MaskingType.PASSTHROUGH) == 8080 + + def test_passthrough_string(self) -> None: + """Test passthrough returns string as-is.""" + assert mask_value("noop", MaskingType.PASSTHROUGH) == "noop" + + def test_passthrough_none(self) -> None: + """Test passthrough with None returns None.""" + assert mask_value(None, MaskingType.PASSTHROUGH) is None + + def test_passthrough_list(self) -> None: + """Test passthrough with list returns list.""" + assert mask_value(["GET", "POST"], MaskingType.PASSTHROUGH) == ["GET", "POST"] + + +# ============================================================================= +# Tests: _set_nested_value +# ============================================================================= + + +class TestSetNestedValue: + """Tests for _set_nested_value function.""" + + def test_simple_key(self) -> None: + """Test setting a top-level key.""" + target: dict[str, Any] = {} + _set_nested_value(target, "name", "test") + assert target == {"name": "test"} + + def test_nested_key(self) -> None: + """Test setting a nested key creates intermediates.""" + target: dict[str, Any] = {} + _set_nested_value(target, "service.workers", 4) + assert target == {"service": {"workers": 4}} + + def test_deeply_nested(self) -> None: + """Test deeply nested path.""" + target: dict[str, Any] = {} + _set_nested_value(target, "a.b.c.d", "value") + assert target == {"a": {"b": {"c": {"d": "value"}}}} + + def test_multiple_fields_same_parent(self) -> None: + """Test multiple fields under the same parent.""" + target: dict[str, Any] = {} + _set_nested_value(target, "service.workers", 4) + _set_nested_value(target, "service.port", 8080) + assert target == {"service": {"workers": 4, "port": 8080}} + + +# ============================================================================= +# Tests: _extract_field and _extract_list_field +# ============================================================================= + + +class TestExtractField: + """Tests for _extract_field function.""" + + def test_passthrough_from_dict(self) -> None: + """Test passthrough extraction from a dict.""" + source = {"a": {"b": 42}} + assert _extract_field(source, FieldSpec("a.b", MaskingType.PASSTHROUGH)) == 42 + + def test_sensitive_from_dict(self) -> None: + """Test sensitive extraction from a dict.""" + source = {"secret": "password123"} + result = _extract_field(source, FieldSpec("secret", MaskingType.SENSITIVE)) + assert result == CONFIGURED + + def test_missing_field(self) -> None: + """Test missing field returns appropriate default.""" + source: dict[str, Any] = {} + assert ( + _extract_field(source, FieldSpec("missing", MaskingType.SENSITIVE)) + == NOT_CONFIGURED + ) + assert ( + _extract_field(source, FieldSpec("missing", MaskingType.PASSTHROUGH)) + is None + ) + + +class TestExtractListField: + """Tests for _extract_list_field function.""" + + def test_extract_items(self) -> None: + """Test extracting list items with sub-fields.""" + source = {"items": [{"name": "a", "secret": "x"}, {"name": "b", "secret": "y"}]} + spec = ListFieldSpec( + "items", + item_fields=( + FieldSpec("name", MaskingType.PASSTHROUGH), + FieldSpec("secret", MaskingType.SENSITIVE), + ), + ) + result = _extract_list_field(source, spec) + assert result == [ + {"name": "a", "secret": CONFIGURED}, + {"name": "b", "secret": CONFIGURED}, + ] + + def test_empty_list(self) -> None: + """Test empty list returns empty list.""" + source: dict[str, Any] = {"items": []} + spec = ListFieldSpec( + "items", item_fields=(FieldSpec("name", MaskingType.PASSTHROUGH),) + ) + assert _extract_list_field(source, spec) == [] + + def test_none_list(self) -> None: + """Test None list returns NOT_CONFIGURED.""" + source = {"items": None} + spec = ListFieldSpec( + "items", item_fields=(FieldSpec("name", MaskingType.PASSTHROUGH),) + ) + assert _extract_list_field(source, spec) == NOT_CONFIGURED + + def test_missing_list(self) -> None: + """Test missing list path returns NOT_CONFIGURED.""" + source: dict[str, Any] = {} + spec = ListFieldSpec( + "items", item_fields=(FieldSpec("name", MaskingType.PASSTHROUGH),) + ) + assert _extract_list_field(source, spec) == NOT_CONFIGURED + + +# ============================================================================= +# Tests: _extract_store_info +# ============================================================================= + + +class TestExtractStoreInfo: + """Tests for _extract_store_info function.""" + + def test_inference_store(self) -> None: + """Test inference store extraction.""" + result = _extract_store_info(SAMPLE_LLAMA_STACK_CONFIG, "inference") + assert result["type"] == "sql_sqlite" + assert result["db_path"] == CONFIGURED + + def test_metadata_store_with_namespace(self) -> None: + """Test metadata store extraction includes namespace.""" + result = _extract_store_info(SAMPLE_LLAMA_STACK_CONFIG, "metadata") + assert result["type"] == "kv_sqlite" + assert result["db_path"] == CONFIGURED + assert result["namespace"] == "registry" + + def test_missing_store(self) -> None: + """Test missing store returns not_configured.""" + result = _extract_store_info(SAMPLE_LLAMA_STACK_CONFIG, "nonexistent") + assert result["type"] == NOT_CONFIGURED + assert result["db_path"] == NOT_CONFIGURED + + def test_no_storage_section(self) -> None: + """Test config without storage section.""" + result = _extract_store_info({}, "inference") + assert result["type"] == NOT_CONFIGURED + + def test_db_path_is_masked(self) -> None: + """Test that db_path never leaks the actual path.""" + result = _extract_store_info(SAMPLE_LLAMA_STACK_CONFIG, "inference") + assert "/secret/path" not in str(result) + + +# ============================================================================= +# Tests: build_lightspeed_stack_snapshot +# ============================================================================= + + +class TestBuildLightspeedStackSnapshot: + """Tests for build_lightspeed_stack_snapshot function.""" + + def test_minimal_config_snapshot(self) -> None: + """Test snapshot from minimal config has expected structure.""" + snapshot = build_lightspeed_stack_snapshot(build_minimal_config()) + assert snapshot["name"] == "minimal" + assert snapshot["service"]["workers"] == 1 + assert snapshot["service"]["port"] == 8080 + assert snapshot["service"]["auth_enabled"] is False + assert snapshot["service"]["host"] == CONFIGURED + + def test_sensitive_fields_masked(self) -> None: + """Test all sensitive fields are masked in fully-populated config.""" + snapshot = build_lightspeed_stack_snapshot(build_fully_populated_config()) + assert snapshot["service"]["host"] == CONFIGURED + assert snapshot["service"]["tls_config"]["tls_certificate_path"] == CONFIGURED + assert snapshot["service"]["tls_config"]["tls_key_path"] == CONFIGURED + assert snapshot["service"]["tls_config"]["tls_key_password"] == CONFIGURED + assert snapshot["service"]["cors"]["allow_origins"] == CONFIGURED + assert snapshot["llama_stack"]["url"] == CONFIGURED + assert snapshot["llama_stack"]["api_key"] == CONFIGURED + assert snapshot["llama_stack"]["library_client_config_path"] == CONFIGURED + assert snapshot["authentication"]["k8s_cluster_api"] == CONFIGURED + assert snapshot["authentication"]["k8s_ca_cert_path"] == CONFIGURED + assert snapshot["authentication"]["jwk_config"]["url"] == CONFIGURED + assert snapshot["user_data_collection"]["feedback_storage"] == CONFIGURED + assert snapshot["user_data_collection"]["transcripts_storage"] == CONFIGURED + assert snapshot["customization"]["system_prompt"] == CONFIGURED + assert snapshot["customization"]["system_prompt_path"] == CONFIGURED + assert snapshot["database"]["sqlite"]["db_path"] == CONFIGURED + assert snapshot["database"]["postgres"]["host"] == CONFIGURED + assert snapshot["database"]["postgres"]["db"] == CONFIGURED + assert snapshot["database"]["postgres"]["user"] == CONFIGURED + assert snapshot["database"]["postgres"]["password"] == CONFIGURED + assert snapshot["database"]["postgres"]["namespace"] == CONFIGURED + assert snapshot["database"]["postgres"]["ca_cert_path"] == CONFIGURED + + def test_passthrough_fields_preserved(self) -> None: + """Test non-sensitive fields pass through correctly.""" + snapshot = build_lightspeed_stack_snapshot(build_fully_populated_config()) + assert snapshot["service"]["workers"] == 4 + assert snapshot["service"]["port"] == 8080 + assert snapshot["service"]["auth_enabled"] is True + assert snapshot["service"]["color_log"] is True + assert snapshot["service"]["access_log"] is False + assert snapshot["service"]["cors"]["allow_credentials"] is True + assert snapshot["service"]["cors"]["allow_methods"] == ["GET", "POST"] + assert snapshot["llama_stack"]["use_as_library_client"] is False + assert snapshot["inference"]["default_model"] == "gpt-4o-mini" + assert snapshot["inference"]["default_provider"] == "openai" + assert snapshot["authentication"]["module"] == "jwk_token" + assert snapshot["authentication"]["skip_tls_verification"] is False + + def test_optional_none_fields(self) -> None: + """Test optional fields that are None.""" + snapshot = build_lightspeed_stack_snapshot(build_minimal_config()) + assert ( + snapshot["service"]["tls_config"]["tls_certificate_path"] == NOT_CONFIGURED + ) + assert snapshot["service"]["tls_config"]["tls_key_path"] == NOT_CONFIGURED + assert snapshot["llama_stack"]["url"] == NOT_CONFIGURED + assert snapshot["llama_stack"]["api_key"] == NOT_CONFIGURED + assert snapshot["authentication"]["jwk_config"]["url"] == NOT_CONFIGURED + assert snapshot["customization"]["system_prompt"] == NOT_CONFIGURED + assert snapshot["database"]["postgres"]["host"] == NOT_CONFIGURED + + def test_list_field_mcp_servers(self) -> None: + """Test MCP servers list extraction with masking.""" + snapshot = build_lightspeed_stack_snapshot(build_fully_populated_config()) + mcp = snapshot["mcp_servers"] + assert isinstance(mcp, list) + assert len(mcp) == 1 + assert mcp[0]["name"] == "my-mcp-server" + assert mcp[0]["provider_id"] == "model-context-protocol" + assert mcp[0]["url"] == CONFIGURED + + def test_empty_mcp_servers(self) -> None: + """Test empty MCP servers list.""" + snapshot = build_lightspeed_stack_snapshot(build_minimal_config()) + assert snapshot["mcp_servers"] == [] + + def test_role_rules_extraction(self) -> None: + """Test JWT role rules list extraction with value masking.""" + snapshot = build_lightspeed_stack_snapshot(build_fully_populated_config()) + rules = snapshot["authentication"]["jwk_config"]["jwt_configuration"][ + "role_rules" + ] + assert isinstance(rules, list) + assert len(rules) == 1 + assert rules[0]["jsonpath"] == "$.org_id" + assert rules[0]["operator"] == "equals" + assert rules[0]["value"] == CONFIGURED + assert rules[0]["roles"] == ["admin"] + assert rules[0]["negate"] is False + + def test_access_rules_extraction(self) -> None: + """Test authorization access rules extraction.""" + snapshot = build_lightspeed_stack_snapshot(build_fully_populated_config()) + rules = snapshot["authorization"]["access_rules"] + assert isinstance(rules, list) + assert len(rules) == 2 + assert rules[0]["role"] == "admin" + assert rules[0]["actions"] == ["admin"] + assert rules[1]["role"] == "user" + assert rules[1]["actions"] == ["query", "feedback"] + + def test_authorization_none(self) -> None: + """Test authorization section when not configured.""" + snapshot = build_lightspeed_stack_snapshot(build_minimal_config()) + assert snapshot["authorization"]["access_rules"] == NOT_CONFIGURED + + def test_database_ssl_mode_passthrough(self) -> None: + """Test database ssl_mode and gss_encmode pass through.""" + snapshot = build_lightspeed_stack_snapshot(build_fully_populated_config()) + assert snapshot["database"]["postgres"]["ssl_mode"] == "verify-full" + assert snapshot["database"]["postgres"]["gss_encmode"] == "prefer" + + +# ============================================================================= +# Tests: build_llama_stack_snapshot +# ============================================================================= + + +class TestBuildLlamaStackSnapshot: + """Tests for build_llama_stack_snapshot function.""" + + def test_service_mode_returns_not_available(self) -> None: + """Test that service mode (no path) returns not_available status.""" + assert build_llama_stack_snapshot(None) == {"status": NOT_AVAILABLE} + + def test_nonexistent_file(self) -> None: + """Test that missing file returns not_available status.""" + assert build_llama_stack_snapshot("/nonexistent/path.yaml") == { + "status": NOT_AVAILABLE + } + + def test_invalid_yaml(self) -> None: + """Test that invalid YAML returns not_available status.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(": invalid: yaml: [") + path = f.name + result = build_llama_stack_snapshot(path) + Path(path).unlink() + assert result == {"status": NOT_AVAILABLE} + + def test_valid_config(self, llama_stack_config_file: str) -> None: + """Test snapshot from valid llama-stack config.""" + result = build_llama_stack_snapshot(llama_stack_config_file) + assert result["version"] == 2 + assert result["image_name"] == "starter" + assert result["apis"] == ["agents", "inference", "safety", "vector_io"] + assert result["external_providers_dir"] == CONFIGURED + + def test_models_extraction(self, llama_stack_config_file: str) -> None: + """Test models list extraction.""" + result = build_llama_stack_snapshot(llama_stack_config_file) + models = result["registered_resources"]["models"] + assert len(models) == 2 + assert models[0]["model_id"] == "gpt-4o-mini" + assert models[0]["model_type"] == "llm" + + def test_providers_extraction(self, llama_stack_config_file: str) -> None: + """Test provider lists extraction shows only id and type.""" + result = build_llama_stack_snapshot(llama_stack_config_file) + inference = result["providers"]["inference"] + assert len(inference) == 1 + assert inference[0]["provider_id"] == "openai" + assert inference[0]["provider_type"] == "remote::openai" + assert "config" not in inference[0] + + def test_storage_fields(self, llama_stack_config_file: str) -> None: + """Test storage store extraction.""" + result = build_llama_stack_snapshot(llama_stack_config_file) + assert result["inference_store"]["type"] == "sql_sqlite" + assert result["inference_store"]["db_path"] == CONFIGURED + assert result["metadata_store"]["type"] == "kv_sqlite" + assert result["metadata_store"]["namespace"] == "registry" + + def test_missing_providers_section(self) -> None: + """Test config without providers section.""" + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + yaml.dump({"version": 1, "apis": []}, f) + path = f.name + result = build_llama_stack_snapshot(path) + Path(path).unlink() + assert result["providers"]["inference"] == NOT_CONFIGURED + + def test_server_fields_masked(self) -> None: + """Test server host and TLS fields are masked.""" + config = { + "version": 1, + "server": { + "host": "0.0.0.0", + "port": 8321, + "tls_cafile": "/etc/ssl/ca.crt", + "tls_certfile": "/etc/ssl/cert.crt", + "tls_keyfile": "/etc/ssl/key.pem", + }, + } + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + yaml.dump(config, f) + path = f.name + result = build_llama_stack_snapshot(path) + Path(path).unlink() + assert result["server"]["host"] == CONFIGURED + assert result["server"]["port"] == 8321 + assert result["server"]["tls_cafile"] == CONFIGURED + + +# ============================================================================= +# Tests: build_configuration_snapshot +# ============================================================================= + + +class TestBuildConfigurationSnapshot: + """Tests for build_configuration_snapshot function.""" + + def test_combines_both_sources(self) -> None: + """Test that snapshot contains both lightspeed_stack and llama_stack.""" + result = build_configuration_snapshot(build_minimal_config(), None) + assert "lightspeed_stack" in result + assert "llama_stack" in result + assert result["llama_stack"] == {"status": NOT_AVAILABLE} + assert result["lightspeed_stack"]["name"] == "minimal" + + def test_with_llama_stack_config(self, llama_stack_config_file: str) -> None: + """Test snapshot with both config sources.""" + result = build_configuration_snapshot( + build_minimal_config(), llama_stack_config_file + ) + assert result["lightspeed_stack"]["name"] == "minimal" + assert result["llama_stack"]["version"] == 2 + + +# ============================================================================= +# Tests: PII Leak Prevention (Critical) +# ============================================================================= + + +class TestPiiLeakPrevention: + """Critical tests proving PII is not leaked in snapshots.""" + + def test_no_pii_in_lightspeed_stack_snapshot(self) -> None: + """Verify no PII leaks in lightspeed-stack snapshot JSON.""" + json_str = json.dumps( + build_lightspeed_stack_snapshot(build_fully_populated_config()) + ) + for pii_value in ALL_PII_VALUES: + assert ( + pii_value not in json_str + ), f"PII leaked in lightspeed-stack snapshot: '{pii_value}'" + + def test_no_pii_in_llama_stack_snapshot(self, llama_stack_config_file: str) -> None: + """Verify no PII leaks in llama-stack snapshot JSON.""" + json_str = json.dumps(build_llama_stack_snapshot(llama_stack_config_file)) + for pii_value in LLAMA_STACK_PII_VALUES: + assert ( + pii_value not in json_str + ), f"PII leaked in llama-stack snapshot: '{pii_value}'" + + def test_no_pii_in_combined_snapshot(self, llama_stack_config_file: str) -> None: + """Verify no PII leaks in the combined snapshot JSON.""" + snapshot = build_configuration_snapshot( + build_fully_populated_config(), llama_stack_config_file + ) + json_str = json.dumps(snapshot) + for pii_value in ALL_PII_VALUES + LLAMA_STACK_PII_VALUES: + assert ( + pii_value not in json_str + ), f"PII leaked in combined snapshot: '{pii_value}'" + + def test_snapshot_only_contains_allowlisted_fields(self) -> None: + """Verify snapshot does not contain any fields outside the allowlist.""" + snapshot = build_lightspeed_stack_snapshot(build_fully_populated_config()) + allowed_top_keys = {spec.path.split(".")[0] for spec in LIGHTSPEED_STACK_FIELDS} + unexpected = set(snapshot.keys()) - allowed_top_keys + assert ( + not unexpected + ), f"Snapshot contains unexpected top-level keys: {unexpected}" + + def test_provider_config_not_leaked(self, llama_stack_config_file: str) -> None: + """Verify provider config sections (with secrets) are not included.""" + json_str = json.dumps(build_llama_stack_snapshot(llama_stack_config_file)) + assert "api_key" not in json_str + assert "sk-openai" not in json_str + + def test_secret_str_values_never_exposed(self) -> None: + """Verify SecretStr values are never present in snapshot output.""" + json_str = json.dumps( + build_lightspeed_stack_snapshot(build_fully_populated_config()) + ) + assert "sk-super-secret-api-key-12345" not in json_str + assert "P@ssw0rd!SuperSecret" not in json_str + assert "**********" not in json_str + + def test_snapshot_is_json_serializable(self) -> None: + """Verify the snapshot can be serialized to JSON without errors.""" + json_str = json.dumps( + build_configuration_snapshot(build_fully_populated_config(), None) + ) + assert isinstance(json.loads(json_str), dict) + + +# ============================================================================= +# Tests: Registry Validation +# ============================================================================= + + +class TestRegistryValidation: + """Tests validating the field registry itself.""" + + def test_all_field_specs_have_valid_masking(self) -> None: + """Verify all field specs have a valid MaskingType.""" + for spec in LIGHTSPEED_STACK_FIELDS + LLAMA_STACK_FIELDS: + if isinstance(spec, FieldSpec): + assert isinstance( + spec.masking, MaskingType + ), f"Invalid masking for {spec.path}" + elif isinstance(spec, ListFieldSpec): + for sub in spec.item_fields: + assert isinstance( + sub.masking, MaskingType + ), f"Invalid masking for {spec.path}.{sub.path}" + + def test_no_duplicate_paths_in_lightspeed_registry(self) -> None: + """Verify no duplicate paths in lightspeed-stack registry.""" + paths = [spec.path for spec in LIGHTSPEED_STACK_FIELDS] + duplicates = [p for p in paths if paths.count(p) > 1] + assert not duplicates, f"Duplicate paths: {set(duplicates)}" + + def test_no_duplicate_paths_in_llama_stack_registry(self) -> None: + """Verify no duplicate paths in llama-stack registry.""" + paths = [spec.path for spec in LLAMA_STACK_FIELDS] + duplicates = [p for p in paths if paths.count(p) > 1] + assert not duplicates, f"Duplicate paths: {set(duplicates)}" From 8dce13bb9fc34d4a83618281e8b42deee9343401 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Mon, 16 Feb 2026 16:59:08 +0100 Subject: [PATCH 2/4] CodeRabbit: async I/O, type fixes, defensive checks, tmp_path fixtures --- src/telemetry/configuration_snapshot.py | 40 +++++--- tests/unit/telemetry/conftest.py | 32 +++---- .../telemetry/test_configuration_snapshot.py | 92 ++++++++++--------- 3 files changed, 93 insertions(+), 71 deletions(-) diff --git a/src/telemetry/configuration_snapshot.py b/src/telemetry/configuration_snapshot.py index bf32d3f75..0621ff7b7 100644 --- a/src/telemetry/configuration_snapshot.py +++ b/src/telemetry/configuration_snapshot.py @@ -9,6 +9,7 @@ No integration with ingress is provided here — only methods to build the JSON. """ +import asyncio import logging from dataclasses import dataclass from enum import Enum @@ -199,7 +200,8 @@ class ListFieldSpec: FieldSpec("provider_id", MaskingType.PASSTHROUGH), ), ), - # Providers — extract only provider_id and provider_type per entry + # Providers — extract only provider_id and provider_type per entry. + # NOTE: Update this list when llama-stack adds new provider categories. *( ListFieldSpec( f"providers.{provider_name}", @@ -276,7 +278,9 @@ def _serialize_passthrough(value: Any) -> Any: return [_serialize_passthrough(v) for v in value] if isinstance(value, dict): return {str(k): _serialize_passthrough(v) for k, v in value.items()} - # Safety: mask SecretStr and file paths; stringify everything else + # Safety: mask SecretStr and file paths; log and stringify everything else + if not isinstance(value, (SecretStr, PurePath)): + logger.debug("Passthrough fallback to str() for type %s", type(value).__name__) return CONFIGURED if isinstance(value, (SecretStr, PurePath)) else str(value) @@ -308,7 +312,7 @@ def _set_nested_value(target: dict[str, Any], path: str, value: Any) -> None: parts = path.split(".") current = target for part in parts[:-1]: - if part not in current: + if part not in current or not isinstance(current[part], dict): current[part] = {} current = current[part] current[parts[-1]] = value @@ -442,7 +446,24 @@ def build_lightspeed_stack_snapshot( return _extract_snapshot_fields(config, LIGHTSPEED_STACK_FIELDS) -def build_llama_stack_snapshot( +def _read_yaml_file(config_path: str) -> Any: + """Read and parse a YAML config file synchronously. + + Parameters: + config_path: Path to the YAML file. + + Returns: + The parsed YAML content, or None on failure. + """ + try: + with open(config_path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) + except (OSError, yaml.YAMLError) as e: + logger.warning("Failed to read llama-stack config for snapshot: %s", e) + return None + + +async def build_llama_stack_snapshot( config_path: Optional[str] = None, ) -> dict[str, Any]: """Build snapshot of llama-stack configuration with PII masking. @@ -462,12 +483,7 @@ def build_llama_stack_snapshot( if config_path is None: return {"status": NOT_AVAILABLE} - try: - with open(config_path, "r", encoding="utf-8") as f: - ls_config = yaml.safe_load(f) - except (OSError, yaml.YAMLError) as e: - logger.warning("Failed to read llama-stack config for snapshot: %s", e) - return {"status": NOT_AVAILABLE} + ls_config = await asyncio.to_thread(_read_yaml_file, config_path) if not isinstance(ls_config, dict): logger.warning("Llama-stack config is not a dict, skipping snapshot") @@ -479,7 +495,7 @@ def build_llama_stack_snapshot( return snapshot -def build_configuration_snapshot( +async def build_configuration_snapshot( config: Configuration, llama_stack_config_path: Optional[str] = None, ) -> dict[str, Any]: @@ -501,5 +517,5 @@ def build_configuration_snapshot( """ return { "lightspeed_stack": build_lightspeed_stack_snapshot(config), - "llama_stack": build_llama_stack_snapshot(llama_stack_config_path), + "llama_stack": await build_llama_stack_snapshot(llama_stack_config_path), } diff --git a/tests/unit/telemetry/conftest.py b/tests/unit/telemetry/conftest.py index e255c9f37..6b901c979 100644 --- a/tests/unit/telemetry/conftest.py +++ b/tests/unit/telemetry/conftest.py @@ -1,8 +1,7 @@ """Shared fixtures for telemetry unit tests.""" -import tempfile -from pathlib import Path, PurePosixPath -from typing import Any, Generator +from pathlib import Path +from typing import Any import pytest import yaml @@ -187,9 +186,9 @@ def build_fully_populated_config() -> Configuration: access_log=False, root_path="", tls_config=TLSConfiguration.model_construct( - tls_certificate_path=PurePosixPath(PII_TLS_CERT), - tls_key_path=PurePosixPath(PII_TLS_KEY), - tls_key_password=PurePosixPath(PII_TLS_PASS), + tls_certificate_path=Path(PII_TLS_CERT), + tls_key_path=Path(PII_TLS_KEY), + tls_key_password=Path(PII_TLS_PASS), ), cors=CORSConfiguration.model_construct( allow_origins=[PII_CORS_ORIGIN, "https://admin.corp.com"], @@ -214,7 +213,7 @@ def build_fully_populated_config() -> Configuration: skip_tls_verification=False, skip_for_health_probes=False, k8s_cluster_api=PII_K8S_API, - k8s_ca_cert_path=PurePosixPath(PII_K8S_CERT), + k8s_ca_cert_path=Path(PII_K8S_CERT), jwk_config=JwkConfiguration.model_construct( url=PII_JWK_URL, jwt_configuration=JwtConfiguration.model_construct( @@ -255,7 +254,7 @@ def build_fully_populated_config() -> Configuration: ), customization=Customization.model_construct( system_prompt=PII_SYSTEM_PROMPT, - system_prompt_path=PurePosixPath(PII_PROMPT_PATH), + system_prompt_path=Path(PII_PROMPT_PATH), disable_query_system_prompt=False, profile_path=None, custom_profile=None, @@ -275,7 +274,7 @@ def build_fully_populated_config() -> Configuration: namespace=PII_PG_NAMESPACE, ssl_mode="verify-full", gss_encmode="prefer", - ca_cert_path=PurePosixPath(PII_PG_CA_CERT), + ca_cert_path=Path(PII_PG_CA_CERT), ), ), mcp_servers=[ @@ -375,14 +374,15 @@ def build_minimal_config() -> Configuration: @pytest.fixture(name="llama_stack_config_file") -def llama_stack_config_file_fixture() -> Generator[str, None, None]: +def llama_stack_config_file_fixture(tmp_path: Path) -> str: """Write SAMPLE_LLAMA_STACK_CONFIG to a temp YAML file and return its path. - Yields: + Parameters: + tmp_path: Pytest-managed temporary directory (auto-cleaned). + + Returns: str: Path to the temporary YAML file. """ - with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: - yaml.dump(SAMPLE_LLAMA_STACK_CONFIG, f) - path = f.name - yield path - Path(path).unlink(missing_ok=True) + path = tmp_path / "llama_stack_config.yaml" + path.write_text(yaml.dump(SAMPLE_LLAMA_STACK_CONFIG)) + return str(path) diff --git a/tests/unit/telemetry/test_configuration_snapshot.py b/tests/unit/telemetry/test_configuration_snapshot.py index 149ef7352..cee645158 100644 --- a/tests/unit/telemetry/test_configuration_snapshot.py +++ b/tests/unit/telemetry/test_configuration_snapshot.py @@ -1,7 +1,6 @@ """Tests for configuration snapshot with PII masking.""" import json -import tempfile from enum import Enum from pathlib import Path, PurePosixPath from typing import Any @@ -245,6 +244,13 @@ def test_multiple_fields_same_parent(self) -> None: _set_nested_value(target, "service.port", 8080) assert target == {"service": {"workers": 4, "port": 8080}} + def test_path_prefix_collision(self) -> None: + """Test that a scalar at a.b is replaced by a dict when a.b.c is set.""" + target: dict[str, Any] = {} + _set_nested_value(target, "a.b", "scalar") + _set_nested_value(target, "a.b.c", "nested") + assert target == {"a": {"b": {"c": "nested"}}} + # ============================================================================= # Tests: _extract_field and _extract_list_field @@ -492,68 +498,64 @@ def test_database_ssl_mode_passthrough(self) -> None: class TestBuildLlamaStackSnapshot: """Tests for build_llama_stack_snapshot function.""" - def test_service_mode_returns_not_available(self) -> None: + async def test_service_mode_returns_not_available(self) -> None: """Test that service mode (no path) returns not_available status.""" - assert build_llama_stack_snapshot(None) == {"status": NOT_AVAILABLE} + assert await build_llama_stack_snapshot(None) == {"status": NOT_AVAILABLE} - def test_nonexistent_file(self) -> None: + async def test_nonexistent_file(self) -> None: """Test that missing file returns not_available status.""" - assert build_llama_stack_snapshot("/nonexistent/path.yaml") == { + assert await build_llama_stack_snapshot("/nonexistent/path.yaml") == { "status": NOT_AVAILABLE } - def test_invalid_yaml(self) -> None: + async def test_invalid_yaml(self, tmp_path: Path) -> None: """Test that invalid YAML returns not_available status.""" - with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: - f.write(": invalid: yaml: [") - path = f.name - result = build_llama_stack_snapshot(path) - Path(path).unlink() + path = tmp_path / "invalid.yaml" + path.write_text(": invalid: yaml: [") + result = await build_llama_stack_snapshot(str(path)) assert result == {"status": NOT_AVAILABLE} - def test_valid_config(self, llama_stack_config_file: str) -> None: + async def test_valid_config(self, llama_stack_config_file: str) -> None: """Test snapshot from valid llama-stack config.""" - result = build_llama_stack_snapshot(llama_stack_config_file) + result = await build_llama_stack_snapshot(llama_stack_config_file) assert result["version"] == 2 assert result["image_name"] == "starter" assert result["apis"] == ["agents", "inference", "safety", "vector_io"] assert result["external_providers_dir"] == CONFIGURED - def test_models_extraction(self, llama_stack_config_file: str) -> None: + async def test_models_extraction(self, llama_stack_config_file: str) -> None: """Test models list extraction.""" - result = build_llama_stack_snapshot(llama_stack_config_file) + result = await build_llama_stack_snapshot(llama_stack_config_file) models = result["registered_resources"]["models"] assert len(models) == 2 assert models[0]["model_id"] == "gpt-4o-mini" assert models[0]["model_type"] == "llm" - def test_providers_extraction(self, llama_stack_config_file: str) -> None: + async def test_providers_extraction(self, llama_stack_config_file: str) -> None: """Test provider lists extraction shows only id and type.""" - result = build_llama_stack_snapshot(llama_stack_config_file) + result = await build_llama_stack_snapshot(llama_stack_config_file) inference = result["providers"]["inference"] assert len(inference) == 1 assert inference[0]["provider_id"] == "openai" assert inference[0]["provider_type"] == "remote::openai" assert "config" not in inference[0] - def test_storage_fields(self, llama_stack_config_file: str) -> None: + async def test_storage_fields(self, llama_stack_config_file: str) -> None: """Test storage store extraction.""" - result = build_llama_stack_snapshot(llama_stack_config_file) + result = await build_llama_stack_snapshot(llama_stack_config_file) assert result["inference_store"]["type"] == "sql_sqlite" assert result["inference_store"]["db_path"] == CONFIGURED assert result["metadata_store"]["type"] == "kv_sqlite" assert result["metadata_store"]["namespace"] == "registry" - def test_missing_providers_section(self) -> None: + async def test_missing_providers_section(self, tmp_path: Path) -> None: """Test config without providers section.""" - with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: - yaml.dump({"version": 1, "apis": []}, f) - path = f.name - result = build_llama_stack_snapshot(path) - Path(path).unlink() + path = tmp_path / "no_providers.yaml" + path.write_text(yaml.dump({"version": 1, "apis": []})) + result = await build_llama_stack_snapshot(str(path)) assert result["providers"]["inference"] == NOT_CONFIGURED - def test_server_fields_masked(self) -> None: + async def test_server_fields_masked(self, tmp_path: Path) -> None: """Test server host and TLS fields are masked.""" config = { "version": 1, @@ -565,11 +567,9 @@ def test_server_fields_masked(self) -> None: "tls_keyfile": "/etc/ssl/key.pem", }, } - with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: - yaml.dump(config, f) - path = f.name - result = build_llama_stack_snapshot(path) - Path(path).unlink() + path = tmp_path / "server.yaml" + path.write_text(yaml.dump(config)) + result = await build_llama_stack_snapshot(str(path)) assert result["server"]["host"] == CONFIGURED assert result["server"]["port"] == 8321 assert result["server"]["tls_cafile"] == CONFIGURED @@ -583,17 +583,17 @@ def test_server_fields_masked(self) -> None: class TestBuildConfigurationSnapshot: """Tests for build_configuration_snapshot function.""" - def test_combines_both_sources(self) -> None: + async def test_combines_both_sources(self) -> None: """Test that snapshot contains both lightspeed_stack and llama_stack.""" - result = build_configuration_snapshot(build_minimal_config(), None) + result = await build_configuration_snapshot(build_minimal_config(), None) assert "lightspeed_stack" in result assert "llama_stack" in result assert result["llama_stack"] == {"status": NOT_AVAILABLE} assert result["lightspeed_stack"]["name"] == "minimal" - def test_with_llama_stack_config(self, llama_stack_config_file: str) -> None: + async def test_with_llama_stack_config(self, llama_stack_config_file: str) -> None: """Test snapshot with both config sources.""" - result = build_configuration_snapshot( + result = await build_configuration_snapshot( build_minimal_config(), llama_stack_config_file ) assert result["lightspeed_stack"]["name"] == "minimal" @@ -618,17 +618,21 @@ def test_no_pii_in_lightspeed_stack_snapshot(self) -> None: pii_value not in json_str ), f"PII leaked in lightspeed-stack snapshot: '{pii_value}'" - def test_no_pii_in_llama_stack_snapshot(self, llama_stack_config_file: str) -> None: + async def test_no_pii_in_llama_stack_snapshot( + self, llama_stack_config_file: str + ) -> None: """Verify no PII leaks in llama-stack snapshot JSON.""" - json_str = json.dumps(build_llama_stack_snapshot(llama_stack_config_file)) + json_str = json.dumps(await build_llama_stack_snapshot(llama_stack_config_file)) for pii_value in LLAMA_STACK_PII_VALUES: assert ( pii_value not in json_str ), f"PII leaked in llama-stack snapshot: '{pii_value}'" - def test_no_pii_in_combined_snapshot(self, llama_stack_config_file: str) -> None: + async def test_no_pii_in_combined_snapshot( + self, llama_stack_config_file: str + ) -> None: """Verify no PII leaks in the combined snapshot JSON.""" - snapshot = build_configuration_snapshot( + snapshot = await build_configuration_snapshot( build_fully_populated_config(), llama_stack_config_file ) json_str = json.dumps(snapshot) @@ -646,9 +650,11 @@ def test_snapshot_only_contains_allowlisted_fields(self) -> None: not unexpected ), f"Snapshot contains unexpected top-level keys: {unexpected}" - def test_provider_config_not_leaked(self, llama_stack_config_file: str) -> None: + async def test_provider_config_not_leaked( + self, llama_stack_config_file: str + ) -> None: """Verify provider config sections (with secrets) are not included.""" - json_str = json.dumps(build_llama_stack_snapshot(llama_stack_config_file)) + json_str = json.dumps(await build_llama_stack_snapshot(llama_stack_config_file)) assert "api_key" not in json_str assert "sk-openai" not in json_str @@ -661,10 +667,10 @@ def test_secret_str_values_never_exposed(self) -> None: assert "P@ssw0rd!SuperSecret" not in json_str assert "**********" not in json_str - def test_snapshot_is_json_serializable(self) -> None: + async def test_snapshot_is_json_serializable(self) -> None: """Verify the snapshot can be serialized to JSON without errors.""" json_str = json.dumps( - build_configuration_snapshot(build_fully_populated_config(), None) + await build_configuration_snapshot(build_fully_populated_config(), None) ) assert isinstance(json.loads(json_str), dict) From 5b0393b7d4ab86403a36ce6b73521c46acbfb118 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Mon, 16 Feb 2026 22:34:58 +0100 Subject: [PATCH 3/4] CodeRabbit: mask unknown types, Literal constants, dupl-path detection --- src/telemetry/configuration_snapshot.py | 20 +++++++++++-------- .../telemetry/test_configuration_snapshot.py | 14 +++++++------ 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/telemetry/configuration_snapshot.py b/src/telemetry/configuration_snapshot.py index 0621ff7b7..ea0340cd0 100644 --- a/src/telemetry/configuration_snapshot.py +++ b/src/telemetry/configuration_snapshot.py @@ -14,7 +14,7 @@ from dataclasses import dataclass from enum import Enum from pathlib import PurePath -from typing import Any, Optional +from typing import Any, Literal, Optional import yaml from pydantic import SecretStr @@ -24,9 +24,9 @@ logger = logging.getLogger(__name__) # Masking output constants -CONFIGURED = "configured" -NOT_CONFIGURED = "not_configured" -NOT_AVAILABLE = "not_available" +CONFIGURED: Literal["configured"] = "configured" +NOT_CONFIGURED: Literal["not_configured"] = "not_configured" +NOT_AVAILABLE: Literal["not_available"] = "not_available" class MaskingType(Enum): @@ -278,10 +278,12 @@ def _serialize_passthrough(value: Any) -> Any: return [_serialize_passthrough(v) for v in value] if isinstance(value, dict): return {str(k): _serialize_passthrough(v) for k, v in value.items()} - # Safety: mask SecretStr and file paths; log and stringify everything else + # Safety: mask SecretStr, file paths, and any unrecognised types if not isinstance(value, (SecretStr, PurePath)): - logger.debug("Passthrough fallback to str() for type %s", type(value).__name__) - return CONFIGURED if isinstance(value, (SecretStr, PurePath)) else str(value) + logger.warning( + "Passthrough masking unexpected type %s as configured", type(value).__name__ + ) + return CONFIGURED def mask_value(value: Any, masking: MaskingType) -> Any: @@ -332,7 +334,9 @@ def _extract_field(source: Any, spec: FieldSpec) -> Any: return mask_value(value, spec.masking) -def _extract_list_field(source: Any, spec: ListFieldSpec) -> list[dict[str, Any]] | str: +def _extract_list_field( + source: Any, spec: ListFieldSpec +) -> list[dict[str, Any]] | Literal["not_configured"]: """Extract and mask a list field with per-item sub-fields. Parameters: diff --git a/tests/unit/telemetry/test_configuration_snapshot.py b/tests/unit/telemetry/test_configuration_snapshot.py index cee645158..2c9e692ca 100644 --- a/tests/unit/telemetry/test_configuration_snapshot.py +++ b/tests/unit/telemetry/test_configuration_snapshot.py @@ -698,12 +698,14 @@ def test_all_field_specs_have_valid_masking(self) -> None: def test_no_duplicate_paths_in_lightspeed_registry(self) -> None: """Verify no duplicate paths in lightspeed-stack registry.""" - paths = [spec.path for spec in LIGHTSPEED_STACK_FIELDS] - duplicates = [p for p in paths if paths.count(p) > 1] - assert not duplicates, f"Duplicate paths: {set(duplicates)}" + paths = [s.path for s in LIGHTSPEED_STACK_FIELDS] + assert len(paths) == len( + set(paths) + ), f"Duplicate paths: {set(p for p in paths if paths.count(p) > 1)}" def test_no_duplicate_paths_in_llama_stack_registry(self) -> None: """Verify no duplicate paths in llama-stack registry.""" - paths = [spec.path for spec in LLAMA_STACK_FIELDS] - duplicates = [p for p in paths if paths.count(p) > 1] - assert not duplicates, f"Duplicate paths: {set(duplicates)}" + paths = [s.path for s in LLAMA_STACK_FIELDS] + assert len(paths) == len( + set(paths) + ), f"Duplicate paths: {set(p for p in paths if paths.count(p) > 1)}" From c09bb505c4fe45ff0dd63e075ca87413e0a6ed60 Mon Sep 17 00:00:00 2001 From: Maxim Svistunov Date: Tue, 17 Feb 2026 05:53:55 +0100 Subject: [PATCH 4/4] Fix db_path check --- src/telemetry/configuration_snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/telemetry/configuration_snapshot.py b/src/telemetry/configuration_snapshot.py index ea0340cd0..7f21b36a6 100644 --- a/src/telemetry/configuration_snapshot.py +++ b/src/telemetry/configuration_snapshot.py @@ -418,7 +418,7 @@ def _extract_store_info(ls_config: dict[str, Any], store_name: str) -> dict[str, result: dict[str, Any] = { "type": backend.get("type", NOT_CONFIGURED), - "db_path": CONFIGURED if backend.get("db_path") else NOT_CONFIGURED, + "db_path": CONFIGURED if backend.get("db_path") is not None else NOT_CONFIGURED, } if store_name == "metadata":