Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from loguru import logger

from openutm_verification.core.clients.air_traffic.base_client import (
SENSOR_MODE_MULTIPLE,
AirTrafficSettings,
BaseAirTrafficAPIClient,
)
Expand Down Expand Up @@ -76,6 +77,7 @@ async def generate_simulated_air_traffic_data(
duration = duration or self.settings.simulation_duration
number_of_aircraft = self.settings.number_of_aircraft
sensor_ids = self.settings.sensor_ids
use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE

try:
# create a list of UUIDs with at least one UUID if session_ids is empty
Expand All @@ -96,6 +98,7 @@ async def generate_simulated_air_traffic_data(
duration=duration,
number_of_aircraft=number_of_aircraft,
sensor_ids=sensor_ids,
use_multiple_sensors=use_multiple_sensors,
)

except Exception as exc: # noqa: BLE001
Expand Down
14 changes: 9 additions & 5 deletions src/openutm_verification/core/clients/air_traffic/base_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Literal

from pydantic import BaseModel

# Constants for sensor mode configuration
SENSOR_MODE_SINGLE = "single"
SENSOR_MODE_MULTIPLE = "multiple"

if TYPE_CHECKING:
from openutm_verification.core.execution.config_models import (
AirTrafficSimulatorSettings as AirTrafficSimConfig,
Expand All @@ -26,7 +30,7 @@ class AirTrafficSettings(BaseModel):
simulation_config_path: str = ""
simulation_duration: int = 30
number_of_aircraft: int = 2
single_or_multiple_sensors: str = "single"
single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE
sensor_ids: list[str] = []
session_ids: list[str] = []

Expand All @@ -35,7 +39,7 @@ def from_config(cls, sim_config: "AirTrafficSimConfig", trajectory_path: str | N
"""Create settings from config."""
return cls(
simulation_config_path=trajectory_path or "",
simulation_duration=sim_config.simulation_duration,
simulation_duration=int(sim_config.simulation_duration),
number_of_aircraft=sim_config.number_of_aircraft,
single_or_multiple_sensors=sim_config.single_or_multiple_sensors,
sensor_ids=sim_config.sensor_ids,
Expand All @@ -49,7 +53,7 @@ class BlueSkyAirTrafficSettings(BaseModel):
simulation_config_path: str = ""
simulation_duration_seconds: int = 30
number_of_aircraft: int = 2
single_or_multiple_sensors: str = "single"
single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE
sensor_ids: list[str] = []
session_ids: list[str] = []

Expand All @@ -73,7 +77,7 @@ class BayesianAirTrafficSettings(BaseModel):
simulation_duration_seconds: int = 30
number_of_aircraft: int = 2

single_or_multiple_sensors: str = "single"
single_or_multiple_sensors: Literal["single", "multiple"] = SENSOR_MODE_SINGLE
sensor_ids: list[str] = []
session_ids: list[str] = []

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pyproj import Transformer

from openutm_verification.core.clients.air_traffic.base_client import (
SENSOR_MODE_MULTIPLE,
BayesianAirTrafficClient,
BayesianAirTrafficSettings,
)
Expand Down Expand Up @@ -61,22 +62,22 @@ async def generate_bayesian_sim_air_traffic_data(
duration_in_seconds = int(duration or self.settings.simulation_duration_seconds or 30)
number_of_aircraft = self.settings.number_of_aircraft or 3
sensor_ids = self.settings.sensor_ids
use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE

try:
# create a list of UUIDs with at least one UUID if session_ids is empty
sensor_ids = [UUID(x) for x in sensor_ids] if sensor_ids else [uuid.uuid4()]
except ValueError as exc:
logger.error(f"Invalid sensor ID in configuration, it should be a valid UUID: {exc}")
raise
# current_sensor_id = sensor_ids[0]

# List the models bundled with the library
available_models = get_available_model_files()
logger.info(f"Available models from cam-track-gen: {available_models}")

if not available_models:
logger.info("No models found.")
return
return []

# Use one of the models
model_filename = "Light_Aircraft_Below_10000_ft_Data.mat"
Expand All @@ -87,7 +88,7 @@ async def generate_bayesian_sim_air_traffic_data(

if session is None:
logger.info("Failed to create a session.")
return
return []

# Generate a few tracks
logger.info("Generating tracks...")
Expand All @@ -109,6 +110,8 @@ async def generate_bayesian_sim_air_traffic_data(
track=track,
icao_address=icao_address,
base_timestamp=base_timestamp,
sensor_ids=sensor_ids,
use_multiple_sensors=use_multiple_sensors,
)
all_observations.append(observations)
logger.info(f"Track {track_idx} ({icao_address}): {len(observations)} observations")
Expand All @@ -124,6 +127,8 @@ def _convert_track_to_observations(
track: TrackResultData,
icao_address: str,
base_timestamp: int,
sensor_ids: list[UUID],
use_multiple_sensors: bool,
) -> list[FlightObservationSchema]:
"""Convert a raw track dict from cam-track-gen to FlightObservationSchema list.

Expand Down Expand Up @@ -153,7 +158,11 @@ def _convert_track_to_observations(
altitude_mm = float(alt_ft_val) * FEET_TO_MM
timestamp = base_timestamp + int(round(float(t)))

# Assign sensor ID: randomly select from list if multiple sensors, otherwise use first
selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0]

metadata = {
"sensor_id": str(selected_sensor_id),
"speed_feet_per_second": float(speed_ft_s_val),
"bank_angle_radians": float(bank_val),
"pitch_angle_radians": float(pitch_val),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from loguru import logger

from openutm_verification.core.clients.air_traffic.base_client import (
SENSOR_MODE_MULTIPLE,
BaseBlueSkyAirTrafficClient,
BlueSkyAirTrafficSettings,
)
Expand Down Expand Up @@ -56,14 +57,14 @@ async def generate_bluesky_sim_air_traffic_data(
duration_s = int(duration or self.settings.simulation_duration_seconds or 30)

sensor_ids = self.settings.sensor_ids
use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE

try:
# create a list of UUIDs with at least one UUID if session_ids is empty
sensor_ids = [UUID(x) for x in sensor_ids] if sensor_ids else [uuid.uuid4()]
except ValueError as exc:
logger.error(f"Invalid sensor ID in configuration, it should be a valid UUID: {exc}")
raise
current_sensor_id = sensor_ids[0]

if not scn_path:
raise ValueError("No scenario path provided. Provide config_path or set settings.simulation_config_path.")
Expand Down Expand Up @@ -115,7 +116,10 @@ async def generate_bluesky_sim_air_traffic_data(
# We store altitude_mm as "millimeters"; keep it consistent with your schema.
# If alt is actually feet, you can convert here: alt_m = alt_ft * 0.3048
altitude_mm = alt_m_or_ft * 1000.0
metadata = {"sensor_id": current_sensor_id} if current_sensor_id else {}

# Assign sensor ID: randomly select from list if multiple sensors, otherwise use first
selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0]
metadata = {"sensor_id": str(selected_sensor_id)} if selected_sensor_id else {}

obs = FlightObservationSchema(
lat_dd=lat,
Expand All @@ -142,19 +146,18 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues(
) -> list[list[FlightObservationSchema]]:
"""This method generates"""


scn_path = config_path or self.settings.simulation_config_path
duration_s = int(duration or self.settings.simulation_duration_seconds or 30)

sensor_ids = self.settings.sensor_ids
use_multiple_sensors = self.settings.single_or_multiple_sensors == SENSOR_MODE_MULTIPLE

try:
# create a list of UUIDs with at least one UUID if session_ids is empty
sensor_ids = [UUID(x) for x in sensor_ids] if sensor_ids else [uuid.uuid4()]
except ValueError as exc:
logger.error(f"Invalid sensor ID in configuration, it should be a valid UUID: {exc}")
raise
current_sensor_id = sensor_ids[0]

if not scn_path:
raise ValueError("No scenario path provided. Provide config_path or set settings.simulation_config_path.")
Expand Down Expand Up @@ -207,7 +210,10 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues(
# We store altitude_mm as "millimeters"; keep it consistent with your schema.
# If alt is actually feet, you can convert here: alt_m = alt_ft * 0.3048
altitude_mm = alt_m_or_ft * 1000.0
metadata = {"sensor_id": current_sensor_id} if current_sensor_id else {}

# Assign sensor ID: randomly select from list if multiple sensors, otherwise use first
selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0]
metadata = {"sensor_id": str(selected_sensor_id)} if selected_sensor_id else {}

obs = FlightObservationSchema(
lat_dd=lat,
Expand All @@ -226,7 +232,6 @@ async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues(
# Convert dict -> list[list[FlightObservationSchema]] with stable ordering
flight_observations = [results_by_acid[acid] for acid in sorted(results_by_acid.keys())]


# This method modifies the retrieved simulation data by changing the timestamp and adding latency to the observed dataset
LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues
TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds
Expand Down
49 changes: 28 additions & 21 deletions src/openutm_verification/simulator/geo_json_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def generate_air_traffic_data(
duration: int,
sensor_ids: list[UUID],
number_of_aircraft: int = 1,
use_multiple_sensors: bool = False,
) -> list[list[FlightObservationSchema]]:
"""Generate simulated air traffic observations for the specified duration.

Expand All @@ -88,14 +89,16 @@ def generate_air_traffic_data(

Args:
duration: Number of seconds to generate data for.
session_id: Unique identifier for the simulation session.
sensor_ids: List of sensor IDs to use for observations.
number_of_aircraft: Number of aircraft to simulate.
use_multiple_sensors: If True, randomly assign sensor IDs from the list.

Returns:
List of flight observation dictionaries containing position, altitude,
and metadata for each generated data point.
A list of flights, where each flight is a list of ``FlightObservationSchema``
instances containing position, altitude, and metadata for each generated
data point.
"""
sensor_id = str(sensor_ids[0])
logger.info(f"Generating air traffic data for {duration} seconds with sensor ID {sensor_id}")
logger.info(f"Generating air traffic data for {duration} seconds with {'multiple' if use_multiple_sensors else 'single'} sensor(s)")
all_trajectories = []
# Generate a random trajectory
# Improve to generate a trajectory where the speed of the aircraft can be controlled
Expand All @@ -109,24 +112,28 @@ def generate_air_traffic_data(
coordinates = trajectory_geojson["coordinates"]
airtraffic: list[FlightObservationSchema] = []
icao_address = "".join(random.choices("0123456789ABCDEF", k=6))
for i in range(duration):
# Each coordinate corresponds to one second of flight time
for i, point in enumerate(coordinates):
if i >= duration:
break
timestamp = self.reference_time.shift(seconds=i)
for point in coordinates:
metadata = {"sensor_id": sensor_id} if sensor_id else {}
# Convert altitude from meters to millimeters for altitude_mm field
altitude_m = self.config.altitude_of_ground_level_wgs_84
airtraffic.append(
FlightObservationSchema(
lat_dd=point[1],
lon_dd=point[0],
altitude_mm=altitude_m * 1000, # Convert m -> mm
traffic_source=1,
source_type=2,
icao_address=icao_address,
timestamp=timestamp.int_timestamp,
metadata=metadata,
)
# Assign sensor ID: randomly select from list if multiple sensors, otherwise use first
selected_sensor_id = random.choice(sensor_ids) if use_multiple_sensors and len(sensor_ids) > 1 else sensor_ids[0]
metadata = {"sensor_id": str(selected_sensor_id)} if selected_sensor_id else {}
# Convert altitude from meters to millimeters for altitude_mm field
altitude_m = self.config.altitude_of_ground_level_wgs_84
airtraffic.append(
FlightObservationSchema(
lat_dd=point[1],
lon_dd=point[0],
altitude_mm=altitude_m * 1000, # Convert m -> mm
traffic_source=1,
source_type=2,
icao_address=icao_address,
timestamp=timestamp.int_timestamp,
metadata=metadata,
)
)
all_air_traffic.append(airtraffic)
logger.info(f"Generated observations for {len(all_air_traffic)} aircraft")
return all_air_traffic
Expand Down
64 changes: 64 additions & 0 deletions tests/test_altitude_units.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,70 @@ def test_air_traffic_altitude_is_in_millimeters(self, sample_geojson):
expected_mm = altitude_meters * 1000
assert first_obs.altitude_mm == expected_mm, f"Expected altitude_mm={expected_mm} (from {altitude_meters}m), got {first_obs.altitude_mm}"

def test_single_sensor_mode_uses_one_sensor_id(self, sample_geojson):
"""Verify all observations use the same sensor_id in single-sensor mode."""
from uuid import uuid4

from openutm_verification.simulator.geo_json_telemetry import (
GeoJSONAirtrafficSimulator,
)

config = AirTrafficGeneratorConfiguration(
geojson=sample_geojson,
reference_time=arrow.utcnow(),
)
sensor_ids = [uuid4(), uuid4(), uuid4()]
simulator = GeoJSONAirtrafficSimulator(config)
result = simulator.generate_air_traffic_data(
duration=2,
sensor_ids=sensor_ids,
number_of_aircraft=1,
use_multiple_sensors=False,
)

unique_sensor_ids = {obs.metadata["sensor_id"] for flight in result for obs in flight}
assert len(unique_sensor_ids) == 1, f"Expected 1 sensor ID in single mode, got {len(unique_sensor_ids)}: {unique_sensor_ids}"
assert unique_sensor_ids == {str(sensor_ids[0])}

def test_multiple_sensor_mode_assigns_varied_sensor_ids(self, sample_geojson):
"""Verify observations contain more than one distinct sensor_id in multiple-sensor mode."""
from uuid import uuid4

from openutm_verification.simulator.geo_json_telemetry import (
GeoJSONAirtrafficSimulator,
)

# Use a longer GeoJSON path so we get enough observations for randomness
coords = [[7.47 + i * 0.001, 46.97 + i * 0.001] for i in range(50)]
geojson = {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": coords},
"properties": {},
}
],
}

config = AirTrafficGeneratorConfiguration(
geojson=geojson,
reference_time=arrow.utcnow(),
)
sensor_ids = [uuid4(), uuid4(), uuid4()]
simulator = GeoJSONAirtrafficSimulator(config)
result = simulator.generate_air_traffic_data(
duration=50,
sensor_ids=sensor_ids,
number_of_aircraft=1,
use_multiple_sensors=True,
)

unique_sensor_ids = {obs.metadata["sensor_id"] for flight in result for obs in flight}
assert len(unique_sensor_ids) > 1, (
f"Expected multiple distinct sensor IDs in multiple mode, got {len(unique_sensor_ids)}: {unique_sensor_ids}"
)


class TestOpenSkyClientAltitudeConversion:
"""Tests for OpenSky client altitude conversion."""
Expand Down
Loading