From 24871c0aa9960c515964ecdc02e2cc794e36e550 Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 14:11:19 +0100 Subject: [PATCH 1/8] Air Traffic unification Part 2 --- scenarios/bayesian_sim_air_traffic_data.yaml | 10 +- ...ir_traffic_data_varying_refresh_rates.yaml | 11 +- scenarios/bluesky_sim_air_traffic_data.yaml | 10 +- ...y_sim_air_traffic_data_latency_issues.yaml | 11 +- scenarios/opensky_live_data.yaml | 26 +-- scenarios/openutm_sim_air_traffic_data.yaml | 11 +- scenarios/sdsp_track.yaml | 9 +- scenarios/verify_sdsp_metrics.yaml | 16 +- .../core/execution/dependencies.py | 59 ------ .../core/providers/__init__.py | 2 + .../core/providers/bayesian_provider.py | 7 +- .../core/providers/bluesky_provider.py | 6 +- .../core/providers/factory.py | 12 +- .../core/providers/geojson_provider.py | 6 +- .../core/providers/latency.py | 102 ++++++++++ .../core/steps/air_traffic_step.py | 178 ++++++++++++++++-- .../core/streamers/__init__.py | 2 + .../core/streamers/factory.py | 6 +- .../core/streamers/flight_blender_streamer.py | 119 ++++++++++-- tests/test_group_execution.py | 18 +- tests/test_step_status_updates.py | 18 +- tests/test_stream_air_traffic.py | 40 ++-- tests/test_ui_groups.py | 6 +- tests/test_yaml_scenarios.py | 5 +- 24 files changed, 498 insertions(+), 192 deletions(-) create mode 100644 src/openutm_verification/core/providers/latency.py diff --git a/scenarios/bayesian_sim_air_traffic_data.yaml b/scenarios/bayesian_sim_air_traffic_data.yaml index 2afdfe2..e226a2e 100644 --- a/scenarios/bayesian_sim_air_traffic_data.yaml +++ b/scenarios/bayesian_sim_air_traffic_data.yaml @@ -1,10 +1,8 @@ name: bayesian_sim_air_traffic_data description: Bayesian simulation air traffic data test steps: -- id: generate_bayesian_simulation_air_traffic_data - step: Generate Bayesian Simulation Air Traffic Data -- step: Fetch Session IDs -- step: Submit Simulated Air Traffic +- step: Stream Air Traffic + id: stream_air_traffic arguments: - observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }} - session_ids: ${{ steps.Fetch Session IDs.result }} + provider: bayesian + target: flight_blender diff --git a/scenarios/bayesian_sim_air_traffic_data_varying_refresh_rates.yaml b/scenarios/bayesian_sim_air_traffic_data_varying_refresh_rates.yaml index 8ab0151..4c24e25 100644 --- a/scenarios/bayesian_sim_air_traffic_data_varying_refresh_rates.yaml +++ b/scenarios/bayesian_sim_air_traffic_data_varying_refresh_rates.yaml @@ -1,10 +1,9 @@ name: bayesian_sim_air_traffic_data_varying_refresh_rates description: Bayesian simulation air traffic data test with varying refresh rates steps: -- id: generate_bayesian_simulation_air_traffic_data - step: Generate Bayesian Simulation Air Traffic Data -- step: Fetch Session IDs -- step: Submit Simulated Air Traffic at varying refresh rates +- step: Stream Air Traffic + id: stream_air_traffic arguments: - observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }} - session_ids: ${{ steps.Fetch Session IDs.result }} + provider: bayesian + target: flight_blender + refresh_mode: varying diff --git a/scenarios/bluesky_sim_air_traffic_data.yaml b/scenarios/bluesky_sim_air_traffic_data.yaml index 89b007e..19f13f4 100644 --- a/scenarios/bluesky_sim_air_traffic_data.yaml +++ b/scenarios/bluesky_sim_air_traffic_data.yaml @@ -1,10 +1,8 @@ name: bluesky_sim_air_traffic_data description: Blue Sky test steps: -- id: generate_bluesky_simulation_air_traffic_data - step: Generate BlueSky Simulation Air Traffic Data -- step: Fetch Session IDs -- step: Submit Simulated Air Traffic +- step: Stream Air Traffic + id: stream_air_traffic arguments: - observations: ${{ steps.generate_bluesky_simulation_air_traffic_data.result }} - session_ids: ${{ steps.Fetch Session IDs.result }} + provider: bluesky + target: flight_blender diff --git a/scenarios/bluesky_sim_air_traffic_data_latency_issues.yaml b/scenarios/bluesky_sim_air_traffic_data_latency_issues.yaml index c01e373..880f168 100644 --- a/scenarios/bluesky_sim_air_traffic_data_latency_issues.yaml +++ b/scenarios/bluesky_sim_air_traffic_data_latency_issues.yaml @@ -1,10 +1,9 @@ name: bluesky_sim_air_traffic_data_latency_issues description: F3623 Blue Sky dataset with latency issues test steps: -- id: generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues - step: Generate BlueSky Simulation Air Traffic Data with latency issues -- step: Fetch Session IDs -- step: Submit Simulated Air Traffic +- step: Stream Air Traffic + id: stream_air_traffic arguments: - observations: ${{ steps.generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues.result }} - session_ids: ${{ steps.Fetch Session IDs.result }} + provider: bluesky + target: flight_blender + data_quality: latency diff --git a/scenarios/opensky_live_data.yaml b/scenarios/opensky_live_data.yaml index 0b4729e..6726ecb 100644 --- a/scenarios/opensky_live_data.yaml +++ b/scenarios/opensky_live_data.yaml @@ -1,24 +1,10 @@ name: opensky_live_data description: Fetch live flight data from OpenSky and submit to Flight Blender. -groups: - fetch_and_submit_opensky: - description: Fetches OpenSky data and submits it to Flight Blender - steps: - - id: fetch - step: Fetch OpenSky Data - - - id: submit - step: Submit Air Traffic - arguments: - observations: ${{ steps.fetch.result }} - - - id: wait - step: Wait X seconds - arguments: - duration: 3 - steps: - - step: fetch_and_submit_opensky - loop: - count: 5 + - step: Stream Air Traffic + id: stream_opensky + arguments: + provider: opensky + duration: 3 + target: flight_blender diff --git a/scenarios/openutm_sim_air_traffic_data.yaml b/scenarios/openutm_sim_air_traffic_data.yaml index 6f79c4a..f4c0376 100644 --- a/scenarios/openutm_sim_air_traffic_data.yaml +++ b/scenarios/openutm_sim_air_traffic_data.yaml @@ -1,10 +1,9 @@ name: openutm_sim_air_traffic_data -description: Generate simulated air traffic data using OpenSky client and submit to Flight Blender. +description: Generate simulated air traffic data using GeoJSON provider and submit to Flight Blender. steps: - - step: Generate Simulated Air Traffic Data - - step: Fetch Session IDs - - step: Submit Simulated Air Traffic + - step: Stream Air Traffic + id: stream_air_traffic arguments: - observations: ${{ steps.Generate Simulated Air Traffic Data.result }} - session_ids: ${{ steps.Fetch Session IDs.result }} + provider: geojson + target: flight_blender diff --git a/scenarios/sdsp_track.yaml b/scenarios/sdsp_track.yaml index c3013d9..bec7867 100644 --- a/scenarios/sdsp_track.yaml +++ b/scenarios/sdsp_track.yaml @@ -7,10 +7,11 @@ steps: arguments: session_id: ${{ steps.Generate UUID.result }} action: START -- step: Generate Simulated Air Traffic Data -- step: Submit Simulated Air Traffic +- id: stream_air_traffic + step: Stream Air Traffic arguments: - observations: ${{ steps.Generate Simulated Air Traffic Data.result }} + provider: geojson + target: flight_blender background: true - id: wait_2_seconds step: Wait X seconds @@ -31,4 +32,4 @@ steps: session_id: ${{ steps.Generate UUID.result }} action: STOP needs: - - Submit Simulated Air Traffic + - stream_air_traffic diff --git a/scenarios/verify_sdsp_metrics.yaml b/scenarios/verify_sdsp_metrics.yaml index 4c1813d..5eabd2d 100644 --- a/scenarios/verify_sdsp_metrics.yaml +++ b/scenarios/verify_sdsp_metrics.yaml @@ -1,14 +1,12 @@ name: verify_sdsp_metrics description: Verify that the SDSP is correctly reporting metrics based on submitted air traffic data. steps: -- id: generate_bayesian_simulation_air_traffic_data - step: Generate Bayesian Simulation Air Traffic Data -- step: Fetch Session IDs for Bayesian Simulation -- step: Submit Simulated Air Traffic - description: The observations will be generated and submitted using a session_id to the air traffic endpoint, this session_id is different from the one use to create the SDSP session. +- id: stream_air_traffic + step: Stream Air Traffic + description: Generate and submit Bayesian air traffic data in the background. arguments: - observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }} - session_ids: ${{ steps.Fetch Session IDs for Bayesian Simulation.result }} + provider: bayesian + target: flight_blender background: true - id: wait_1_seconds step: Wait X seconds @@ -28,7 +26,7 @@ steps: duration: 10 - step: Verify Reported Metrics in Flight Blender arguments: - observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }} + observations: ${{ steps.stream_air_traffic.result.observations }} session_id: ${{ steps.generated_sdsp_session_id.result }} - id: stop_sdsp_session step: Start / Stop SDSP Session @@ -36,4 +34,4 @@ steps: session_id: ${{ steps.generated_sdsp_session_id.result }} action: STOP needs: - - Submit Simulated Air Traffic + - stream_air_traffic diff --git a/src/openutm_verification/core/execution/dependencies.py b/src/openutm_verification/core/execution/dependencies.py index 76f9b77..a0b2a4a 100644 --- a/src/openutm_verification/core/execution/dependencies.py +++ b/src/openutm_verification/core/execution/dependencies.py @@ -9,20 +9,6 @@ from loguru import logger from openutm_verification.auth.providers import get_auth_provider -from openutm_verification.core.clients.air_traffic.air_traffic_client import ( - AirTrafficClient, -) -from openutm_verification.core.clients.air_traffic.base_client import ( - AirTrafficSettings, - BayesianAirTrafficSettings, - BlueSkyAirTrafficSettings, -) -from openutm_verification.core.clients.air_traffic.bayesian_air_traffic_client import ( - BayesianTrafficClient, -) -from openutm_verification.core.clients.air_traffic.blue_sky_client import ( - BlueSkyClient, -) from openutm_verification.core.clients.amqp import ( AMQPClient, AMQPSettings, @@ -31,10 +17,6 @@ from openutm_verification.core.clients.flight_blender.flight_blender_client import ( FlightBlenderClient, ) -from openutm_verification.core.clients.opensky.base_client import ( - OpenSkySettings, -) -from openutm_verification.core.clients.opensky.opensky_client import OpenSkyClient from openutm_verification.core.execution.config_models import ( AppConfig, DataFiles, @@ -174,25 +156,6 @@ async def flight_blender_client(config: AppConfig, data_files: DataFiles) -> Asy yield fb_client -@dependency(OpenSkyClient) -async def opensky_client(config: AppConfig) -> AsyncGenerator[OpenSkyClient, None]: - """Provides an OpenSkyClient instance for dependency injection.""" - settings = OpenSkySettings.from_config(config.opensky) - async with OpenSkyClient(settings) as client: - yield client - - -@dependency(AirTrafficClient) -async def air_traffic_client(config: AppConfig, data_files: DataFiles) -> AsyncGenerator[AirTrafficClient, None]: - """Provides an AirTrafficClient instance for dependency injection.""" - settings = AirTrafficSettings.from_config( - config.air_traffic_simulator_settings, - trajectory_path=data_files.trajectory, - ) - async with AirTrafficClient(settings) as client: - yield client - - @dependency(SessionManager) async def session_manager() -> AsyncGenerator[SessionManager, None]: yield SessionManager() @@ -203,28 +166,6 @@ async def common_client() -> AsyncGenerator[CommonClient, None]: yield CommonClient() -@dependency(BlueSkyClient) -async def bluesky_client(config: AppConfig, data_files: DataFiles) -> AsyncGenerator[BlueSkyClient, None]: - """Provides a BlueSkyClient instance for dependency injection.""" - settings = BlueSkyAirTrafficSettings.from_config( - config.blue_sky_air_traffic_simulator_settings, - simulation_path=data_files.simulation, - ) - async with BlueSkyClient(settings) as client: - yield client - - -@dependency(BayesianTrafficClient) -async def bayesian_air_traffic_client(config: AppConfig, data_files: DataFiles) -> AsyncGenerator[BayesianTrafficClient, None]: - """Provides a BayesianTrafficClient instance for dependency injection.""" - settings = BayesianAirTrafficSettings.from_config( - config.bayesian_air_traffic_simulator_settings, - simulation_path=data_files.simulation, - ) - async with BayesianTrafficClient(settings) as client: - yield client - - @dependency(AMQPClient) async def amqp_client(config: AppConfig) -> AsyncGenerator[AMQPClient, None]: """Provides an AMQPClient instance for dependency injection.""" diff --git a/src/openutm_verification/core/providers/__init__.py b/src/openutm_verification/core/providers/__init__.py index 6ce8c3e..f05b2b9 100644 --- a/src/openutm_verification/core/providers/__init__.py +++ b/src/openutm_verification/core/providers/__init__.py @@ -4,12 +4,14 @@ """ from .factory import ProviderType, create_provider +from .latency import DataQualityType from .opensky_provider import DEFAULT_SWITZERLAND_VIEWPORT from .protocol import AirTrafficProvider __all__ = [ "AirTrafficProvider", "DEFAULT_SWITZERLAND_VIEWPORT", + "DataQualityType", "ProviderType", "create_provider", ] diff --git a/src/openutm_verification/core/providers/bayesian_provider.py b/src/openutm_verification/core/providers/bayesian_provider.py index 4facc68..ed991c1 100644 --- a/src/openutm_verification/core/providers/bayesian_provider.py +++ b/src/openutm_verification/core/providers/bayesian_provider.py @@ -8,6 +8,7 @@ from openutm_verification.core.clients.air_traffic.bayesian_air_traffic_client import ( BayesianTrafficClient, ) +from openutm_verification.core.reporting.reporting_models import Status from openutm_verification.simulator.models.flight_data_types import ( FlightObservationSchema, ) @@ -90,9 +91,11 @@ async def get_observations( ) async with BayesianTrafficClient(settings) as client: - result = await client.generate_bayesian_sim_air_traffic_data( + step_result = await client.generate_bayesian_sim_air_traffic_data( config_path=self._config_path, duration=effective_duration, ) + if step_result.status == Status.FAIL: + raise RuntimeError(step_result.error_message or "Bayesian generation failed") # Handle case where Bayesian client returns None or empty - return result if result else [] + return step_result.result if step_result.result else [] diff --git a/src/openutm_verification/core/providers/bluesky_provider.py b/src/openutm_verification/core/providers/bluesky_provider.py index a69ecd2..7a77a54 100644 --- a/src/openutm_verification/core/providers/bluesky_provider.py +++ b/src/openutm_verification/core/providers/bluesky_provider.py @@ -8,6 +8,7 @@ from openutm_verification.core.clients.air_traffic.blue_sky_client import ( BlueSkyClient, ) +from openutm_verification.core.reporting.reporting_models import Status from openutm_verification.simulator.models.flight_data_types import ( FlightObservationSchema, ) @@ -90,7 +91,10 @@ async def get_observations( ) async with BlueSkyClient(settings) as client: - return await client.generate_bluesky_sim_air_traffic_data( + step_result = await client.generate_bluesky_sim_air_traffic_data( config_path=self._config_path, duration=effective_duration, ) + if step_result.status == Status.FAIL: + raise RuntimeError(step_result.error_message or "BlueSky generation failed") + return step_result.result diff --git a/src/openutm_verification/core/providers/factory.py b/src/openutm_verification/core/providers/factory.py index b494c72..a05c127 100644 --- a/src/openutm_verification/core/providers/factory.py +++ b/src/openutm_verification/core/providers/factory.py @@ -5,6 +5,7 @@ from .bayesian_provider import BayesianProvider from .bluesky_provider import BlueSkyProvider from .geojson_provider import GeoJSONProvider +from .latency import DataQualityType, LatencyProviderWrapper from .opensky_provider import OpenSkyProvider from .protocol import AirTrafficProvider @@ -20,6 +21,7 @@ def create_provider( sensor_ids: list[str] | None = None, session_ids: list[str] | None = None, viewport: tuple[float, float, float, float] | None = None, + data_quality: DataQualityType = "nominal", **kwargs, ) -> AirTrafficProvider: """Factory function to create providers by name. @@ -32,10 +34,11 @@ def create_provider( sensor_ids: List of sensor UUID strings. session_ids: List of session UUID strings. viewport: Geographic bounds for OpenSky (lat_min, lat_max, lon_min, lon_max). + data_quality: Data quality mode - "nominal" or "latency". **kwargs: Additional provider-specific arguments. Returns: - An AirTrafficProvider instance. + An AirTrafficProvider instance, optionally wrapped with latency simulation. Raises: ValueError: If the provider name is not recognized. @@ -50,7 +53,7 @@ def create_provider( if name not in providers: raise ValueError(f"Unknown provider: {name}. Available: {list(providers.keys())}") - return providers[name].from_kwargs( + provider = providers[name].from_kwargs( config_path=config_path, number_of_aircraft=number_of_aircraft, duration=duration, @@ -59,3 +62,8 @@ def create_provider( viewport=viewport, **kwargs, ) + + if data_quality == "latency": + return LatencyProviderWrapper(provider) + + return provider diff --git a/src/openutm_verification/core/providers/geojson_provider.py b/src/openutm_verification/core/providers/geojson_provider.py index 8b59e99..62e43c2 100644 --- a/src/openutm_verification/core/providers/geojson_provider.py +++ b/src/openutm_verification/core/providers/geojson_provider.py @@ -10,6 +10,7 @@ from openutm_verification.core.clients.air_traffic.base_client import ( AirTrafficSettings, ) +from openutm_verification.core.reporting.reporting_models import Status if TYPE_CHECKING: from openutm_verification.simulator.models.flight_data_types import ( @@ -93,7 +94,10 @@ async def get_observations( ) async with AirTrafficClient(settings) as client: - return await client.generate_simulated_air_traffic_data( + step_result = await client.generate_simulated_air_traffic_data( config_path=self._config_path, duration=effective_duration, ) + if step_result.status == Status.FAIL: + raise RuntimeError(step_result.error_message or "GeoJSON generation failed") + return step_result.result diff --git a/src/openutm_verification/core/providers/latency.py b/src/openutm_verification/core/providers/latency.py new file mode 100644 index 0000000..892b988 --- /dev/null +++ b/src/openutm_verification/core/providers/latency.py @@ -0,0 +1,102 @@ +"""Latency simulation utilities for air traffic providers. + +Applies realistic sensor latency effects to observation data: +- Random observation drops (simulating missed readings) +- Timestamp shifts (simulating delayed sensor data) + +These effects are consistent across all providers (GeoJSON, BlueSky, Bayesian) +and match the original per-client implementations. +""" + +from __future__ import annotations + +import random +from typing import TYPE_CHECKING, Literal + +from loguru import logger + +if TYPE_CHECKING: + from openutm_verification.core.providers.protocol import AirTrafficProvider + from openutm_verification.simulator.models.flight_data_types import ( + FlightObservationSchema, + ) + +# Default latency parameters — consistent with the original per-client implementations +LATENCY_PROBABILITY = 0.1 # 10% chance per observation +TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift range in seconds + +DataQualityType = Literal["nominal", "latency"] + + +def apply_latency( + observations: list[list[FlightObservationSchema]], + *, + latency_probability: float = LATENCY_PROBABILITY, + timestamp_shift_range: tuple[float, float] = TIMESTAMP_SHIFT_RANGE_SECONDS, +) -> list[list[FlightObservationSchema]]: + """Apply simulated sensor latency effects to observations. + + For each observation, there is a configurable probability of being affected: + - 50% chance: drop the observation entirely (simulating missed readings) + - 50% chance: shift the timestamp (simulating delayed sensor data) + + Args: + observations: List of observation lists per aircraft. + latency_probability: Probability each observation is affected (0.0-1.0). + timestamp_shift_range: Range (min, max) for timestamp shifts in seconds. + + Returns: + Modified observation lists with latency effects applied. + """ + modified_observations = [] + total_dropped = 0 + total_shifted = 0 + + for track_observations in observations: + modified_track = [] + for obs in track_observations: + if random.random() < latency_probability: + if random.random() < 0.5: + # Drop the observation entirely + total_dropped += 1 + continue + # Shift the timestamp + shift_seconds = random.uniform(*timestamp_shift_range) + obs.timestamp += int(shift_seconds * 1000) + total_shifted += 1 + modified_track.append(obs) + modified_observations.append(modified_track) + + logger.info(f"Latency simulation applied: {total_dropped} observations dropped, {total_shifted} timestamps shifted") + return modified_observations + + +class LatencyProviderWrapper: + """Wraps an air traffic provider to add latency simulation to its observations. + + This decorator pattern preserves the original provider's name while applying + latency post-processing to the generated observations. + """ + + def __init__(self, inner: AirTrafficProvider): + self._inner = inner + + @property + def name(self) -> str: + """Provider identifier (passes through to inner provider).""" + return self._inner.name + + async def get_observations( + self, + duration: int | None = None, + ) -> list[list[FlightObservationSchema]]: + """Get observations with latency effects applied. + + Args: + duration: Override duration in seconds. + + Returns: + Observation lists with simulated latency effects. + """ + observations = await self._inner.get_observations(duration=duration) + return apply_latency(observations) diff --git a/src/openutm_verification/core/steps/air_traffic_step.py b/src/openutm_verification/core/steps/air_traffic_step.py index 5b44882..5004e97 100644 --- a/src/openutm_verification/core/steps/air_traffic_step.py +++ b/src/openutm_verification/core/steps/air_traffic_step.py @@ -4,9 +4,44 @@ replacing the multiple provider-specific steps with a unified interface. """ +from __future__ import annotations + +from loguru import logger + +from openutm_verification.core.execution.config_models import get_settings +from openutm_verification.core.execution.dependency_resolution import CONTEXT from openutm_verification.core.execution.scenario_runner import scenario_step -from openutm_verification.core.providers import ProviderType, create_provider -from openutm_verification.core.streamers import StreamResult, TargetType, create_streamer +from openutm_verification.core.providers import DataQualityType, ProviderType, create_provider +from openutm_verification.core.streamers import RefreshModeType, StreamResult, TargetType, create_streamer + + +def _get_data_file_path(field_name: str) -> str | None: + """Get a data file path from the current suite context or global config. + + Checks the current CONTEXT for suite-specific overrides first, + then falls back to the global config data_files. + + Args: + field_name: Data file field name (e.g., "trajectory", "simulation"). + + Returns: + The resolved file path, or None if not configured. + """ + try: + context = CONTEXT.get() + suite_scenario = context.get("suite_scenario") if context else None + if suite_scenario and hasattr(suite_scenario, field_name): + value = getattr(suite_scenario, field_name, None) + if value: + return value + except (LookupError, AttributeError): + pass + + try: + config = get_settings() + return getattr(config.data_files, field_name, None) + except Exception: + return None class AirTrafficStepClient: @@ -14,7 +49,8 @@ class AirTrafficStepClient: This client wraps the provider/streamer architecture to expose a single scenario step that can handle all air traffic generation and streaming - operations. + operations. When step arguments are not provided, defaults are read + from the application configuration. """ async def __aenter__(self): @@ -23,19 +59,105 @@ async def __aenter__(self): async def __aexit__(self, exc_type, exc_val, exc_tb): pass + def _apply_config_defaults( + self, + provider: ProviderType, + *, + duration: int | None, + config_path: str | None, + number_of_aircraft: int | None, + sensor_ids: list[str] | None, + session_ids: list[str] | None, + ) -> tuple[int, str | None, int | None, list[str] | None, list[str] | None]: + """Fill in missing parameters from application configuration. + + Reads defaults from the provider-specific config section when + step arguments are not explicitly provided. + + Args: + provider: The provider type to read defaults for. + duration: Explicit duration or None. + config_path: Explicit config path or None. + number_of_aircraft: Explicit count or None. + sensor_ids: Explicit sensor IDs or None. + session_ids: Explicit session IDs or None. + + Returns: + Tuple of (duration, config_path, number_of_aircraft, sensor_ids, session_ids) + with defaults filled in. + """ + try: + app_config = get_settings() + except Exception: + logger.debug("Could not read application config for defaults, using step arguments only.") + return (duration or 30, config_path, number_of_aircraft, sensor_ids, session_ids) + + try: + if provider == "geojson": + sim = app_config.air_traffic_simulator_settings + if duration is None: + duration = sim.simulation_duration if hasattr(sim, "simulation_duration") else 30 + if number_of_aircraft is None: + number_of_aircraft = sim.number_of_aircraft + if sensor_ids is None and sim.sensor_ids: + sensor_ids = sim.sensor_ids + if session_ids is None and sim.session_ids: + session_ids = sim.session_ids + if config_path is None: + config_path = _get_data_file_path("trajectory") + + elif provider == "bluesky": + sim = app_config.blue_sky_air_traffic_simulator_settings + if duration is None: + duration = sim.simulation_duration_seconds + if number_of_aircraft is None: + number_of_aircraft = sim.number_of_aircraft + if sensor_ids is None and sim.sensor_ids: + sensor_ids = sim.sensor_ids + if session_ids is None and sim.session_ids: + session_ids = sim.session_ids + if config_path is None: + config_path = _get_data_file_path("simulation") + + elif provider == "bayesian": + sim = app_config.bayesian_air_traffic_simulator_settings + if duration is None: + duration = sim.simulation_duration_seconds + if number_of_aircraft is None: + number_of_aircraft = sim.number_of_aircraft + if sensor_ids is None and sim.sensor_ids: + sensor_ids = sim.sensor_ids + if session_ids is None and sim.session_ids: + session_ids = sim.session_ids + + elif provider == "opensky": + pass # OpenSky reads its own config in the provider + except Exception: + logger.debug("Could not read application config for defaults, using step arguments only.") + + # Ensure duration has a value + if duration is None: + duration = 30 + + return (duration, config_path, number_of_aircraft, sensor_ids, session_ids) + @scenario_step("Stream Air Traffic") async def stream_air_traffic( self, provider: ProviderType, - duration: int, + duration: int | None = None, target: TargetType = "flight_blender", *, - # Provider settings (optional overrides) + # Provider settings (optional overrides — defaults read from config) config_path: str | None = None, number_of_aircraft: int | None = None, sensor_ids: list[str] | None = None, session_ids: list[str] | None = None, viewport: tuple[float, float, float, float] | None = None, + # Data quality mode + data_quality: DataQualityType = "nominal", + # Streamer settings + refresh_mode: RefreshModeType = "normal", ) -> StreamResult: """Stream air traffic data from a provider to a target system. @@ -43,15 +165,20 @@ async def stream_air_traffic( Supports synthetic data generation (GeoJSON, BlueSky, Bayesian) and live data fetching (OpenSky Network). + When arguments are not provided, defaults are read from the application + configuration (e.g., air_traffic_simulator_settings, data_files). + Args: provider: Data source - geojson, bluesky, bayesian, or opensky. - duration: Streaming duration in seconds. + duration: Streaming duration in seconds (defaults from config). target: Delivery target - flight_blender, amqp, or none (default: flight_blender). - config_path: Path to configuration file (provider-specific). - number_of_aircraft: Number of aircraft to simulate. - sensor_ids: Sensor UUIDs for observations. - session_ids: Session UUIDs for grouping. + config_path: Path to configuration file (provider-specific, defaults from data_files). + number_of_aircraft: Number of aircraft to simulate (defaults from config). + sensor_ids: Sensor UUIDs for observations (defaults from config). + session_ids: Session UUIDs for grouping (defaults from config). viewport: Geographic bounds for OpenSky (lat_min, lat_max, lon_min, lon_max). + data_quality: Data quality mode - "nominal" or "latency" for simulated sensor issues. + refresh_mode: Submission mode - "normal" or "varying" for corrupted timestamps. Returns: StreamResult with success status, counts, and optionally the observations. @@ -63,7 +190,28 @@ async def stream_air_traffic( duration: 30 target: flight_blender config_path: config/bern/trajectory.geojson + + # Minimal form (reads all defaults from config): + - step: Stream Air Traffic + arguments: + provider: bayesian + + # With latency simulation: + - step: Stream Air Traffic + arguments: + provider: bluesky + data_quality: latency """ + # Apply config defaults for any unset parameters + duration, config_path, number_of_aircraft, sensor_ids, session_ids = self._apply_config_defaults( + provider, + duration=duration, + config_path=config_path, + number_of_aircraft=number_of_aircraft, + sensor_ids=sensor_ids, + session_ids=session_ids, + ) + # Build provider from arguments provider_instance = create_provider( name=provider, @@ -73,16 +221,24 @@ async def stream_air_traffic( sensor_ids=sensor_ids, session_ids=session_ids, viewport=viewport, + data_quality=data_quality, ) # Build streamer (or null streamer for target=none) streamer_instance = create_streamer( name=target, session_ids=session_ids, + refresh_mode=refresh_mode, ) # Execute streaming - return await streamer_instance.stream_from_provider( + stream_result = await streamer_instance.stream_from_provider( provider=provider_instance, duration_seconds=duration, ) + + # If streaming failed, raise so the scenario step is marked as failed + if not stream_result.success: + raise RuntimeError(f"Air traffic streaming failed: {stream_result.errors}") + + return stream_result diff --git a/src/openutm_verification/core/streamers/__init__.py b/src/openutm_verification/core/streamers/__init__.py index beabeb3..c57ccce 100644 --- a/src/openutm_verification/core/streamers/__init__.py +++ b/src/openutm_verification/core/streamers/__init__.py @@ -4,10 +4,12 @@ """ from .factory import TargetType, create_streamer +from .flight_blender_streamer import RefreshModeType from .protocol import AirTrafficStreamer, StreamResult __all__ = [ "AirTrafficStreamer", + "RefreshModeType", "StreamResult", "TargetType", "create_streamer", diff --git a/src/openutm_verification/core/streamers/factory.py b/src/openutm_verification/core/streamers/factory.py index a1b14a2..d0a98f3 100644 --- a/src/openutm_verification/core/streamers/factory.py +++ b/src/openutm_verification/core/streamers/factory.py @@ -3,7 +3,7 @@ from typing import Literal from .amqp_streamer import AMQPStreamer -from .flight_blender_streamer import FlightBlenderStreamer +from .flight_blender_streamer import FlightBlenderStreamer, RefreshModeType from .null_streamer import NullStreamer from .protocol import AirTrafficStreamer @@ -14,6 +14,7 @@ def create_streamer( name: TargetType, *, session_ids: list[str] | None = None, + refresh_mode: RefreshModeType = "normal", **kwargs, ) -> AirTrafficStreamer: """Factory function to create streamers by name. @@ -21,6 +22,7 @@ def create_streamer( Args: name: Target type - flight_blender, amqp, or none. session_ids: Optional list of session UUID strings (for flight_blender). + refresh_mode: Submission mode for flight_blender - "normal" or "varying". **kwargs: Additional streamer-specific arguments. Returns: @@ -38,4 +40,4 @@ def create_streamer( if name not in streamers: raise ValueError(f"Unknown streamer: {name}. Available: {list(streamers.keys())}") - return streamers[name].from_kwargs(session_ids=session_ids, **kwargs) + return streamers[name].from_kwargs(session_ids=session_ids, refresh_mode=refresh_mode, **kwargs) diff --git a/src/openutm_verification/core/streamers/flight_blender_streamer.py b/src/openutm_verification/core/streamers/flight_blender_streamer.py index e435cf9..8dd5bde 100644 --- a/src/openutm_verification/core/streamers/flight_blender_streamer.py +++ b/src/openutm_verification/core/streamers/flight_blender_streamer.py @@ -6,8 +6,9 @@ from __future__ import annotations +import random import uuid -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal from loguru import logger @@ -15,27 +16,38 @@ FlightBlenderClient, ) from openutm_verification.core.execution.config_models import get_settings +from openutm_verification.core.reporting.reporting_models import Status, StepResult from .protocol import StreamResult if TYPE_CHECKING: from openutm_verification.core.providers.protocol import AirTrafficProvider +RefreshModeType = Literal["normal", "varying"] + class FlightBlenderStreamer: """Streamer that sends observations to Flight Blender via HTTP API. - Wraps the existing FlightBlenderClient's submit_simulated_air_traffic - method to provide the unified streaming interface. + Wraps the existing FlightBlenderClient's submit methods to provide + the unified streaming interface. Supports both normal and varying + refresh rate submission modes. """ - def __init__(self, session_ids: list[uuid.UUID] | None = None): + def __init__( + self, + session_ids: list[uuid.UUID] | None = None, + refresh_mode: RefreshModeType = "normal", + ): """Initialize the Flight Blender streamer. Args: session_ids: Optional list of session UUIDs for grouping observations. + refresh_mode: Submission mode - "normal" for standard real-time playback, + "varying" for corrupted timestamps simulating malfunctioning sensors. """ self._session_ids = session_ids + self._refresh_mode = refresh_mode @property def name(self) -> str: @@ -46,12 +58,14 @@ def name(self) -> str: def from_kwargs( cls, session_ids: list[str] | None = None, + refresh_mode: RefreshModeType = "normal", **_kwargs, ) -> "FlightBlenderStreamer": """Factory method to create streamer from configuration. Args: session_ids: Optional list of session UUID strings. + refresh_mode: Submission mode - "normal" or "varying". """ parsed_ids = None if session_ids: @@ -59,7 +73,7 @@ def from_kwargs( parsed_ids = [uuid.UUID(sid) for sid in session_ids] except ValueError: logger.warning("Invalid session ID format detected, will auto-generate. Ensure session IDs are valid UUIDs.") - return cls(session_ids=parsed_ids) + return cls(session_ids=parsed_ids, refresh_mode=refresh_mode) def _make_result( self, @@ -81,9 +95,56 @@ def _make_result( total_observations=total_observations, total_batches=total_batches, errors=errors or [], - observations=observations or [], + observations=observations if observations is not None else None, ) + @staticmethod + def _corrupt_timestamps( + observations: list[list], + ) -> list[list]: + """Apply timestamp corruption to simulate malfunctioning sensors. + + Randomly applies anomalies to observation timestamps: + - 30% chance: stale timestamp (repeat previous) + - 20% chance: backward jump (10-60s into the past) + - 15% chance: forward jump (10-60s into the future) + - 35% chance: keep original (normal) + + Args: + observations: List of observation lists per aircraft. + + Returns: + New observation lists with corrupted timestamps. + """ + corrupted_observations = [] + for aircraft_obs in observations: + corrupted_aircraft_obs = [] + last_used_timestamp: int | None = None + for obs in aircraft_obs: + original_timestamp = obs.timestamp + anomaly_roll = random.random() + + if anomaly_roll < 0.3 and last_used_timestamp is not None: + new_timestamp = last_used_timestamp + logger.debug(f"[off-nominal] Stale timestamp for {obs.icao_address}: kept {new_timestamp} instead of {original_timestamp}") + elif anomaly_roll < 0.5: + offset = random.randint(10, 60) + new_timestamp = original_timestamp - offset + logger.debug(f"[off-nominal] Backward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (-{offset}s)") + elif anomaly_roll < 0.65: + offset = random.randint(10, 60) + new_timestamp = original_timestamp + offset + logger.debug(f"[off-nominal] Forward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (+{offset}s)") + else: + new_timestamp = original_timestamp + + corrupted_obs = obs.model_copy(update={"timestamp": new_timestamp}) + corrupted_aircraft_obs.append(corrupted_obs) + last_used_timestamp = new_timestamp + + corrupted_observations.append(corrupted_aircraft_obs) + return corrupted_observations + async def stream_from_provider( self, provider: "AirTrafficProvider", @@ -91,8 +152,10 @@ async def stream_from_provider( ) -> StreamResult: """Stream observations from provider to Flight Blender. - Gets observations from the provider, then submits them to Flight Blender - in real-time playback mode (one observation per second per aircraft). + Gets observations from the provider, then submits them to Flight Blender. + In "normal" mode, submits using standard real-time playback. + In "varying" mode, corrupts timestamps before submission to simulate + malfunctioning sensors. Args: provider: The air traffic provider to get observations from. @@ -143,18 +206,42 @@ async def stream_from_provider( observations=observations, ) + # Apply timestamp corruption for varying refresh mode + submit_observations = observations + if self._refresh_mode == "varying": + logger.info("Applying timestamp corruption for varying refresh rate submission") + submit_observations = self._corrupt_timestamps(observations) + try: async with FlightBlenderClient( base_url=config.flight_blender.url, credentials={"username": username, "password": password}, ) as client: - result = await client.submit_simulated_air_traffic( - observations=observations, + # Choose submission method based on refresh mode + if self._refresh_mode == "varying": + submit_fn = client.submit_simulated_air_traffic_at_random_refresh_rates + else: + submit_fn = client.submit_simulated_air_traffic + + step_result = await submit_fn( + observations=submit_observations, session_ids=self._session_ids, ) + # The @scenario_step decorator wraps the return in StepResult. + # Extract success from the inner result. + if isinstance(step_result, StepResult): + if step_result.status == Status.FAIL: + raise RuntimeError(step_result.error_message or "Submission failed") + raw = step_result.result + success = raw.get("success", False) if isinstance(raw, dict) else True + elif isinstance(step_result, dict): + success = step_result.get("success", False) + else: + success = True + return self._make_result( - success=result.get("success", False), + success=success, provider_name=provider.name, duration_seconds=duration_seconds, total_observations=sum(len(batch) for batch in observations), @@ -163,11 +250,5 @@ async def stream_from_provider( ) except Exception as e: - logger.error(f"Flight Blender streaming failed: {e}") - return self._make_result( - success=False, - provider_name=provider.name, - duration_seconds=duration_seconds, - errors=[str(e)], - observations=observations, - ) + logger.exception(f"Flight Blender streaming failed: {e}") + raise diff --git a/tests/test_group_execution.py b/tests/test_group_execution.py index e206f01..ce19ca4 100644 --- a/tests/test_group_execution.py +++ b/tests/test_group_execution.py @@ -50,7 +50,10 @@ async def test_group_with_loop(): fetch_data: steps: - id: fetch - step: Fetch OpenSky Data + step: Stream Air Traffic + arguments: + provider: opensky + target: flight_blender - id: wait step: Wait X seconds arguments: @@ -86,11 +89,14 @@ async def test_group_references_within_group(): process_data: steps: - id: fetch - step: Fetch OpenSky Data + step: Stream Air Traffic + arguments: + provider: opensky + target: flight_blender - id: submit - step: Submit Air Traffic + step: Wait X seconds arguments: - observations: ${{ group.fetch.result }} + duration: 1 steps: - step: process_data @@ -103,8 +109,8 @@ async def test_group_references_within_group(): assert "process_data" in scenario.groups group = scenario.groups["process_data"] - # Verify the submit step has a reference to fetch - assert group.steps[1].arguments["observations"] == "${{ group.fetch.result }}" + # Verify the submit step has correct arguments + assert group.steps[1].arguments["duration"] == 1 @pytest.mark.asyncio diff --git a/tests/test_step_status_updates.py b/tests/test_step_status_updates.py index e98c6b1..8311d91 100644 --- a/tests/test_step_status_updates.py +++ b/tests/test_step_status_updates.py @@ -43,7 +43,7 @@ async def test_step_status_fail_on_validation_error(self, session_manager): session_manager.session_context = mock_context session_manager.session_resolver = MagicMock() - step = StepDefinition(id="test_step", step="Submit Air Traffic", arguments={"observations": None}) + step = StepDefinition(id="test_step", step="Stream Air Traffic", arguments={"provider": "opensky", "target": "flight_blender"}) # Simulate a validation error by making _execute_step raise with patch.object(session_manager, "_execute_step") as mock_execute: @@ -117,8 +117,8 @@ async def test_remaining_group_steps_marked_skip_on_failure(self, session_manage "my_group": GroupDefinition( description="Test group", steps=[ - StepDefinition(id="step1", step="Fetch OpenSky Data"), - StepDefinition(id="step2", step="Submit Air Traffic", arguments={"observations": []}), + StepDefinition(id="step1", step="Stream Air Traffic", arguments={"provider": "opensky", "target": "flight_blender"}), + StepDefinition(id="step2", step="Wait X seconds", arguments={"duration": 1}), StepDefinition(id="step3", step="Wait X seconds", arguments={"duration": 1}), ], ) @@ -201,11 +201,11 @@ async def test_group_step_with_condition_skipped(self, session_manager): "my_group": GroupDefinition( description="Test group", steps=[ - StepDefinition(id="fetch", step="Fetch OpenSky Data"), + StepDefinition(id="fetch", step="Stream Air Traffic", arguments={"provider": "opensky", "target": "flight_blender"}), StepDefinition( id="submit", - step="Submit Air Traffic", - arguments={"observations": []}, + step="Wait X seconds", + arguments={"duration": 1}, if_condition="steps.fetch.result != None", ), StepDefinition(id="wait", step="Wait X seconds", arguments={"duration": 1}), @@ -290,11 +290,11 @@ async def test_group_context_takes_priority_over_state_step_results(self, sessio "my_group": GroupDefinition( description="Test group", steps=[ - StepDefinition(id="fetch", step="Fetch OpenSky Data"), + StepDefinition(id="fetch", step="Stream Air Traffic", arguments={"provider": "opensky", "target": "flight_blender"}), StepDefinition( id="submit", - step="Submit Air Traffic", - arguments={"observations": "${{ steps.fetch.result }}"}, + step="Wait X seconds", + arguments={"duration": 1}, ), ], ) diff --git a/tests/test_stream_air_traffic.py b/tests/test_stream_air_traffic.py index 84cfb9c..7588744 100644 --- a/tests/test_stream_air_traffic.py +++ b/tests/test_stream_air_traffic.py @@ -7,12 +7,18 @@ from openutm_verification.core.providers import ProviderType, create_provider from openutm_verification.core.providers.geojson_provider import GeoJSONProvider from openutm_verification.core.providers.opensky_provider import OpenSkyProvider +from openutm_verification.core.reporting.reporting_models import Status, StepResult from openutm_verification.core.steps import AirTrafficStepClient from openutm_verification.core.streamers import StreamResult, TargetType, create_streamer from openutm_verification.core.streamers.null_streamer import NullStreamer from openutm_verification.simulator.models.flight_data_types import FlightObservationSchema +def _wrap_step_result(result, step_name="test"): + """Wrap a raw result in a StepResult as the @scenario_step decorator would.""" + return StepResult(name=step_name, status=Status.PASS, duration=0.0, result=result) + + class TestProviderFactory: """Tests for the provider factory.""" @@ -225,7 +231,9 @@ async def test_geojson_provider_instantiates_client_with_correct_settings(self, # Setup mock client instance mock_client_instance = AsyncMock() - mock_client_instance.generate_simulated_air_traffic_data = AsyncMock(return_value=mock_observations) + mock_client_instance.generate_simulated_air_traffic_data = AsyncMock( + return_value=_wrap_step_result(mock_observations, "Generate Simulated Air Traffic Data") + ) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) @@ -268,7 +276,9 @@ async def test_geojson_provider_uses_default_duration_when_not_overridden(self, mock_observations = _create_mock_observations() mock_client_instance = AsyncMock() - mock_client_instance.generate_simulated_air_traffic_data = AsyncMock(return_value=mock_observations) + mock_client_instance.generate_simulated_air_traffic_data = AsyncMock( + return_value=_wrap_step_result(mock_observations, "Generate Simulated Air Traffic Data") + ) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) @@ -306,7 +316,9 @@ async def test_bluesky_provider_instantiates_client_with_correct_settings(self, ] mock_client_instance = AsyncMock() - mock_client_instance.generate_bluesky_sim_air_traffic_data = AsyncMock(return_value=mock_observations) + mock_client_instance.generate_bluesky_sim_air_traffic_data = AsyncMock( + return_value=_wrap_step_result(mock_observations, "Generate BlueSky Simulation Air Traffic Data") + ) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) @@ -363,7 +375,9 @@ async def test_bayesian_provider_instantiates_client_with_correct_settings(self, ] mock_client_instance = AsyncMock() - mock_client_instance.generate_bayesian_sim_air_traffic_data = AsyncMock(return_value=mock_observations) + mock_client_instance.generate_bayesian_sim_air_traffic_data = AsyncMock( + return_value=_wrap_step_result(mock_observations, "Generate Bayesian Air Traffic Data") + ) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) @@ -396,7 +410,9 @@ async def test_bayesian_provider_handles_none_result(self, mock_client_class): from openutm_verification.core.providers.bayesian_provider import BayesianProvider mock_client_instance = AsyncMock() - mock_client_instance.generate_bayesian_sim_air_traffic_data = AsyncMock(return_value=None) + mock_client_instance.generate_bayesian_sim_air_traffic_data = AsyncMock( + return_value=_wrap_step_result(None, "Generate Bayesian Air Traffic Data") + ) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) @@ -506,7 +522,9 @@ async def test_flight_blender_streamer_submits_to_client(self, mock_get_settings mock_get_settings.return_value = mock_config mock_fb_client = AsyncMock() - mock_fb_client.submit_simulated_air_traffic = AsyncMock(return_value={"success": True, "observations_submitted": 1}) + mock_fb_client.submit_simulated_air_traffic = AsyncMock( + return_value=_wrap_step_result({"success": True, "observations_submitted": 1}, "Submit Simulated Air Traffic") + ) mock_fb_class.return_value.__aenter__ = AsyncMock(return_value=mock_fb_client) mock_fb_class.return_value.__aexit__ = AsyncMock(return_value=None) @@ -578,10 +596,8 @@ async def test_flight_blender_streamer_handles_client_error(self, mock_get_setti mock_provider.get_observations = AsyncMock(return_value=mock_observations) streamer = FlightBlenderStreamer() - result = await streamer.stream_from_provider(mock_provider, duration_seconds=30) - - assert result.success is False - assert "Connection refused" in result.errors[0] + with pytest.raises(Exception, match="Connection refused"): + await streamer.stream_from_provider(mock_provider, duration_seconds=30) class TestNullStreamerIntegration: @@ -658,7 +674,9 @@ async def test_stream_air_traffic_with_null_target(self, mock_client_class): # Mock the AirTrafficClient used by GeoJSONProvider mock_client_instance = AsyncMock() - mock_client_instance.generate_simulated_air_traffic_data = AsyncMock(return_value=mock_observations) + mock_client_instance.generate_simulated_air_traffic_data = AsyncMock( + return_value=_wrap_step_result(mock_observations, "Generate Simulated Air Traffic Data") + ) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) diff --git a/tests/test_ui_groups.py b/tests/test_ui_groups.py index 64d26e4..56eaf5d 100644 --- a/tests/test_ui_groups.py +++ b/tests/test_ui_groups.py @@ -13,8 +13,8 @@ def test_group_roundtrip(): "process_data": { "description": "Process some data", "steps": [ - {"id": "fetch", "step": "Fetch OpenSky Data"}, - {"id": "submit", "step": "Submit Air Traffic", "arguments": {"observations": "${{ group.fetch.result }}"}}, + {"id": "fetch", "step": "Stream Air Traffic", "arguments": {"provider": "opensky", "target": "flight_blender"}}, + {"id": "submit", "step": "Wait X seconds", "arguments": {"duration": 1}}, ], } }, @@ -29,7 +29,7 @@ def test_group_roundtrip(): assert scenario.groups["process_data"].description == "Process some data" assert len(scenario.groups["process_data"].steps) == 2 assert scenario.groups["process_data"].steps[0].id == "fetch" - assert scenario.groups["process_data"].steps[1].arguments["observations"] == "${{ group.fetch.result }}" + assert scenario.groups["process_data"].steps[1].arguments["duration"] == 1 # Verify the group is referenced in steps assert scenario.steps[0].step == "process_data" diff --git a/tests/test_yaml_scenarios.py b/tests/test_yaml_scenarios.py index f746100..7b89309 100644 --- a/tests/test_yaml_scenarios.py +++ b/tests/test_yaml_scenarios.py @@ -35,9 +35,8 @@ def mock_clients(): fb_client.start_stop_sdsp_session.return_value = "Session Started" # Mock methods that return objects with attributes accessed in YAML - # e.g. ${{ steps.Generate Simulated Air Traffic Data.result.result }} - # But wait, we changed it to just .result in the previous turn. - # Let's check if any other steps return complex objects. + # e.g. ${{ steps.stream_air_traffic.result.observations }} + # The unified Stream Air Traffic step returns StreamResult with observations. mocks["FlightBlenderClient"] = fb_client From 8cca28508a86ac2a5f67cc6c154d2f31891d5d5e Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 14:19:06 +0100 Subject: [PATCH 2/8] fix auth config --- .../core/streamers/flight_blender_streamer.py | 25 +++++------------- tests/test_stream_air_traffic.py | 26 +++++++++++++------ 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/src/openutm_verification/core/streamers/flight_blender_streamer.py b/src/openutm_verification/core/streamers/flight_blender_streamer.py index 8dd5bde..49e3f43 100644 --- a/src/openutm_verification/core/streamers/flight_blender_streamer.py +++ b/src/openutm_verification/core/streamers/flight_blender_streamer.py @@ -12,6 +12,7 @@ from loguru import logger +from openutm_verification.auth.providers import get_auth_provider from openutm_verification.core.clients.flight_blender.flight_blender_client import ( FlightBlenderClient, ) @@ -188,23 +189,11 @@ async def stream_from_provider( observations=observations, ) - username = config.flight_blender.auth.username - password = config.flight_blender.auth.password - - if not username or not password: - error_msg = ( - "Flight Blender credentials are not configured. " - "Please set 'flight_blender.auth.username' and " - "'flight_blender.auth.password' in your configuration." - ) - logger.error(error_msg) - return self._make_result( - success=False, - provider_name=provider.name, - duration_seconds=duration_seconds, - errors=[error_msg], - observations=observations, - ) + auth_provider = get_auth_provider(config.flight_blender.auth) + credentials = auth_provider.get_cached_credentials( + audience=config.flight_blender.auth.audience, + scopes=config.flight_blender.auth.scopes, + ) # Apply timestamp corruption for varying refresh mode submit_observations = observations @@ -215,7 +204,7 @@ async def stream_from_provider( try: async with FlightBlenderClient( base_url=config.flight_blender.url, - credentials={"username": username, "password": password}, + credentials=credentials, ) as client: # Choose submission method based on refresh mode if self._refresh_mode == "varying": diff --git a/tests/test_stream_air_traffic.py b/tests/test_stream_air_traffic.py index 7588744..a5e1724 100644 --- a/tests/test_stream_air_traffic.py +++ b/tests/test_stream_air_traffic.py @@ -507,8 +507,9 @@ class TestFlightBlenderStreamerIntegration: @pytest.mark.asyncio @patch("openutm_verification.core.streamers.flight_blender_streamer.FlightBlenderClient") + @patch("openutm_verification.core.streamers.flight_blender_streamer.get_auth_provider") @patch("openutm_verification.core.streamers.flight_blender_streamer.get_settings") - async def test_flight_blender_streamer_submits_to_client(self, mock_get_settings, mock_fb_class): + async def test_flight_blender_streamer_submits_to_client(self, mock_get_settings, mock_get_auth, mock_fb_class): """Test that FlightBlenderStreamer properly submits observations to FlightBlenderClient.""" from openutm_verification.core.streamers.flight_blender_streamer import FlightBlenderStreamer @@ -517,10 +518,15 @@ async def test_flight_blender_streamer_submits_to_client(self, mock_get_settings # Mock get_settings mock_config = MagicMock() mock_config.flight_blender.url = "http://test-flight-blender:8080" - mock_config.flight_blender.auth.username = "test-user" - mock_config.flight_blender.auth.password = "test-pass" + mock_config.flight_blender.auth.audience = "test-audience" + mock_config.flight_blender.auth.scopes = [] mock_get_settings.return_value = mock_config + # Mock auth provider + mock_auth = MagicMock() + mock_auth.get_cached_credentials.return_value = {"token": "test-token"} + mock_get_auth.return_value = mock_auth + mock_fb_client = AsyncMock() mock_fb_client.submit_simulated_air_traffic = AsyncMock( return_value=_wrap_step_result({"success": True, "observations_submitted": 1}, "Submit Simulated Air Traffic") @@ -540,8 +546,7 @@ async def test_flight_blender_streamer_submits_to_client(self, mock_get_settings mock_fb_class.assert_called_once() call_kwargs = mock_fb_class.call_args[1] assert call_kwargs["base_url"] == "http://test-flight-blender:8080" - assert call_kwargs["credentials"]["username"] == "test-user" - assert call_kwargs["credentials"]["password"] == "test-pass" + assert call_kwargs["credentials"] == {"token": "test-token"} # Verify submit was called with observations mock_fb_client.submit_simulated_air_traffic.assert_called_once() @@ -573,8 +578,9 @@ async def test_flight_blender_streamer_handles_empty_observations(self): @pytest.mark.asyncio @patch("openutm_verification.core.streamers.flight_blender_streamer.FlightBlenderClient") + @patch("openutm_verification.core.streamers.flight_blender_streamer.get_auth_provider") @patch("openutm_verification.core.streamers.flight_blender_streamer.get_settings") - async def test_flight_blender_streamer_handles_client_error(self, mock_get_settings, mock_fb_class): + async def test_flight_blender_streamer_handles_client_error(self, mock_get_settings, mock_get_auth, mock_fb_class): """Test that FlightBlenderStreamer handles client errors gracefully.""" from openutm_verification.core.streamers.flight_blender_streamer import FlightBlenderStreamer @@ -582,10 +588,14 @@ async def test_flight_blender_streamer_handles_client_error(self, mock_get_setti mock_config = MagicMock() mock_config.flight_blender.url = "http://test:8080" - mock_config.flight_blender.auth.username = "user" - mock_config.flight_blender.auth.password = "pass" + mock_config.flight_blender.auth.audience = "test-audience" + mock_config.flight_blender.auth.scopes = [] mock_get_settings.return_value = mock_config + mock_auth = MagicMock() + mock_auth.get_cached_credentials.return_value = {"token": "test-token"} + mock_get_auth.return_value = mock_auth + mock_fb_client = AsyncMock() mock_fb_client.submit_simulated_air_traffic = AsyncMock(side_effect=Exception("Connection refused")) mock_fb_class.return_value.__aenter__ = AsyncMock(return_value=mock_fb_client) From e3a69b88e9fe45ae048a48a2fd9dfd3e07327497 Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 14:39:11 +0100 Subject: [PATCH 3/8] review --- scenarios/opensky_live_data.yaml | 3 +- scenarios/verify_sdsp_metrics.yaml | 2 + .../clients/air_traffic/air_traffic_client.py | 2 +- .../bayesian_air_traffic_client.py | 2 +- .../core/clients/amqp/amqp_client.py | 4 +- .../core/providers/latency.py | 5 +- .../core/providers/opensky_provider.py | 2 +- .../core/steps/air_traffic_step.py | 3 +- .../core/streamers/flight_blender_streamer.py | 64 ++------------- tests/test_stream_air_traffic.py | 81 ++++++++++++++++++- 10 files changed, 99 insertions(+), 69 deletions(-) diff --git a/scenarios/opensky_live_data.yaml b/scenarios/opensky_live_data.yaml index 6726ecb..ed4fd70 100644 --- a/scenarios/opensky_live_data.yaml +++ b/scenarios/opensky_live_data.yaml @@ -6,5 +6,6 @@ steps: id: stream_opensky arguments: provider: opensky - duration: 3 target: flight_blender + loop: + count: 5 diff --git a/scenarios/verify_sdsp_metrics.yaml b/scenarios/verify_sdsp_metrics.yaml index 5eabd2d..404ef55 100644 --- a/scenarios/verify_sdsp_metrics.yaml +++ b/scenarios/verify_sdsp_metrics.yaml @@ -25,6 +25,8 @@ steps: arguments: duration: 10 - step: Verify Reported Metrics in Flight Blender + needs: + - stream_air_traffic arguments: observations: ${{ steps.stream_air_traffic.result.observations }} session_id: ${{ steps.generated_sdsp_session_id.result }} diff --git a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py index 73a68b5..abf1bfd 100644 --- a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py @@ -112,7 +112,7 @@ async def generate_simulated_air_traffic_data_with_latency( duration: int | None = None, ) -> list[list[FlightObservationSchema]]: """This method, simulates a adding latency to the flight observations list""" - flight_observations = self.generate_simulated_air_traffic_data(config_path=config_path, duration=duration) + flight_observations = await self.generate_simulated_air_traffic_data(config_path=config_path, duration=duration).result LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds diff --git a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py index a9af8ff..0849370 100644 --- a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py @@ -219,7 +219,7 @@ async def generate_bayesian_sim_air_traffic_data_with_sensor_latency_issues( Latency is simulated by randomly removing some observations and randomly shifting the timestamps of some observations to be earlier or later than the actual timestamp, mimicking real-world sensor latency issues. """ - flight_observations = self.generate_bayesian_sim_air_traffic_data(config_path=config_path, duration=duration) + flight_observations = await self.generate_bayesian_sim_air_traffic_data(config_path=config_path, duration=duration).result LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds diff --git a/src/openutm_verification/core/clients/amqp/amqp_client.py b/src/openutm_verification/core/clients/amqp/amqp_client.py index 8aa12e4..20d93d4 100644 --- a/src/openutm_verification/core/clients/amqp/amqp_client.py +++ b/src/openutm_verification/core/clients/amqp/amqp_client.py @@ -373,7 +373,7 @@ async def wait_for_messages( start_time = time.time() while (time.time() - start_time) < timeout: - messages = await self.get_received_messages(routing_key_filter=routing_key_filter) + messages = await self.get_received_messages(routing_key_filter=routing_key_filter).result if len(messages) >= count: return { "success": True, @@ -384,7 +384,7 @@ async def wait_for_messages( await asyncio.sleep(0.5) # Timeout reached - messages = await self.get_received_messages(routing_key_filter=routing_key_filter) + messages = await self.get_received_messages(routing_key_filter=routing_key_filter).result return { "success": False, "message_count": len(messages), diff --git a/src/openutm_verification/core/providers/latency.py b/src/openutm_verification/core/providers/latency.py index 892b988..e25506e 100644 --- a/src/openutm_verification/core/providers/latency.py +++ b/src/openutm_verification/core/providers/latency.py @@ -60,9 +60,10 @@ def apply_latency( # Drop the observation entirely total_dropped += 1 continue - # Shift the timestamp + # Shift the timestamp (in seconds, matching the timestamp unit) shift_seconds = random.uniform(*timestamp_shift_range) - obs.timestamp += int(shift_seconds * 1000) + new_timestamp = obs.timestamp + int(shift_seconds) + obs = obs.model_copy(update={"timestamp": new_timestamp}) total_shifted += 1 modified_track.append(obs) modified_observations.append(modified_track) diff --git a/src/openutm_verification/core/providers/opensky_provider.py b/src/openutm_verification/core/providers/opensky_provider.py index 64ba48d..16a98f7 100644 --- a/src/openutm_verification/core/providers/opensky_provider.py +++ b/src/openutm_verification/core/providers/opensky_provider.py @@ -80,7 +80,7 @@ async def get_observations( ) async with OpenSkyClient(settings) as client: - observations = await client.fetch_data() + observations = (await client.fetch_data()).result if observations is None: return [] # Wrap flat list in outer list for interface consistency diff --git a/src/openutm_verification/core/steps/air_traffic_step.py b/src/openutm_verification/core/steps/air_traffic_step.py index 5004e97..176dfa0 100644 --- a/src/openutm_verification/core/steps/air_traffic_step.py +++ b/src/openutm_verification/core/steps/air_traffic_step.py @@ -90,7 +90,8 @@ def _apply_config_defaults( app_config = get_settings() except Exception: logger.debug("Could not read application config for defaults, using step arguments only.") - return (duration or 30, config_path, number_of_aircraft, sensor_ids, session_ids) + resolved_duration = duration if duration is not None else 30 + return (resolved_duration, config_path, number_of_aircraft, sensor_ids, session_ids) try: if provider == "geojson": diff --git a/src/openutm_verification/core/streamers/flight_blender_streamer.py b/src/openutm_verification/core/streamers/flight_blender_streamer.py index 49e3f43..1a4a20f 100644 --- a/src/openutm_verification/core/streamers/flight_blender_streamer.py +++ b/src/openutm_verification/core/streamers/flight_blender_streamer.py @@ -6,7 +6,6 @@ from __future__ import annotations -import random import uuid from typing import TYPE_CHECKING, Literal @@ -99,53 +98,6 @@ def _make_result( observations=observations if observations is not None else None, ) - @staticmethod - def _corrupt_timestamps( - observations: list[list], - ) -> list[list]: - """Apply timestamp corruption to simulate malfunctioning sensors. - - Randomly applies anomalies to observation timestamps: - - 30% chance: stale timestamp (repeat previous) - - 20% chance: backward jump (10-60s into the past) - - 15% chance: forward jump (10-60s into the future) - - 35% chance: keep original (normal) - - Args: - observations: List of observation lists per aircraft. - - Returns: - New observation lists with corrupted timestamps. - """ - corrupted_observations = [] - for aircraft_obs in observations: - corrupted_aircraft_obs = [] - last_used_timestamp: int | None = None - for obs in aircraft_obs: - original_timestamp = obs.timestamp - anomaly_roll = random.random() - - if anomaly_roll < 0.3 and last_used_timestamp is not None: - new_timestamp = last_used_timestamp - logger.debug(f"[off-nominal] Stale timestamp for {obs.icao_address}: kept {new_timestamp} instead of {original_timestamp}") - elif anomaly_roll < 0.5: - offset = random.randint(10, 60) - new_timestamp = original_timestamp - offset - logger.debug(f"[off-nominal] Backward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (-{offset}s)") - elif anomaly_roll < 0.65: - offset = random.randint(10, 60) - new_timestamp = original_timestamp + offset - logger.debug(f"[off-nominal] Forward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (+{offset}s)") - else: - new_timestamp = original_timestamp - - corrupted_obs = obs.model_copy(update={"timestamp": new_timestamp}) - corrupted_aircraft_obs.append(corrupted_obs) - last_used_timestamp = new_timestamp - - corrupted_observations.append(corrupted_aircraft_obs) - return corrupted_observations - async def stream_from_provider( self, provider: "AirTrafficProvider", @@ -155,8 +107,8 @@ async def stream_from_provider( Gets observations from the provider, then submits them to Flight Blender. In "normal" mode, submits using standard real-time playback. - In "varying" mode, corrupts timestamps before submission to simulate - malfunctioning sensors. + In "varying" mode, uses the client's varying-refresh submission method + which applies its own timestamp anomalies to simulate malfunctioning sensors. Args: provider: The air traffic provider to get observations from. @@ -195,25 +147,21 @@ async def stream_from_provider( scopes=config.flight_blender.auth.scopes, ) - # Apply timestamp corruption for varying refresh mode - submit_observations = observations - if self._refresh_mode == "varying": - logger.info("Applying timestamp corruption for varying refresh rate submission") - submit_observations = self._corrupt_timestamps(observations) - try: async with FlightBlenderClient( base_url=config.flight_blender.url, credentials=credentials, ) as client: - # Choose submission method based on refresh mode + # Choose submission method based on refresh mode. + # The varying-refresh client method applies its own timestamp + # anomalies, so we always pass the original observations. if self._refresh_mode == "varying": submit_fn = client.submit_simulated_air_traffic_at_random_refresh_rates else: submit_fn = client.submit_simulated_air_traffic step_result = await submit_fn( - observations=submit_observations, + observations=observations, session_ids=self._session_ids, ) diff --git a/tests/test_stream_air_traffic.py b/tests/test_stream_air_traffic.py index a5e1724..9009372 100644 --- a/tests/test_stream_air_traffic.py +++ b/tests/test_stream_air_traffic.py @@ -49,6 +49,30 @@ def test_create_unknown_provider_raises(self): with pytest.raises(ValueError, match="Unknown provider"): create_provider(name="unknown") # type: ignore + def test_create_provider_with_latency_wraps_in_latency_provider(self): + """Test that data_quality='latency' wraps the provider with LatencyProviderWrapper.""" + from openutm_verification.core.providers.latency import LatencyProviderWrapper + + provider = create_provider( + name="geojson", + config_path="/some/path.geojson", + data_quality="latency", + ) + assert isinstance(provider, LatencyProviderWrapper) + assert provider.name == "geojson" + + def test_create_provider_nominal_does_not_wrap(self): + """Test that data_quality='nominal' returns unwrapped provider.""" + from openutm_verification.core.providers.latency import LatencyProviderWrapper + + provider = create_provider( + name="geojson", + config_path="/some/path.geojson", + data_quality="nominal", + ) + assert isinstance(provider, GeoJSONProvider) + assert not isinstance(provider, LatencyProviderWrapper) + class TestStreamerFactory: """Tests for the streamer factory.""" @@ -100,6 +124,59 @@ def test_stream_result_with_errors(self): assert len(result.errors) == 2 +class TestLatencySimulation: + """Tests for the apply_latency function and LatencyProviderWrapper.""" + + def test_apply_latency_does_not_mutate_originals(self): + """Test that apply_latency creates copies instead of mutating original observations.""" + from openutm_verification.core.providers.latency import apply_latency + + original_obs = FlightObservationSchema( + lat_dd=46.9, + lon_dd=7.4, + altitude_mm=1000000, + traffic_source=0, + source_type=0, + icao_address="TEST1", + timestamp=1000000, + ) + observations = [[original_obs]] + + # Force all observations to be shifted (100% probability, always shift) + apply_latency(observations, latency_probability=1.0, timestamp_shift_range=(1.0, 1.0)) + + # Original should be unchanged + assert original_obs.timestamp == 1000000 + + def test_apply_latency_shifts_in_seconds_not_milliseconds(self): + """Test that timestamp shifts are applied in seconds, not milliseconds.""" + import random + + from openutm_verification.core.providers.latency import apply_latency + + random.seed(42) # Make test deterministic + obs = FlightObservationSchema( + lat_dd=46.9, + lon_dd=7.4, + altitude_mm=1000000, + traffic_source=0, + source_type=0, + icao_address="TEST1", + timestamp=1000000, + ) + observations = [[obs]] + + # Use a fixed shift of exactly 2 seconds + result = apply_latency(observations, latency_probability=1.0, timestamp_shift_range=(2.0, 2.0)) + + # If any observations remain (not dropped), check the shift magnitude + for track in result: + for r_obs in track: + # Shift should be 2 seconds, not 2000 milliseconds + shift = abs(r_obs.timestamp - 1000000) + assert shift <= 3, f"Timestamp shift {shift} is too large, likely milliseconds instead of seconds" + + class TestAirTrafficStepClient: """Tests for the AirTrafficStepClient.""" @@ -458,7 +535,7 @@ async def test_opensky_provider_instantiates_client_with_correct_viewport(self, mock_get_settings.return_value = mock_config mock_client_instance = AsyncMock() - mock_client_instance.fetch_data = AsyncMock(return_value=mock_observations) + mock_client_instance.fetch_data = AsyncMock(return_value=_wrap_step_result(mock_observations, "Fetch OpenSky Data")) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) @@ -492,7 +569,7 @@ async def test_opensky_provider_returns_empty_list_when_no_data(self, mock_get_s mock_get_settings.return_value = mock_config mock_client_instance = AsyncMock() - mock_client_instance.fetch_data = AsyncMock(return_value=None) + mock_client_instance.fetch_data = AsyncMock(return_value=_wrap_step_result(None, "Fetch OpenSky Data")) mock_client_class.return_value.__aenter__ = AsyncMock(return_value=mock_client_instance) mock_client_class.return_value.__aexit__ = AsyncMock(return_value=None) From ff658620b05de8dc9a495ae6de9e5a07e8e208a6 Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 15:13:41 +0100 Subject: [PATCH 4/8] address review comments --- .../clients/air_traffic/air_traffic_client.py | 3 +- .../bayesian_air_traffic_client.py | 3 +- .../core/clients/amqp/amqp_client.py | 6 +- .../core/execution/config_models.py | 11 ++ .../core/providers/__init__.py | 4 +- .../core/providers/factory.py | 13 +- .../core/providers/latency.py | 112 ++++++++++++++---- .../core/steps/air_traffic_step.py | 2 +- .../core/streamers/flight_blender_streamer.py | 22 +++- tests/test_stream_air_traffic.py | 37 +++--- 10 files changed, 158 insertions(+), 55 deletions(-) diff --git a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py index abf1bfd..d85d836 100644 --- a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py @@ -112,7 +112,8 @@ async def generate_simulated_air_traffic_data_with_latency( duration: int | None = None, ) -> list[list[FlightObservationSchema]]: """This method, simulates a adding latency to the flight observations list""" - flight_observations = await self.generate_simulated_air_traffic_data(config_path=config_path, duration=duration).result + step_result = await self.generate_simulated_air_traffic_data(config_path=config_path, duration=duration) + flight_observations = step_result.result LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds diff --git a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py index 0849370..22daead 100644 --- a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py @@ -219,7 +219,8 @@ async def generate_bayesian_sim_air_traffic_data_with_sensor_latency_issues( Latency is simulated by randomly removing some observations and randomly shifting the timestamps of some observations to be earlier or later than the actual timestamp, mimicking real-world sensor latency issues. """ - flight_observations = await self.generate_bayesian_sim_air_traffic_data(config_path=config_path, duration=duration).result + step_result = await self.generate_bayesian_sim_air_traffic_data(config_path=config_path, duration=duration) + flight_observations = step_result.result LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds diff --git a/src/openutm_verification/core/clients/amqp/amqp_client.py b/src/openutm_verification/core/clients/amqp/amqp_client.py index 20d93d4..6972682 100644 --- a/src/openutm_verification/core/clients/amqp/amqp_client.py +++ b/src/openutm_verification/core/clients/amqp/amqp_client.py @@ -373,7 +373,8 @@ async def wait_for_messages( start_time = time.time() while (time.time() - start_time) < timeout: - messages = await self.get_received_messages(routing_key_filter=routing_key_filter).result + step_result = await self.get_received_messages(routing_key_filter=routing_key_filter) + messages = step_result.result if len(messages) >= count: return { "success": True, @@ -384,7 +385,8 @@ async def wait_for_messages( await asyncio.sleep(0.5) # Timeout reached - messages = await self.get_received_messages(routing_key_filter=routing_key_filter).result + step_result = await self.get_received_messages(routing_key_filter=routing_key_filter) + messages = step_result.result return { "success": False, "message_count": len(messages), diff --git a/src/openutm_verification/core/execution/config_models.py b/src/openutm_verification/core/execution/config_models.py index cdbe047..87d1707 100644 --- a/src/openutm_verification/core/execution/config_models.py +++ b/src/openutm_verification/core/execution/config_models.py @@ -47,6 +47,17 @@ class AirTrafficSimulatorSettings(StrictBaseModel): def validate_duration(cls, v: int | str) -> int: return int(parse_duration(v)) + @property + def simulation_duration_seconds(self) -> int: + """Standardized accessor — returns duration in seconds. + + The ``simulation_duration`` field already validates to an ``int`` + of seconds via :meth:`validate_duration`. This property provides + the same name used by the BlueSky / Bayesian config models so + callers can access duration uniformly. + """ + return int(self.simulation_duration) + class BlueSkyAirTrafficSimulatorSettings(StrictBaseModel): number_of_aircraft: int diff --git a/src/openutm_verification/core/providers/__init__.py b/src/openutm_verification/core/providers/__init__.py index f05b2b9..ae16b21 100644 --- a/src/openutm_verification/core/providers/__init__.py +++ b/src/openutm_verification/core/providers/__init__.py @@ -4,7 +4,7 @@ """ from .factory import ProviderType, create_provider -from .latency import DataQualityType +from .latency import DataQualityType, drop_observations, shift_timestamps from .opensky_provider import DEFAULT_SWITZERLAND_VIEWPORT from .protocol import AirTrafficProvider @@ -14,4 +14,6 @@ "DataQualityType", "ProviderType", "create_provider", + "drop_observations", + "shift_timestamps", ] diff --git a/src/openutm_verification/core/providers/factory.py b/src/openutm_verification/core/providers/factory.py index a05c127..2a4f445 100644 --- a/src/openutm_verification/core/providers/factory.py +++ b/src/openutm_verification/core/providers/factory.py @@ -11,6 +11,13 @@ ProviderType = Literal["geojson", "bluesky", "bayesian", "opensky"] +# Registry mapping data quality types to their wrapper classes. +# Add new entries here to support additional quality degradation modes +# without modifying the create_provider function. +_QUALITY_WRAPPERS: dict[DataQualityType, type] = { + DataQualityType.LATENCY: LatencyProviderWrapper, +} + def create_provider( name: ProviderType, @@ -63,7 +70,9 @@ def create_provider( **kwargs, ) - if data_quality == "latency": - return LatencyProviderWrapper(provider) + # Apply quality wrapper if registered for this quality type + wrapper_cls = _QUALITY_WRAPPERS.get(DataQualityType(data_quality)) + if wrapper_cls: + return wrapper_cls(provider) return provider diff --git a/src/openutm_verification/core/providers/latency.py b/src/openutm_verification/core/providers/latency.py index e25506e..2543669 100644 --- a/src/openutm_verification/core/providers/latency.py +++ b/src/openutm_verification/core/providers/latency.py @@ -11,7 +11,8 @@ from __future__ import annotations import random -from typing import TYPE_CHECKING, Literal +from enum import StrEnum +from typing import TYPE_CHECKING from loguru import logger @@ -25,7 +26,76 @@ LATENCY_PROBABILITY = 0.1 # 10% chance per observation TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift range in seconds -DataQualityType = Literal["nominal", "latency"] + +class DataQualityType(StrEnum): + """Data quality modes for air traffic observations. + + Each quality type applies independent degradation effects to observation data. + StrEnum allows direct comparison with string values (e.g., ``quality == "latency"``). + """ + + NOMINAL = "nominal" + LATENCY = "latency" + + +def drop_observations( + observations: list[list[FlightObservationSchema]], + probability: float = LATENCY_PROBABILITY, +) -> tuple[list[list[FlightObservationSchema]], int]: + """Randomly drop observations to simulate missed sensor readings. + + Each observation has an independent probability of being removed. + + Args: + observations: List of observation lists per aircraft (tracks). + probability: Probability each observation is dropped (0.0-1.0). + + Returns: + Tuple of (modified observation lists, total observations dropped). + """ + modified = [] + total_dropped = 0 + for track in observations: + kept = [] + for obs in track: + if random.random() < probability: + total_dropped += 1 + else: + kept.append(obs) + modified.append(kept) + return modified, total_dropped + + +def shift_timestamps( + observations: list[list[FlightObservationSchema]], + probability: float = LATENCY_PROBABILITY, + shift_range: tuple[float, float] = TIMESTAMP_SHIFT_RANGE_SECONDS, +) -> tuple[list[list[FlightObservationSchema]], int]: + """Randomly shift observation timestamps to simulate delayed sensor data. + + Each observation has an independent probability of having its timestamp + shifted by a random amount within the given range (in seconds). + + Args: + observations: List of observation lists per aircraft (tracks). + probability: Probability each observation is shifted (0.0-1.0). + shift_range: Range (min, max) for timestamp shifts in seconds. + + Returns: + Tuple of (modified observation lists, total observations shifted). + """ + modified = [] + total_shifted = 0 + for track in observations: + new_track = [] + for obs in track: + if random.random() < probability: + shift_seconds = random.uniform(*shift_range) + obs = obs.model_copy(update={"timestamp": obs.timestamp + int(shift_seconds)}) + total_shifted += 1 + new_track.append(obs) + modified.append(new_track) + return modified, total_shifted def apply_latency( @@ -36,40 +106,30 @@ def apply_latency( ) -> list[list[FlightObservationSchema]]: """Apply simulated sensor latency effects to observations. - For each observation, there is a configurable probability of being affected: - - 50% chance: drop the observation entirely (simulating missed readings) - - 50% chance: shift the timestamp (simulating delayed sensor data) + Composes independent quality degradation effects: + 1. Random observation drops (simulating missed readings) + 2. Random timestamp shifts (simulating delayed sensor data) + + Each effect is applied independently, making it straightforward to add + new quality degradation types in the future. Args: - observations: List of observation lists per aircraft. - latency_probability: Probability each observation is affected (0.0-1.0). + observations: List of observation lists per aircraft (tracks). + latency_probability: Base probability for each effect (0.0-1.0). + Split equally between drops and shifts to maintain overall 50/50 ratio. timestamp_shift_range: Range (min, max) for timestamp shifts in seconds. Returns: Modified observation lists with latency effects applied. """ - modified_observations = [] - total_dropped = 0 - total_shifted = 0 + drop_prob = latency_probability / 2 + shift_prob = latency_probability / 2 - for track_observations in observations: - modified_track = [] - for obs in track_observations: - if random.random() < latency_probability: - if random.random() < 0.5: - # Drop the observation entirely - total_dropped += 1 - continue - # Shift the timestamp (in seconds, matching the timestamp unit) - shift_seconds = random.uniform(*timestamp_shift_range) - new_timestamp = obs.timestamp + int(shift_seconds) - obs = obs.model_copy(update={"timestamp": new_timestamp}) - total_shifted += 1 - modified_track.append(obs) - modified_observations.append(modified_track) + result, total_dropped = drop_observations(observations, probability=drop_prob) + result, total_shifted = shift_timestamps(result, probability=shift_prob, shift_range=timestamp_shift_range) logger.info(f"Latency simulation applied: {total_dropped} observations dropped, {total_shifted} timestamps shifted") - return modified_observations + return result class LatencyProviderWrapper: diff --git a/src/openutm_verification/core/steps/air_traffic_step.py b/src/openutm_verification/core/steps/air_traffic_step.py index 176dfa0..be401d0 100644 --- a/src/openutm_verification/core/steps/air_traffic_step.py +++ b/src/openutm_verification/core/steps/air_traffic_step.py @@ -97,7 +97,7 @@ def _apply_config_defaults( if provider == "geojson": sim = app_config.air_traffic_simulator_settings if duration is None: - duration = sim.simulation_duration if hasattr(sim, "simulation_duration") else 30 + duration = sim.simulation_duration_seconds if number_of_aircraft is None: number_of_aircraft = sim.number_of_aircraft if sensor_ids is None and sim.sensor_ids: diff --git a/src/openutm_verification/core/streamers/flight_blender_streamer.py b/src/openutm_verification/core/streamers/flight_blender_streamer.py index 1a4a20f..ae98ee5 100644 --- a/src/openutm_verification/core/streamers/flight_blender_streamer.py +++ b/src/openutm_verification/core/streamers/flight_blender_streamer.py @@ -7,7 +7,8 @@ from __future__ import annotations import uuid -from typing import TYPE_CHECKING, Literal +from enum import StrEnum +from typing import TYPE_CHECKING from loguru import logger @@ -23,7 +24,16 @@ if TYPE_CHECKING: from openutm_verification.core.providers.protocol import AirTrafficProvider -RefreshModeType = Literal["normal", "varying"] + +class RefreshModeType(StrEnum): + """Submission mode for Flight Blender streaming. + + Controls how observations are submitted to the Flight Blender API. + StrEnum allows direct comparison with string values. + """ + + NORMAL = "normal" + VARYING = "varying" class FlightBlenderStreamer: @@ -188,4 +198,10 @@ async def stream_from_provider( except Exception as e: logger.exception(f"Flight Blender streaming failed: {e}") - raise + return self._make_result( + success=False, + provider_name=provider.name, + duration_seconds=duration_seconds, + errors=[str(e)], + observations=observations, + ) diff --git a/tests/test_stream_air_traffic.py b/tests/test_stream_air_traffic.py index 9009372..041acf8 100644 --- a/tests/test_stream_air_traffic.py +++ b/tests/test_stream_air_traffic.py @@ -6,6 +6,7 @@ from openutm_verification.core.providers import ProviderType, create_provider from openutm_verification.core.providers.geojson_provider import GeoJSONProvider +from openutm_verification.core.providers.latency import shift_timestamps from openutm_verification.core.providers.opensky_provider import OpenSkyProvider from openutm_verification.core.reporting.reporting_models import Status, StepResult from openutm_verification.core.steps import AirTrafficStepClient @@ -129,7 +130,6 @@ class TestLatencySimulation: def test_apply_latency_does_not_mutate_originals(self): """Test that apply_latency creates copies instead of mutating original observations.""" - from openutm_verification.core.providers.latency import apply_latency original_obs = FlightObservationSchema( lat_dd=46.9, @@ -142,19 +142,14 @@ def test_apply_latency_does_not_mutate_originals(self): ) observations = [[original_obs]] - # Force all observations to be shifted (100% probability, always shift) - apply_latency(observations, latency_probability=1.0, timestamp_shift_range=(1.0, 1.0)) + # Force all observations to be shifted (100% probability, fixed shift) + shift_timestamps(observations, probability=1.0, shift_range=(1.0, 1.0)) # Original should be unchanged assert original_obs.timestamp == 1000000 def test_apply_latency_shifts_in_seconds_not_milliseconds(self): """Test that timestamp shifts are applied in seconds, not milliseconds.""" - import random - - from openutm_verification.core.providers.latency import apply_latency - - random.seed(42) # Make test deterministic obs = FlightObservationSchema( lat_dd=46.9, lon_dd=7.4, @@ -166,15 +161,17 @@ def test_apply_latency_shifts_in_seconds_not_milliseconds(self): ) observations = [[obs]] - # Use a fixed shift of exactly 2 seconds - result = apply_latency(observations, latency_probability=1.0, timestamp_shift_range=(2.0, 2.0)) + # Use 100% probability and fixed shift of exactly 2 seconds. + # Calling shift_timestamps directly guarantees the shift path + # is always taken (no random drop/shift branching). + result, total_shifted = shift_timestamps(observations, probability=1.0, shift_range=(2.0, 2.0)) - # If any observations remain (not dropped), check the shift magnitude - for track in result: - for r_obs in track: - # Shift should be 2 seconds, not 2000 milliseconds - shift = abs(r_obs.timestamp - 1000000) - assert shift <= 3, f"Timestamp shift {shift} is too large, likely milliseconds instead of seconds" + assert total_shifted == 1 + assert len(result[0]) == 1 + shifted_obs = result[0][0] + # Shift should be 2 seconds, not 2000 milliseconds + shift = abs(shifted_obs.timestamp - 1000000) + assert shift == 2, f"Timestamp shift {shift} is not 2 seconds" class TestAirTrafficStepClient: @@ -683,8 +680,12 @@ async def test_flight_blender_streamer_handles_client_error(self, mock_get_setti mock_provider.get_observations = AsyncMock(return_value=mock_observations) streamer = FlightBlenderStreamer() - with pytest.raises(Exception, match="Connection refused"): - await streamer.stream_from_provider(mock_provider, duration_seconds=30) + result = await streamer.stream_from_provider(mock_provider, duration_seconds=30) + + # Protocol-compliant: returns StreamResult with success=False instead of raising + assert result.success is False + assert len(result.errors) == 1 + assert "Connection refused" in result.errors[0] class TestNullStreamerIntegration: From db3b34706c5b6504f3adcf53a14bcb2fc6d02810 Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 15:24:06 +0100 Subject: [PATCH 5/8] StrEnums --- .../core/providers/factory.py | 24 ++++++++++++------- .../core/steps/air_traffic_step.py | 14 +++++------ .../core/streamers/factory.py | 18 ++++++++------ tests/test_stream_air_traffic.py | 8 ++----- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/src/openutm_verification/core/providers/factory.py b/src/openutm_verification/core/providers/factory.py index 2a4f445..6125efc 100644 --- a/src/openutm_verification/core/providers/factory.py +++ b/src/openutm_verification/core/providers/factory.py @@ -1,6 +1,6 @@ """Factory for creating air traffic providers.""" -from typing import Literal +from enum import StrEnum from .bayesian_provider import BayesianProvider from .bluesky_provider import BlueSkyProvider @@ -9,7 +9,13 @@ from .opensky_provider import OpenSkyProvider from .protocol import AirTrafficProvider -ProviderType = Literal["geojson", "bluesky", "bayesian", "opensky"] + +class ProviderType(StrEnum): + GEOJSON = "geojson" + BLUESKY = "bluesky" + BAYESIAN = "bayesian" + OPENSKY = "opensky" + # Registry mapping data quality types to their wrapper classes. # Add new entries here to support additional quality degradation modes @@ -28,7 +34,7 @@ def create_provider( sensor_ids: list[str] | None = None, session_ids: list[str] | None = None, viewport: tuple[float, float, float, float] | None = None, - data_quality: DataQualityType = "nominal", + data_quality: DataQualityType = DataQualityType.NOMINAL, **kwargs, ) -> AirTrafficProvider: """Factory function to create providers by name. @@ -50,11 +56,11 @@ def create_provider( Raises: ValueError: If the provider name is not recognized. """ - providers: dict[str, type] = { - "geojson": GeoJSONProvider, - "bluesky": BlueSkyProvider, - "bayesian": BayesianProvider, - "opensky": OpenSkyProvider, + providers: dict[ProviderType, type] = { + ProviderType.GEOJSON: GeoJSONProvider, + ProviderType.BLUESKY: BlueSkyProvider, + ProviderType.BAYESIAN: BayesianProvider, + ProviderType.OPENSKY: OpenSkyProvider, } if name not in providers: @@ -71,7 +77,7 @@ def create_provider( ) # Apply quality wrapper if registered for this quality type - wrapper_cls = _QUALITY_WRAPPERS.get(DataQualityType(data_quality)) + wrapper_cls = _QUALITY_WRAPPERS.get(data_quality) if wrapper_cls: return wrapper_cls(provider) diff --git a/src/openutm_verification/core/steps/air_traffic_step.py b/src/openutm_verification/core/steps/air_traffic_step.py index be401d0..29c475c 100644 --- a/src/openutm_verification/core/steps/air_traffic_step.py +++ b/src/openutm_verification/core/steps/air_traffic_step.py @@ -94,7 +94,7 @@ def _apply_config_defaults( return (resolved_duration, config_path, number_of_aircraft, sensor_ids, session_ids) try: - if provider == "geojson": + if provider == ProviderType.GEOJSON: sim = app_config.air_traffic_simulator_settings if duration is None: duration = sim.simulation_duration_seconds @@ -107,7 +107,7 @@ def _apply_config_defaults( if config_path is None: config_path = _get_data_file_path("trajectory") - elif provider == "bluesky": + elif provider == ProviderType.BLUESKY: sim = app_config.blue_sky_air_traffic_simulator_settings if duration is None: duration = sim.simulation_duration_seconds @@ -120,7 +120,7 @@ def _apply_config_defaults( if config_path is None: config_path = _get_data_file_path("simulation") - elif provider == "bayesian": + elif provider == ProviderType.BAYESIAN: sim = app_config.bayesian_air_traffic_simulator_settings if duration is None: duration = sim.simulation_duration_seconds @@ -131,7 +131,7 @@ def _apply_config_defaults( if session_ids is None and sim.session_ids: session_ids = sim.session_ids - elif provider == "opensky": + elif provider == ProviderType.OPENSKY: pass # OpenSky reads its own config in the provider except Exception: logger.debug("Could not read application config for defaults, using step arguments only.") @@ -147,7 +147,7 @@ async def stream_air_traffic( self, provider: ProviderType, duration: int | None = None, - target: TargetType = "flight_blender", + target: TargetType = TargetType.FLIGHT_BLENDER, *, # Provider settings (optional overrides — defaults read from config) config_path: str | None = None, @@ -156,9 +156,9 @@ async def stream_air_traffic( session_ids: list[str] | None = None, viewport: tuple[float, float, float, float] | None = None, # Data quality mode - data_quality: DataQualityType = "nominal", + data_quality: DataQualityType = DataQualityType.NOMINAL, # Streamer settings - refresh_mode: RefreshModeType = "normal", + refresh_mode: RefreshModeType = RefreshModeType.NORMAL, ) -> StreamResult: """Stream air traffic data from a provider to a target system. diff --git a/src/openutm_verification/core/streamers/factory.py b/src/openutm_verification/core/streamers/factory.py index d0a98f3..5d06f47 100644 --- a/src/openutm_verification/core/streamers/factory.py +++ b/src/openutm_verification/core/streamers/factory.py @@ -1,20 +1,24 @@ """Factory for creating air traffic streamers.""" -from typing import Literal +from enum import StrEnum from .amqp_streamer import AMQPStreamer from .flight_blender_streamer import FlightBlenderStreamer, RefreshModeType from .null_streamer import NullStreamer from .protocol import AirTrafficStreamer -TargetType = Literal["flight_blender", "amqp", "none"] + +class TargetType(StrEnum): + FLIGHT_BLENDER = "flight_blender" + AMQP = "amqp" + NONE = "none" def create_streamer( name: TargetType, *, session_ids: list[str] | None = None, - refresh_mode: RefreshModeType = "normal", + refresh_mode: RefreshModeType = RefreshModeType.NORMAL, **kwargs, ) -> AirTrafficStreamer: """Factory function to create streamers by name. @@ -31,10 +35,10 @@ def create_streamer( Raises: ValueError: If the streamer name is not recognized. """ - streamers: dict[str, type] = { - "flight_blender": FlightBlenderStreamer, - "amqp": AMQPStreamer, - "none": NullStreamer, + streamers: dict[TargetType, type] = { + TargetType.FLIGHT_BLENDER: FlightBlenderStreamer, + TargetType.AMQP: AMQPStreamer, + TargetType.NONE: NullStreamer, } if name not in streamers: diff --git a/tests/test_stream_air_traffic.py b/tests/test_stream_air_traffic.py index 041acf8..ab5e39e 100644 --- a/tests/test_stream_air_traffic.py +++ b/tests/test_stream_air_traffic.py @@ -246,18 +246,14 @@ def test_step_param_model_has_required_fields(self): def test_provider_type_literal(self): """Test that ProviderType includes expected values.""" - from typing import get_args - expected = {"geojson", "bluesky", "bayesian", "opensky"} - actual = set(get_args(ProviderType)) + actual = set(item.value for item in ProviderType) assert actual == expected def test_target_type_literal(self): """Test that TargetType includes expected values.""" - from typing import get_args - expected = {"flight_blender", "amqp", "none"} - actual = set(get_args(TargetType)) + actual = set(item.value for item in TargetType) assert actual == expected From 946f6da12d0b521b0d953d10f73a98f6c68350bb Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 15:30:48 +0100 Subject: [PATCH 6/8] fix test --- src/openutm_verification/scenarios/common.py | 7 +++++++ tests/test_group_execution.py | 13 +++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/openutm_verification/scenarios/common.py b/src/openutm_verification/scenarios/common.py index e667947..e206012 100644 --- a/src/openutm_verification/scenarios/common.py +++ b/src/openutm_verification/scenarios/common.py @@ -1,4 +1,5 @@ import json +import random import uuid from pathlib import Path @@ -69,3 +70,9 @@ def get_geo_fence_path(geo_fence_filename: str) -> str: async def generate_uuid() -> str: """Generates a random UUID.""" return str(uuid.uuid4()) + + +@scenario_step("Generate Random number") +async def generate_random_number(min: int = 0, max: int = 5) -> int: + """Generates a random number.""" + return random.randint(min, max) diff --git a/tests/test_group_execution.py b/tests/test_group_execution.py index ce19ca4..91630ca 100644 --- a/tests/test_group_execution.py +++ b/tests/test_group_execution.py @@ -88,15 +88,20 @@ async def test_group_references_within_group(): groups: process_data: steps: - - id: fetch + - id: stream step: Stream Air Traffic arguments: provider: opensky target: flight_blender - - id: submit + - id: generate + step: Generate Random number + arguments: + min: 1 + max: 5 + - id: wait step: Wait X seconds arguments: - duration: 1 + duration: ${{ group.generate.result }} steps: - step: process_data @@ -110,7 +115,7 @@ async def test_group_references_within_group(): group = scenario.groups["process_data"] # Verify the submit step has correct arguments - assert group.steps[1].arguments["duration"] == 1 + assert group.steps[2].arguments["duration"] == "${{ group.generate.result }}" @pytest.mark.asyncio From 1efd21c3087e9e16fce1e186ec8366b32c7b01e3 Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 15:31:06 +0100 Subject: [PATCH 7/8] case --- src/openutm_verification/scenarios/common.py | 2 +- tests/test_group_execution.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/openutm_verification/scenarios/common.py b/src/openutm_verification/scenarios/common.py index e206012..5ba3f09 100644 --- a/src/openutm_verification/scenarios/common.py +++ b/src/openutm_verification/scenarios/common.py @@ -72,7 +72,7 @@ async def generate_uuid() -> str: return str(uuid.uuid4()) -@scenario_step("Generate Random number") +@scenario_step("Generate Random Number") async def generate_random_number(min: int = 0, max: int = 5) -> int: """Generates a random number.""" return random.randint(min, max) diff --git a/tests/test_group_execution.py b/tests/test_group_execution.py index 91630ca..b408d36 100644 --- a/tests/test_group_execution.py +++ b/tests/test_group_execution.py @@ -94,7 +94,7 @@ async def test_group_references_within_group(): provider: opensky target: flight_blender - id: generate - step: Generate Random number + step: Generate Random Number arguments: min: 1 max: 5 From b8da2e20cda2c08e8096c31a370dd2a5c65a84aa Mon Sep 17 00:00:00 2001 From: Attila Kobor Date: Sat, 28 Feb 2026 15:38:50 +0100 Subject: [PATCH 8/8] standardize simulation_duration --- config/default.yaml | 4 ++-- config/pull_request.yaml | 4 ++-- .../core/clients/air_traffic/base_client.py | 8 ++++---- .../bayesian_air_traffic_client.py | 4 ++-- .../clients/air_traffic/blue_sky_client.py | 6 +++--- .../flight_blender/flight_blender_client.py | 14 +++++++------- .../core/execution/config_models.py | 19 ++++--------------- .../core/providers/bayesian_provider.py | 2 +- .../core/providers/bluesky_provider.py | 2 +- .../core/steps/air_traffic_step.py | 6 +++--- tests/test_client_settings.py | 8 ++++---- tests/test_client_steps.py | 2 +- tests/test_reporting_output.py | 4 ++-- tests/test_stream_air_traffic.py | 4 ++-- tests/test_suite_scenario_merge.py | 8 ++++---- web-editor/src/components/ScenarioEditor.tsx | 2 +- .../ScenarioEditor/ConfigEditor.tsx | 8 ++++---- web-editor/src/types/scenario.ts | 4 ++-- 18 files changed, 49 insertions(+), 60 deletions(-) diff --git a/config/default.yaml b/config/default.yaml index 2063c92..861fe27 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -35,7 +35,7 @@ air_traffic_simulator_settings: # Bluesky Air traffic data configuration blue_sky_air_traffic_simulator_settings: number_of_aircraft: 3 - simulation_duration_seconds: 30 + simulation_duration: 30 single_or_multiple_sensors: "single" # this setting specifies if the traffic data is submitted from a single sensor or multiple sensors sensor_ids: ["562e6297036a4adebb4848afcd1ede90"] # List of sensor IDs to use when 'multiple' is selected session_ids: ["ee9405e564ea4373823e37d950858e6a"] # List of session IDs to use when 'multiple' is selected, a session id is needed in Flight Blender to depict a period of time these observations were made (this assumes the observations may not be continuous); if empty, random UUIDs will be generated @@ -43,7 +43,7 @@ blue_sky_air_traffic_simulator_settings: # Bayesian Air traffic data configuration bayesian_air_traffic_simulator_settings: number_of_aircraft: 3 - simulation_duration_seconds: 30 + simulation_duration: 30 single_or_multiple_sensors: "single" # this setting specifies if the traffic data is submitted from a single sensor or multiple sensors sensor_ids: ["562e6297036a4adebb4848afcd1ede90"] # List of sensor IDs to use when 'multiple' is selected session_ids: ["ee9405e564ea4373823e37d950858e6a"] # List of session IDs to use when 'multiple' is selected, a session id is needed in Flight Blender to depict a period of time these observations were made (this assumes the observations may not be continuous); if empty, random UUIDs will be generated diff --git a/config/pull_request.yaml b/config/pull_request.yaml index ffea157..ded32bb 100644 --- a/config/pull_request.yaml +++ b/config/pull_request.yaml @@ -28,14 +28,14 @@ air_traffic_simulator_settings: # Bluesky Air traffic data configuration blue_sky_air_traffic_simulator_settings: number_of_aircraft: 3 - simulation_duration_seconds: 30 + simulation_duration: 30 single_or_multiple_sensors: "multiple" # this setting specifiies if the traffic data is submitted from a single sensor or multiple sensors sensor_ids: ["562e6297036a4adebb4848afcd1ede90"] # List of sensor IDs to use when 'multiple' is selected # Bayesian Air traffic data configuration bayesian_air_traffic_simulator_settings: number_of_aircraft: 3 - simulation_duration_seconds: 30 + simulation_duration: 30 single_or_multiple_sensors: "multiple" # this setting specifies if the traffic data is submitted from a single sensor or multiple sensors sensor_ids: ["562e6297036a4adebb4848afcd1ede90"] # List of sensor IDs to use when 'multiple' is selected session_ids: ["ee9405e564ea4373823e37d950858e6a"] # List of session IDs to use when 'multiple' is selected, a session id is needed in Flight Blender to depict a period of time these observations were made (this assumes the observations may not be continuous); if empty, random UUIDs will be generated diff --git a/src/openutm_verification/core/clients/air_traffic/base_client.py b/src/openutm_verification/core/clients/air_traffic/base_client.py index 32be574..6267912 100644 --- a/src/openutm_verification/core/clients/air_traffic/base_client.py +++ b/src/openutm_verification/core/clients/air_traffic/base_client.py @@ -51,7 +51,7 @@ class BlueSkyAirTrafficSettings(BaseModel): """Settings for BlueSky Air Traffic API.""" simulation_config_path: str = "" - simulation_duration_seconds: int = 30 + simulation_duration: int = 30 number_of_aircraft: int = 2 single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE sensor_ids: list[str] = [] @@ -62,7 +62,7 @@ def from_config(cls, sim_config: "BlueSkySimConfig", simulation_path: str | None """Create settings from config.""" return cls( simulation_config_path=simulation_path or "", - simulation_duration_seconds=sim_config.simulation_duration_seconds, + simulation_duration=sim_config.simulation_duration, number_of_aircraft=sim_config.number_of_aircraft, single_or_multiple_sensors=sim_config.single_or_multiple_sensors, sensor_ids=sim_config.sensor_ids, @@ -74,7 +74,7 @@ class BayesianAirTrafficSettings(BaseModel): """Settings for Bayesian Air Traffic API.""" simulation_config_path: str = "" - simulation_duration_seconds: int = 30 + simulation_duration: int = 30 number_of_aircraft: int = 2 single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE @@ -86,7 +86,7 @@ def from_config(cls, sim_config: "BayesianSimConfig", simulation_path: str | Non """Create settings from config.""" return cls( simulation_config_path=simulation_path or "", - simulation_duration_seconds=sim_config.simulation_duration_seconds, + simulation_duration=sim_config.simulation_duration, number_of_aircraft=sim_config.number_of_aircraft, single_or_multiple_sensors=sim_config.single_or_multiple_sensors, sensor_ids=sim_config.sensor_ids, diff --git a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py index 22daead..fbeb52a 100644 --- a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py @@ -75,14 +75,14 @@ async def generate_bayesian_sim_air_traffic_data( Args: config_path: Path to .scn scenario file. Defaults to settings.simulation_config_path. - duration: Simulation duration in seconds. Defaults to settings.simulation_duration_seconds (expected 30). + duration: Simulation duration in seconds. Defaults to settings.simulation_duration (expected 30). Returns: list[list[FlightObservationSchema]]: outer list per aircraft (icao_address), inner list is time-series sampled at 1 Hz. """ # scn_path = config_path or self.settings.simulation_config_path - duration_in_seconds = int(duration or self.settings.simulation_duration_seconds or 30) + duration_in_seconds = int(duration or self.settings.simulation_duration or 30) number_of_aircraft = self.settings.number_of_aircraft or 3 sensor_ids = self.settings.sensor_ids use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE diff --git a/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py b/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py index 34eb47a..fabdc0a 100644 --- a/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py +++ b/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py @@ -46,7 +46,7 @@ async def generate_bluesky_sim_air_traffic_data( Args: config_path: Path to .scn scenario file. Defaults to settings.simulation_config_path. - duration: Simulation duration in seconds. Defaults to settings.simulation_duration_seconds (expected 30). + duration: Simulation duration in seconds. Defaults to settings.simulation_duration (expected 30). Returns: list[list[FlightObservationSchema]]: outer list per aircraft (icao_address), @@ -54,7 +54,7 @@ async def generate_bluesky_sim_air_traffic_data( """ scn_path = config_path or self.settings.simulation_config_path - duration_s = int(duration or self.settings.simulation_duration_seconds or 30) + duration_s = int(duration or self.settings.simulation_duration or 30) sensor_ids = self.settings.sensor_ids use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE @@ -147,7 +147,7 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues( """This method generates""" scn_path = config_path or self.settings.simulation_config_path - duration_s = int(duration or self.settings.simulation_duration_seconds or 30) + duration_s = int(duration or self.settings.simulation_duration or 30) sensor_ids = self.settings.sensor_ids use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE diff --git a/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py b/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py index f4c8894..f062d35 100644 --- a/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py +++ b/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py @@ -901,7 +901,7 @@ async def submit_simulated_air_traffic( "observations_submitted": observations_submitted, "submission_errors": submission_errors, "duration_seconds": round(duration_seconds, 2), - "simulation_duration_seconds": (simulation_end - simulation_start).total_seconds(), + "simulation_duration": (simulation_end - simulation_start).total_seconds(), } @scenario_step("Submit Simulated Air Traffic at varying refresh rates") @@ -1068,7 +1068,7 @@ async def verify_reported_metrics_in_flight_blender( simulation_start = min(start_times) simulation_end = max(end_times) - simulation_duration_seconds = (simulation_end - simulation_start).total_seconds() + simulation_duration = (simulation_end - simulation_start).total_seconds() metrics_endpoint = ( f"/surveillance_monitoring_ops/service_metrics/?session_id={session_id}&start_time={simulation_start}&end_time={simulation_end}" ) @@ -1081,7 +1081,7 @@ async def verify_reported_metrics_in_flight_blender( return StepResult( name="Verify Reported Metrics in Flight Blender", status=Status.FAIL, - duration=round(simulation_duration_seconds, 2), + duration=round(simulation_duration, 2), error_message=f"Metrics endpoint returned HTTP {metrics_response.status_code}", ) @@ -1091,14 +1091,14 @@ async def verify_reported_metrics_in_flight_blender( return StepResult( name="Verify Reported Metrics in Flight Blender", status=Status.FAIL, - duration=round(simulation_duration_seconds, 2), + duration=round(simulation_duration, 2), error_message=f"Invalid metrics response structure: {e}", ) num_aircraft = sum(1 for a in observations if a) total_observations = sum(len(a) for a in observations if a) - expected_track_update_probability = total_observations / (num_aircraft * simulation_duration_seconds) - expected_heartbeat_rate = total_observations / (num_aircraft * simulation_duration_seconds) + expected_track_update_probability = total_observations / (num_aircraft * simulation_duration) + expected_heartbeat_rate = total_observations / (num_aircraft * simulation_duration) expected_heartbeat_delivery_probability = 1.0 expected_active_sessions = 1 @@ -1120,7 +1120,7 @@ async def verify_reported_metrics_in_flight_blender( return StepResult( name="Verify Reported Metrics in Flight Blender", status=Status.PASS if not errors else Status.FAIL, - duration=round(simulation_duration_seconds, 2), + duration=round(simulation_duration, 2), error_message=None if not errors else "; ".join(errors), ) diff --git a/src/openutm_verification/core/execution/config_models.py b/src/openutm_verification/core/execution/config_models.py index 87d1707..d0fd0ad 100644 --- a/src/openutm_verification/core/execution/config_models.py +++ b/src/openutm_verification/core/execution/config_models.py @@ -37,31 +37,20 @@ class FlightBlenderConfig(StrictBaseModel): class AirTrafficSimulatorSettings(StrictBaseModel): number_of_aircraft: int - simulation_duration: int | str + simulation_duration: int single_or_multiple_sensors: Literal["single", "multiple"] = "single" sensor_ids: list[str] = Field(default_factory=list) session_ids: list[str] = Field(default_factory=list) - @field_validator("simulation_duration") + @field_validator("simulation_duration", mode="before") @classmethod def validate_duration(cls, v: int | str) -> int: return int(parse_duration(v)) - @property - def simulation_duration_seconds(self) -> int: - """Standardized accessor — returns duration in seconds. - - The ``simulation_duration`` field already validates to an ``int`` - of seconds via :meth:`validate_duration`. This property provides - the same name used by the BlueSky / Bayesian config models so - callers can access duration uniformly. - """ - return int(self.simulation_duration) - class BlueSkyAirTrafficSimulatorSettings(StrictBaseModel): number_of_aircraft: int - simulation_duration_seconds: int + simulation_duration: int single_or_multiple_sensors: Literal["single", "multiple"] = "single" sensor_ids: list[str] = Field(default_factory=list) session_ids: list[str] = Field(default_factory=list) @@ -69,7 +58,7 @@ class BlueSkyAirTrafficSimulatorSettings(StrictBaseModel): class BayesianAirTrafficSimulatorSettings(StrictBaseModel): number_of_aircraft: int - simulation_duration_seconds: int + simulation_duration: int single_or_multiple_sensors: Literal["single", "multiple"] = "single" sensor_ids: list[str] = Field(default_factory=list) session_ids: list[str] = Field(default_factory=list) diff --git a/src/openutm_verification/core/providers/bayesian_provider.py b/src/openutm_verification/core/providers/bayesian_provider.py index ed991c1..3b153a5 100644 --- a/src/openutm_verification/core/providers/bayesian_provider.py +++ b/src/openutm_verification/core/providers/bayesian_provider.py @@ -84,7 +84,7 @@ async def get_observations( settings = BayesianAirTrafficSettings( simulation_config_path=self._config_path, - simulation_duration_seconds=effective_duration, + simulation_duration=effective_duration, number_of_aircraft=self._number_of_aircraft, sensor_ids=self._sensor_ids, session_ids=self._session_ids, diff --git a/src/openutm_verification/core/providers/bluesky_provider.py b/src/openutm_verification/core/providers/bluesky_provider.py index 7a77a54..818bc6f 100644 --- a/src/openutm_verification/core/providers/bluesky_provider.py +++ b/src/openutm_verification/core/providers/bluesky_provider.py @@ -84,7 +84,7 @@ async def get_observations( settings = BlueSkyAirTrafficSettings( simulation_config_path=self._config_path, - simulation_duration_seconds=effective_duration, + simulation_duration=effective_duration, number_of_aircraft=self._number_of_aircraft, sensor_ids=self._sensor_ids, session_ids=self._session_ids, diff --git a/src/openutm_verification/core/steps/air_traffic_step.py b/src/openutm_verification/core/steps/air_traffic_step.py index 29c475c..a7a4706 100644 --- a/src/openutm_verification/core/steps/air_traffic_step.py +++ b/src/openutm_verification/core/steps/air_traffic_step.py @@ -97,7 +97,7 @@ def _apply_config_defaults( if provider == ProviderType.GEOJSON: sim = app_config.air_traffic_simulator_settings if duration is None: - duration = sim.simulation_duration_seconds + duration = sim.simulation_duration if number_of_aircraft is None: number_of_aircraft = sim.number_of_aircraft if sensor_ids is None and sim.sensor_ids: @@ -110,7 +110,7 @@ def _apply_config_defaults( elif provider == ProviderType.BLUESKY: sim = app_config.blue_sky_air_traffic_simulator_settings if duration is None: - duration = sim.simulation_duration_seconds + duration = sim.simulation_duration if number_of_aircraft is None: number_of_aircraft = sim.number_of_aircraft if sensor_ids is None and sim.sensor_ids: @@ -123,7 +123,7 @@ def _apply_config_defaults( elif provider == ProviderType.BAYESIAN: sim = app_config.bayesian_air_traffic_simulator_settings if duration is None: - duration = sim.simulation_duration_seconds + duration = sim.simulation_duration if number_of_aircraft is None: number_of_aircraft = sim.number_of_aircraft if sensor_ids is None and sim.sensor_ids: diff --git a/tests/test_client_settings.py b/tests/test_client_settings.py index 74de4a2..15313d9 100644 --- a/tests/test_client_settings.py +++ b/tests/test_client_settings.py @@ -170,7 +170,7 @@ def test_from_config_all_fields(self): """All config fields are correctly mapped.""" sim_config = BlueSkyAirTrafficSimulatorSettings( number_of_aircraft=10, - simulation_duration_seconds=120, + simulation_duration=120, single_or_multiple_sensors="single", sensor_ids=["bluesky_sensor"], session_ids=["bluesky_session"], @@ -182,7 +182,7 @@ def test_from_config_all_fields(self): ) assert settings.simulation_config_path == "/path/to/simulation.scn" - assert settings.simulation_duration_seconds == 120 + assert settings.simulation_duration == 120 assert settings.number_of_aircraft == 10 assert settings.single_or_multiple_sensors == SENSOR_MODE_SINGLE assert settings.sensor_ids == ["bluesky_sensor"] @@ -192,7 +192,7 @@ def test_from_config_no_simulation(self): """Settings work without simulation path.""" sim_config = BlueSkyAirTrafficSimulatorSettings( number_of_aircraft=3, - simulation_duration_seconds=30, + simulation_duration=30, ) settings = BlueSkyAirTrafficSettings.from_config(sim_config, simulation_path=None) @@ -204,7 +204,7 @@ def test_from_config_defaults(self): """Default values work correctly.""" sim_config = BlueSkyAirTrafficSimulatorSettings( number_of_aircraft=2, - simulation_duration_seconds=30, + simulation_duration=30, ) settings = BlueSkyAirTrafficSettings.from_config(sim_config) diff --git a/tests/test_client_steps.py b/tests/test_client_steps.py index 088aa74..e5e7a2d 100644 --- a/tests/test_client_steps.py +++ b/tests/test_client_steps.py @@ -31,7 +31,7 @@ def fb_client(): def at_client(): settings = MagicMock() settings.simulation_config_path = "test_config.json" - settings.simulation_duration_seconds = 60 + settings.simulation_duration = 60 settings.number_of_aircraft = 1 settings.sensor_ids = [] client = AirTrafficClient(settings) diff --git a/tests/test_reporting_output.py b/tests/test_reporting_output.py index a47143e..61d40d7 100644 --- a/tests/test_reporting_output.py +++ b/tests/test_reporting_output.py @@ -29,10 +29,10 @@ def test_report_outputs_use_result(tmp_path: Path): number_of_aircraft=1, simulation_duration=1, single_or_multiple_sensors="single", sensor_ids=[] ), blue_sky_air_traffic_simulator_settings=BlueSkyAirTrafficSimulatorSettings( - number_of_aircraft=1, simulation_duration_seconds=1, single_or_multiple_sensors="single", sensor_ids=[] + number_of_aircraft=1, simulation_duration=1, single_or_multiple_sensors="single", sensor_ids=[] ), bayesian_air_traffic_simulator_settings=BayesianAirTrafficSimulatorSettings( - number_of_aircraft=1, simulation_duration_seconds=1, single_or_multiple_sensors="single", sensor_ids=[] + number_of_aircraft=1, simulation_duration=1, single_or_multiple_sensors="single", sensor_ids=[] ), data_files=DataFiles(), suites={}, diff --git a/tests/test_stream_air_traffic.py b/tests/test_stream_air_traffic.py index ab5e39e..b529529 100644 --- a/tests/test_stream_air_traffic.py +++ b/tests/test_stream_air_traffic.py @@ -407,7 +407,7 @@ async def test_bluesky_provider_instantiates_client_with_correct_settings(self, settings = mock_client_class.call_args[0][0] assert settings.simulation_config_path == "/path/to/scenario.scn" - assert settings.simulation_duration_seconds == 25 + assert settings.simulation_duration == 25 assert settings.number_of_aircraft == 2 assert settings.sensor_ids == ["sensor-1"] assert settings.session_ids == ["session-1"] @@ -466,7 +466,7 @@ async def test_bayesian_provider_instantiates_client_with_correct_settings(self, settings = mock_client_class.call_args[0][0] assert settings.simulation_config_path == "/path/to/model.mat" - assert settings.simulation_duration_seconds == 80 + assert settings.simulation_duration == 80 assert settings.number_of_aircraft == 5 assert settings.sensor_ids == ["sensor-bayesian"] assert settings.session_ids == ["session-bayesian"] diff --git a/tests/test_suite_scenario_merge.py b/tests/test_suite_scenario_merge.py index c070aa8..47d34a7 100644 --- a/tests/test_suite_scenario_merge.py +++ b/tests/test_suite_scenario_merge.py @@ -136,11 +136,11 @@ def test_config_loading_merges_defaults(self, tmp_path): }, "blue_sky_air_traffic_simulator_settings": { "number_of_aircraft": 3, - "simulation_duration_seconds": 30, + "simulation_duration": 30, }, "bayesian_air_traffic_simulator_settings": { "number_of_aircraft": 3, - "simulation_duration_seconds": 30, + "simulation_duration": 30, }, "data_files": { "trajectory": "default_traj.json", @@ -212,11 +212,11 @@ def test_daa_scenario_config_example(self, tmp_path): }, "blue_sky_air_traffic_simulator_settings": { "number_of_aircraft": 3, - "simulation_duration_seconds": 30, + "simulation_duration": 30, }, "bayesian_air_traffic_simulator_settings": { "number_of_aircraft": 3, - "simulation_duration_seconds": 30, + "simulation_duration": 30, }, "data_files": { "trajectory": "trajectory_f1.json", # DEFAULT diff --git a/web-editor/src/components/ScenarioEditor.tsx b/web-editor/src/components/ScenarioEditor.tsx index 5567ee4..b8ec44b 100644 --- a/web-editor/src/components/ScenarioEditor.tsx +++ b/web-editor/src/components/ScenarioEditor.tsx @@ -72,7 +72,7 @@ const ScenarioEditorContent = () => { }, blue_sky_air_traffic_simulator_settings: { number_of_aircraft: 3, - simulation_duration_seconds: 30, + simulation_duration: 30, single_or_multiple_sensors: "multiple", sensor_ids: ["562e6297036a4adebb4848afcd1ede90"] } diff --git a/web-editor/src/components/ScenarioEditor/ConfigEditor.tsx b/web-editor/src/components/ScenarioEditor/ConfigEditor.tsx index 5e04771..6187bd1 100644 --- a/web-editor/src/components/ScenarioEditor/ConfigEditor.tsx +++ b/web-editor/src/components/ScenarioEditor/ConfigEditor.tsx @@ -483,8 +483,8 @@ export const ConfigEditor: React.FC = ({ config, onUpdateConf updateBlueSkyAirTrafficSimulator('simulation_duration_seconds', parseInt(e.target.value))} + value={config.blue_sky_air_traffic_simulator_settings?.simulation_duration || 30} + onChange={(e) => updateBlueSkyAirTrafficSimulator('simulation_duration', parseInt(e.target.value))} min="1" max="3600" /> @@ -561,8 +561,8 @@ export const ConfigEditor: React.FC = ({ config, onUpdateConf updateBayesianAirTrafficSimulator('simulation_duration_seconds', parseInt(e.target.value))} + value={config.bayesian_air_traffic_simulator_settings?.simulation_duration || 30} + onChange={(e) => updateBayesianAirTrafficSimulator('simulation_duration', parseInt(e.target.value))} min="1" max="3600" /> diff --git a/web-editor/src/types/scenario.ts b/web-editor/src/types/scenario.ts index be484eb..90f6628 100644 --- a/web-editor/src/types/scenario.ts +++ b/web-editor/src/types/scenario.ts @@ -111,14 +111,14 @@ export interface AirTrafficSimulatorSettings { export interface BlueSkyAirTrafficSimulatorSettings { number_of_aircraft?: number; - simulation_duration_seconds?: number; + simulation_duration?: number; single_or_multiple_sensors?: string; sensor_ids?: string[]; } export interface BayesianAirTrafficSimulatorSettings { number_of_aircraft?: number; - simulation_duration_seconds?: number; + simulation_duration?: number; single_or_multiple_sensors?: string; sensor_ids?: string[]; }