diff --git a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py index 6f8ccea..73a68b5 100644 --- a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py @@ -6,6 +6,7 @@ from loguru import logger from openutm_verification.core.clients.air_traffic.base_client import ( + SENSOR_MODE_MULTIPLE, AirTrafficSettings, BaseAirTrafficAPIClient, ) @@ -76,6 +77,7 @@ async def generate_simulated_air_traffic_data( duration = duration or self.settings.simulation_duration number_of_aircraft = self.settings.number_of_aircraft sensor_ids = self.settings.sensor_ids + use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE try: # create a list of UUIDs with at least one UUID if session_ids is empty @@ -96,6 +98,7 @@ async def generate_simulated_air_traffic_data( duration=duration, number_of_aircraft=number_of_aircraft, sensor_ids=sensor_ids, + use_multiple_sensors=use_multiple_sensors, ) except Exception as exc: # noqa: BLE001 diff --git a/src/openutm_verification/core/clients/air_traffic/base_client.py b/src/openutm_verification/core/clients/air_traffic/base_client.py index 203c935..32be574 100644 --- a/src/openutm_verification/core/clients/air_traffic/base_client.py +++ b/src/openutm_verification/core/clients/air_traffic/base_client.py @@ -1,9 +1,13 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal from pydantic import BaseModel +# Constants for sensor mode configuration +SENSOR_MODE_SINGLE = "single" +SENSOR_MODE_MULTIPLE = "multiple" + if TYPE_CHECKING: from openutm_verification.core.execution.config_models import ( AirTrafficSimulatorSettings as AirTrafficSimConfig, @@ -26,7 +30,7 @@ class AirTrafficSettings(BaseModel): simulation_config_path: str = "" simulation_duration: int = 30 number_of_aircraft: int = 2 - single_or_multiple_sensors: str = "single" + single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE sensor_ids: list[str] = [] session_ids: list[str] = [] @@ -35,7 +39,7 @@ def from_config(cls, sim_config: "AirTrafficSimConfig", trajectory_path: str | N """Create settings from config.""" return cls( simulation_config_path=trajectory_path or "", - simulation_duration=sim_config.simulation_duration, + simulation_duration=int(sim_config.simulation_duration), number_of_aircraft=sim_config.number_of_aircraft, single_or_multiple_sensors=sim_config.single_or_multiple_sensors, sensor_ids=sim_config.sensor_ids, @@ -49,7 +53,7 @@ class BlueSkyAirTrafficSettings(BaseModel): simulation_config_path: str = "" simulation_duration_seconds: int = 30 number_of_aircraft: int = 2 - single_or_multiple_sensors: str = "single" + single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE sensor_ids: list[str] = [] session_ids: list[str] = [] @@ -73,7 +77,7 @@ class BayesianAirTrafficSettings(BaseModel): simulation_duration_seconds: int = 30 number_of_aircraft: int = 2 - single_or_multiple_sensors: str = "single" + single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE sensor_ids: list[str] = [] session_ids: list[str] = [] diff --git a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py index 73f76b6..65c17cf 100644 --- a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py @@ -10,6 +10,7 @@ from pyproj import Transformer from openutm_verification.core.clients.air_traffic.base_client import ( + SENSOR_MODE_MULTIPLE, BayesianAirTrafficClient, BayesianAirTrafficSettings, ) @@ -61,6 +62,7 @@ async def generate_bayesian_sim_air_traffic_data( duration_in_seconds = int(duration or self.settings.simulation_duration_seconds or 30) number_of_aircraft = self.settings.number_of_aircraft or 3 sensor_ids = self.settings.sensor_ids + use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE try: # create a list of UUIDs with at least one UUID if session_ids is empty @@ -68,7 +70,6 @@ async def generate_bayesian_sim_air_traffic_data( except ValueError as exc: logger.error(f"Invalid sensor ID in configuration, it should be a valid UUID: {exc}") raise - # current_sensor_id = sensor_ids[0] # List the models bundled with the library available_models = get_available_model_files() @@ -76,7 +77,7 @@ async def generate_bayesian_sim_air_traffic_data( if not available_models: logger.info("No models found.") - return + return [] # Use one of the models model_filename = "Light_Aircraft_Below_10000_ft_Data.mat" @@ -87,7 +88,7 @@ async def generate_bayesian_sim_air_traffic_data( if session is None: logger.info("Failed to create a session.") - return + return [] # Generate a few tracks logger.info("Generating tracks...") @@ -109,6 +110,8 @@ async def generate_bayesian_sim_air_traffic_data( track=track, icao_address=icao_address, base_timestamp=base_timestamp, + sensor_ids=sensor_ids, + use_multiple_sensors=use_multiple_sensors, ) all_observations.append(observations) logger.info(f"Track {track_idx} ({icao_address}): {len(observations)} observations") @@ -124,6 +127,8 @@ def _convert_track_to_observations( track: TrackResultData, icao_address: str, base_timestamp: int, + sensor_ids: list[UUID], + use_multiple_sensors: bool, ) -> list[FlightObservationSchema]: """Convert a raw track dict from cam-track-gen to FlightObservationSchema list. @@ -153,7 +158,11 @@ def _convert_track_to_observations( altitude_mm = float(alt_ft_val) * FEET_TO_MM timestamp = base_timestamp + int(round(float(t))) + # Assign sensor ID: randomly select from list if multiple sensors, otherwise use first + selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0] + metadata = { + "sensor_id": str(selected_sensor_id), "speed_feet_per_second": float(speed_ft_s_val), "bank_angle_radians": float(bank_val), "pitch_angle_radians": float(pitch_val), diff --git a/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py b/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py index ed38af9..34eb47a 100644 --- a/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py +++ b/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py @@ -13,6 +13,7 @@ from loguru import logger from openutm_verification.core.clients.air_traffic.base_client import ( + SENSOR_MODE_MULTIPLE, BaseBlueSkyAirTrafficClient, BlueSkyAirTrafficSettings, ) @@ -56,6 +57,7 @@ async def generate_bluesky_sim_air_traffic_data( duration_s = int(duration or self.settings.simulation_duration_seconds or 30) sensor_ids = self.settings.sensor_ids + use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE try: # create a list of UUIDs with at least one UUID if session_ids is empty @@ -63,7 +65,6 @@ async def generate_bluesky_sim_air_traffic_data( except ValueError as exc: logger.error(f"Invalid sensor ID in configuration, it should be a valid UUID: {exc}") raise - current_sensor_id = sensor_ids[0] if not scn_path: raise ValueError("No scenario path provided. Provide config_path or set settings.simulation_config_path.") @@ -115,7 +116,10 @@ async def generate_bluesky_sim_air_traffic_data( # We store altitude_mm as "millimeters"; keep it consistent with your schema. # If alt is actually feet, you can convert here: alt_m = alt_ft * 0.3048 altitude_mm = alt_m_or_ft * 1000.0 - metadata = {"sensor_id": current_sensor_id} if current_sensor_id else {} + + # Assign sensor ID: randomly select from list if multiple sensors, otherwise use first + selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0] + metadata = {"sensor_id": str(selected_sensor_id)} if selected_sensor_id else {} obs = FlightObservationSchema( lat_dd=lat, @@ -142,11 +146,11 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues( ) -> list[list[FlightObservationSchema]]: """This method generates""" - scn_path = config_path or self.settings.simulation_config_path duration_s = int(duration or self.settings.simulation_duration_seconds or 30) sensor_ids = self.settings.sensor_ids + use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE try: # create a list of UUIDs with at least one UUID if session_ids is empty @@ -154,7 +158,6 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues( except ValueError as exc: logger.error(f"Invalid sensor ID in configuration, it should be a valid UUID: {exc}") raise - current_sensor_id = sensor_ids[0] if not scn_path: raise ValueError("No scenario path provided. Provide config_path or set settings.simulation_config_path.") @@ -207,7 +210,10 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues( # We store altitude_mm as "millimeters"; keep it consistent with your schema. # If alt is actually feet, you can convert here: alt_m = alt_ft * 0.3048 altitude_mm = alt_m_or_ft * 1000.0 - metadata = {"sensor_id": current_sensor_id} if current_sensor_id else {} + + # Assign sensor ID: randomly select from list if multiple sensors, otherwise use first + selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0] + metadata = {"sensor_id": str(selected_sensor_id)} if selected_sensor_id else {} obs = FlightObservationSchema( lat_dd=lat, @@ -226,7 +232,6 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues( # Convert dict -> list[list[FlightObservationSchema]] with stable ordering flight_observations = [results_by_acid[acid] for acid in sorted(results_by_acid.keys())] - # This method modifies the retrieved simulation data by changing the timestamp and adding latency to the observed dataset LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds diff --git a/src/openutm_verification/simulator/geo_json_telemetry.py b/src/openutm_verification/simulator/geo_json_telemetry.py index 46f89c8..9a20a39 100644 --- a/src/openutm_verification/simulator/geo_json_telemetry.py +++ b/src/openutm_verification/simulator/geo_json_telemetry.py @@ -80,6 +80,7 @@ def generate_air_traffic_data( duration: int, sensor_ids: list[UUID], number_of_aircraft: int = 1, + use_multiple_sensors: bool = False, ) -> list[list[FlightObservationSchema]]: """Generate simulated air traffic observations for the specified duration. @@ -88,14 +89,16 @@ def generate_air_traffic_data( Args: duration: Number of seconds to generate data for. - session_id: Unique identifier for the simulation session. + sensor_ids: List of sensor IDs to use for observations. + number_of_aircraft: Number of aircraft to simulate. + use_multiple_sensors: If True, randomly assign sensor IDs from the list. Returns: - List of flight observation dictionaries containing position, altitude, - and metadata for each generated data point. + A list of flights, where each flight is a list of ``FlightObservationSchema`` + instances containing position, altitude, and metadata for each generated + data point. """ - sensor_id = str(sensor_ids[0]) - logger.info(f"Generating air traffic data for {duration} seconds with sensor ID {sensor_id}") + logger.info(f"Generating air traffic data for {duration} seconds with {'multiple' if use_multiple_sensors else 'single'} sensor(s)") all_trajectories = [] # Generate a random trajectory # Improve to generate a trajectory where the speed of the aircraft can be controlled @@ -109,24 +112,28 @@ def generate_air_traffic_data( coordinates = trajectory_geojson["coordinates"] airtraffic: list[FlightObservationSchema] = [] icao_address = "".join(random.choices("0123456789ABCDEF", k=6)) - for i in range(duration): + # Each coordinate corresponds to one second of flight time + for i, point in enumerate(coordinates): + if i >= duration: + break timestamp = self.reference_time.shift(seconds=i) - for point in coordinates: - metadata = {"sensor_id": sensor_id} if sensor_id else {} - # Convert altitude from meters to millimeters for altitude_mm field - altitude_m = self.config.altitude_of_ground_level_wgs_84 - airtraffic.append( - FlightObservationSchema( - lat_dd=point[1], - lon_dd=point[0], - altitude_mm=altitude_m * 1000, # Convert m -> mm - traffic_source=1, - source_type=2, - icao_address=icao_address, - timestamp=timestamp.int_timestamp, - metadata=metadata, - ) + # Assign sensor ID: randomly select from list if multiple sensors, otherwise use first + selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0] + metadata = {"sensor_id": str(selected_sensor_id)} if selected_sensor_id else {} + # Convert altitude from meters to millimeters for altitude_mm field + altitude_m = self.config.altitude_of_ground_level_wgs_84 + airtraffic.append( + FlightObservationSchema( + lat_dd=point[1], + lon_dd=point[0], + altitude_mm=altitude_m * 1000, # Convert m -> mm + traffic_source=1, + source_type=2, + icao_address=icao_address, + timestamp=timestamp.int_timestamp, + metadata=metadata, ) + ) all_air_traffic.append(airtraffic) logger.info(f"Generated observations for {len(all_air_traffic)} aircraft") return all_air_traffic diff --git a/tests/test_altitude_units.py b/tests/test_altitude_units.py index 50800b0..3a75911 100644 --- a/tests/test_altitude_units.py +++ b/tests/test_altitude_units.py @@ -120,6 +120,70 @@ def test_air_traffic_altitude_is_in_millimeters(self, sample_geojson): expected_mm = altitude_meters * 1000 assert first_obs.altitude_mm == expected_mm, f"Expected altitude_mm={expected_mm} (from {altitude_meters}m), got {first_obs.altitude_mm}" + def test_single_sensor_mode_uses_one_sensor_id(self, sample_geojson): + """Verify all observations use the same sensor_id in single-sensor mode.""" + from uuid import uuid4 + + from openutm_verification.simulator.geo_json_telemetry import ( + GeoJSONAirtrafficSimulator, + ) + + config = AirTrafficGeneratorConfiguration( + geojson=sample_geojson, + reference_time=arrow.utcnow(), + ) + sensor_ids = [uuid4(), uuid4(), uuid4()] + simulator = GeoJSONAirtrafficSimulator(config) + result = simulator.generate_air_traffic_data( + duration=2, + sensor_ids=sensor_ids, + number_of_aircraft=1, + use_multiple_sensors=False, + ) + + unique_sensor_ids = {obs.metadata["sensor_id"] for flight in result for obs in flight} + assert len(unique_sensor_ids) == 1, f"Expected 1 sensor ID in single mode, got {len(unique_sensor_ids)}: {unique_sensor_ids}" + assert unique_sensor_ids == {str(sensor_ids[0])} + + def test_multiple_sensor_mode_assigns_varied_sensor_ids(self, sample_geojson): + """Verify observations contain more than one distinct sensor_id in multiple-sensor mode.""" + from uuid import uuid4 + + from openutm_verification.simulator.geo_json_telemetry import ( + GeoJSONAirtrafficSimulator, + ) + + # Use a longer GeoJSON path so we get enough observations for randomness + coords = [[7.47 + i * 0.001, 46.97 + i * 0.001] for i in range(50)] + geojson = { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": {"type": "LineString", "coordinates": coords}, + "properties": {}, + } + ], + } + + config = AirTrafficGeneratorConfiguration( + geojson=geojson, + reference_time=arrow.utcnow(), + ) + sensor_ids = [uuid4(), uuid4(), uuid4()] + simulator = GeoJSONAirtrafficSimulator(config) + result = simulator.generate_air_traffic_data( + duration=50, + sensor_ids=sensor_ids, + number_of_aircraft=1, + use_multiple_sensors=True, + ) + + unique_sensor_ids = {obs.metadata["sensor_id"] for flight in result for obs in flight} + assert len(unique_sensor_ids) > 1, ( + f"Expected multiple distinct sensor IDs in multiple mode, got {len(unique_sensor_ids)}: {unique_sensor_ids}" + ) + class TestOpenSkyClientAltitudeConversion: """Tests for OpenSky client altitude conversion.""" diff --git a/tests/test_client_settings.py b/tests/test_client_settings.py index 5766edb..74de4a2 100644 --- a/tests/test_client_settings.py +++ b/tests/test_client_settings.py @@ -5,6 +5,8 @@ """ from openutm_verification.core.clients.air_traffic.base_client import ( + SENSOR_MODE_MULTIPLE, + SENSOR_MODE_SINGLE, AirTrafficSettings, BlueSkyAirTrafficSettings, ) @@ -132,7 +134,7 @@ def test_from_config_all_fields(self): assert settings.simulation_config_path == "/path/to/trajectory.json" assert settings.simulation_duration == 60 assert settings.number_of_aircraft == 5 - assert settings.single_or_multiple_sensors == "multiple" + assert settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE assert settings.sensor_ids == ["sensor1", "sensor2"] assert settings.session_ids == ["session1"] @@ -182,7 +184,7 @@ def test_from_config_all_fields(self): assert settings.simulation_config_path == "/path/to/simulation.scn" assert settings.simulation_duration_seconds == 120 assert settings.number_of_aircraft == 10 - assert settings.single_or_multiple_sensors == "single" + assert settings.single_or_multiple_sensors == SENSOR_MODE_SINGLE assert settings.sensor_ids == ["bluesky_sensor"] assert settings.session_ids == ["bluesky_session"] @@ -207,7 +209,7 @@ def test_from_config_defaults(self): settings = BlueSkyAirTrafficSettings.from_config(sim_config) - assert settings.single_or_multiple_sensors == "single" + assert settings.single_or_multiple_sensors == SENSOR_MODE_SINGLE assert settings.sensor_ids == [] assert settings.session_ids == [] diff --git a/tests/test_client_steps.py b/tests/test_client_steps.py index bff740c..55c9604 100644 --- a/tests/test_client_steps.py +++ b/tests/test_client_steps.py @@ -4,6 +4,7 @@ import pytest from openutm_verification.core.clients.air_traffic.air_traffic_client import AirTrafficClient +from openutm_verification.core.clients.air_traffic.base_client import SENSOR_MODE_MULTIPLE, SENSOR_MODE_SINGLE from openutm_verification.core.clients.common.common_client import CommonClient from openutm_verification.core.clients.flight_blender.flight_blender_client import FlightBlenderClient from openutm_verification.core.clients.opensky.opensky_client import OpenSkyClient @@ -527,6 +528,52 @@ async def test_generate_simulated_air_traffic_data(at_client): mock_sim_instance.generate_air_traffic_data.assert_called_once() +async def test_generate_simulated_air_traffic_data_single_sensor(): + """Verify use_multiple_sensors=False is passed when sensor mode is single.""" + settings = MagicMock() + settings.simulation_config_path = "test_config.json" + settings.simulation_duration = 60 + settings.number_of_aircraft = 1 + settings.sensor_ids = [] + settings.single_or_multiple_sensors = SENSOR_MODE_SINGLE + client = AirTrafficClient(settings) + + with ( + patch("builtins.open", mock_open(read_data='{"type": "FeatureCollection"}')), + patch("openutm_verification.core.clients.air_traffic.air_traffic_client.GeoJSONAirtrafficSimulator") as MockSim, + ): + mock_sim_instance = MockSim.return_value + mock_sim_instance.generate_air_traffic_data.return_value = [[{"obs": 1}]] + + await client.generate_simulated_air_traffic_data() + + call_kwargs = mock_sim_instance.generate_air_traffic_data.call_args + assert call_kwargs[1]["use_multiple_sensors"] is False + + +async def test_generate_simulated_air_traffic_data_multiple_sensors(): + """Verify use_multiple_sensors=True is passed when sensor mode is multiple.""" + settings = MagicMock() + settings.simulation_config_path = "test_config.json" + settings.simulation_duration = 60 + settings.number_of_aircraft = 1 + settings.sensor_ids = ["a0b7d47e5eac45dc8cbaf47e6fe0e558", "b1c8e58f6fbd56ed9dcb058f70f1f669"] + settings.single_or_multiple_sensors = SENSOR_MODE_MULTIPLE + client = AirTrafficClient(settings) + + with ( + patch("builtins.open", mock_open(read_data='{"type": "FeatureCollection"}')), + patch("openutm_verification.core.clients.air_traffic.air_traffic_client.GeoJSONAirtrafficSimulator") as MockSim, + ): + mock_sim_instance = MockSim.return_value + mock_sim_instance.generate_air_traffic_data.return_value = [[{"obs": 1}]] + + await client.generate_simulated_air_traffic_data() + + call_kwargs = mock_sim_instance.generate_air_traffic_data.call_args + assert call_kwargs[1]["use_multiple_sensors"] is True + + # OpenSkyClient Tests