From 102d886157c0c6f1ef7adfd1d553dc07c6192803 Mon Sep 17 00:00:00 2001 From: Christopher Vestman Date: Thu, 29 May 2025 20:48:42 +0200 Subject: [PATCH 1/4] fix: major refactor. removed local files, fetch and db. --- src/coord_buffer/cli.py | 94 ++++++------------ src/coord_buffer/config.py | 2 - src/coord_buffer/coords.py | 30 ------ src/coord_buffer/db.py | 44 --------- src/coord_buffer/fetcher.py | 25 ----- src/coord_buffer/processor.py | 181 ---------------------------------- 6 files changed, 31 insertions(+), 345 deletions(-) delete mode 100644 src/coord_buffer/coords.py delete mode 100644 src/coord_buffer/db.py delete mode 100644 src/coord_buffer/fetcher.py delete mode 100644 src/coord_buffer/processor.py diff --git a/src/coord_buffer/cli.py b/src/coord_buffer/cli.py index 78d1dee..1d1b03d 100755 --- a/src/coord_buffer/cli.py +++ b/src/coord_buffer/cli.py @@ -1,38 +1,38 @@ import argparse -import os -import sys -from io import BytesIO -import geopandas as gpd - -from coord_buffer.config import DB_PARAMS, OUTPUT_FOLDER -from coord_buffer.coords import to_dms_coords, to_wgs84 -from coord_buffer.fetcher import fetch_tmas -from coord_buffer.processor import ( +from coord_buffer.utils import ( buffer_polygon, - create_geojson_files, - insert_tmas_to_db, - is_airac_current, + list_coords_from_db, + logger, read_coords, + read_coords_from_db, + to_dms_coords, ) -from coord_buffer.utils import logger def parse_args(): - parser = argparse.ArgumentParser(description="Fetch and process TMA data") + parser = argparse.ArgumentParser( + description="Creates a specified buffer around user specified area." + ) + parser.add_argument( + "-l", + "--list", + action="store_true", + help="Prints list of available geometries and their id.", + ) parser.add_argument( - "input_file", - nargs="?", + "--msid", default=None, - help="Path to a GeoJSON file with coordinates", + help="Get coords for the selected geometries.", ) parser.add_argument( - "--buffer", type=float, default=0, help="Buffer size in NM (default: 0)" + "-f", + "--input_file", + default=None, + help="Path to a GeoJSON file with coordinates", ) parser.add_argument( - "--check-airac", - type=str, - help="Check if the provided AIRAC date (YYYY-MM-DD) is the latest in the database", + "-b", "--buffer", type=float, default=0, help="Buffer size in NM (default: 0)" ) return parser.parse_args() @@ -40,55 +40,23 @@ def parse_args(): def main(): args = parse_args() - if args.check_airac: - try: - is_current = is_airac_current(DB_PARAMS, args.check_airac) - if is_current: - logger.info( - f"AIRAC date {args.check_airac} is current or newer than the latest in the database" - ) - else: - logger.warning( - f"AIRAC date {args.check_airac} is older than the latest in the database" - ) - sys.exit(0 if is_current else 1) - except Exception as e: - logger.error(f"Error checking AIRAC date: {e}") - sys.exit(1) - - if not os.path.exists(OUTPUT_FOLDER): - logger.info(f"Creating {OUTPUT_FOLDER} folder") - os.makedirs(OUTPUT_FOLDER) - - # If no input is provided, fetch TMAs & save to DB - if args.input_file is None: - logger.info("No input file provided, fetching TMAs") - try: - tmas = fetch_tmas() - gdf = gpd.read_file(BytesIO(tmas)) - gdf = to_wgs84(gdf) - create_geojson_files(gdf, OUTPUT_FOLDER) - insert_tmas_to_db(gdf, DB_PARAMS) - logger.info(f"TMAs saved to {OUTPUT_FOLDER} & inserted into database") - except Exception as e: - logger.info(f"Error fetching or processing TMA data: {e}") - sys.exit(1) - return - - # Check if coord file exists - if not os.path.isfile(args.input_file): - logger.info(f"Input file does not exist: {args.input_file}") - sys.exit(1) - logger.info(f"Processing input file: {args.input_file}") try: - coords = read_coords(args.input_file) + if args.list: + list_coords_from_db() + return + elif args.msid: + logger.info(f"Processing msid: {args.msid}") + coords = read_coords_from_db(args.msid) + elif args.input_file: + logger.info(f"Processing input file: {args.input_file}") + coords = read_coords(args.input_file) buffered_gdf = buffer_polygon(coords, args.buffer) coords_df = buffered_gdf.get_coordinates() for _, row in coords_df.iterrows(): print(to_dms_coords([row["y"], row["x"]])) except Exception as e: logger.error(f"Error processing file: {e}") - sys.exit(1) + return if __name__ == "__main__": diff --git a/src/coord_buffer/config.py b/src/coord_buffer/config.py index 274e67c..2501c60 100644 --- a/src/coord_buffer/config.py +++ b/src/coord_buffer/config.py @@ -8,8 +8,6 @@ BUFFER_MULTIPLIER = 1852 DEFAULT_EPSG = 4326 METRIC_EPSG = 3006 -TMA_URL = os.getenv("TMA_URL", "https://daim.lfv.se/geoserver/wfs") -OUTPUT_FOLDER = os.getenv("OUTPUT_FOLDER", "POLYGONES") DB_PARAMS = { "dbname": os.getenv("POSTGRES_DB"), "user": os.getenv("POSTGRES_USER"), diff --git a/src/coord_buffer/coords.py b/src/coord_buffer/coords.py deleted file mode 100644 index 4d0a9b2..0000000 --- a/src/coord_buffer/coords.py +++ /dev/null @@ -1,30 +0,0 @@ -import re - -from coord_buffer.config import DEFAULT_EPSG - - -def dms_to_dd_coords(coord): - if not re.match(r"^\d{6}[NSEW]$", coord): - raise ValueError(f"Invalid DMS format: {coord}") - degrees, minutes, seconds = int(coord[:2]), int(coord[2:4]), int(coord[4:6]) - direction = coord[6] - dd = degrees + minutes / 60 + seconds / 3600 - return dd if direction in ["N", "E"] else -dd - - -def dd_to_dms(coord): - degrees = int(abs(coord)) - minutes = (abs(coord) - degrees) * 60 - seconds = (minutes - int(minutes)) * 60 - return f"{degrees:02d}{int(minutes):02d}{int(seconds):02d}" - - -def to_dms_coords(coord): - lat, lon = coord - lat_dir = "N" if lat >= 0 else "S" - lon_dir = "E" if lon >= 0 else "W" - return f"{dd_to_dms(lat)}{lat_dir} 0{dd_to_dms(lon)}{lon_dir}" - - -def to_wgs84(geo_df): - return geo_df.to_crs(epsg=DEFAULT_EPSG) diff --git a/src/coord_buffer/db.py b/src/coord_buffer/db.py deleted file mode 100644 index 454eb24..0000000 --- a/src/coord_buffer/db.py +++ /dev/null @@ -1,44 +0,0 @@ -import psycopg - -from coord_buffer.config import DB_PARAMS - - -def create_tmas_table(): - with psycopg.connect(**DB_PARAMS) as conn: - cursor = conn.cursor() - cursor.execute("CREATE EXTENSION IF NOT EXISTS postgis;") - cursor.execute(""" - CREATE TABLE tmas ( - id SERIAL PRIMARY KEY, - name_of_area VARCHAR(255) NOT NULL, - geometry GEOMETRY(POLYGON, 4326) NOT NULL, - wef DATE NOT NULL, - type_of_area VARCHAR(50), - position_indicator VARCHAR(50), - date_time_of_chg TIMESTAMP, - name_of_operator VARCHAR(255), - origin VARCHAR(50), - location VARCHAR(255), - upper_limit VARCHAR(50), - lower_limit VARCHAR(50), - comment_1 TEXT, - comment_2 TEXT, - quality VARCHAR(50), - crc_id VARCHAR(50), - crc_pos VARCHAR(50), - crc_tot VARCHAR(50), - msid INTEGER, - idnr INTEGER, - mi_style TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - CONSTRAINT unique_name_wef UNIQUE (name_of_area, wef) - ); - CREATE INDEX tmas_geometry_idx ON tmas USING GIST (geometry); - CREATE INDEX tmas_wef_idx ON tmas (wef); - """) - conn.commit() - - -if __name__ == "__main__": - create_tmas_table() diff --git a/src/coord_buffer/fetcher.py b/src/coord_buffer/fetcher.py deleted file mode 100644 index 7e6245a..0000000 --- a/src/coord_buffer/fetcher.py +++ /dev/null @@ -1,25 +0,0 @@ -import requests -from tenacity import retry, stop_after_attempt, wait_fixed - -from coord_buffer.config import DEFAULT_EPSG, TMA_URL - - -def fetch_tmas(epsg=DEFAULT_EPSG, url=None): - """Fetch TMAs from WFS service.""" - url = url or TMA_URL - params = { - "service": "WFS", - "version": "1.1.0", - "request": "GetFeature", - "typename": "mais:TMAS,mais:TMAW", - "outputFormat": "application/json", - "srsName": f"EPSG:{epsg}", - } - - @retry(stop=stop_after_attempt(3), wait=wait_fixed(2)) - def _fetch(): - resp = requests.get(url, params=params) - resp.raise_for_status() - return resp.content - - return _fetch() diff --git a/src/coord_buffer/processor.py b/src/coord_buffer/processor.py deleted file mode 100644 index d26cc82..0000000 --- a/src/coord_buffer/processor.py +++ /dev/null @@ -1,181 +0,0 @@ -import json -import os -from datetime import datetime - -import geopandas as gpd -import psycopg -from shapely.geometry import Polygon - -from coord_buffer.config import BUFFER_MULTIPLIER, DB_PARAMS, DEFAULT_EPSG, METRIC_EPSG -from coord_buffer.utils import clean_file_name - - -def create_geojson_files(geo_df, folder_name): - """Create individual GeoJSON files for each TMA.""" - if not os.path.exists(folder_name): - os.makedirs(folder_name) - for _, row in geo_df.iterrows(): - name = clean_file_name(row["NAMEOFAREA"]) - if "TMA_" in name: - continue - single_gdf = gpd.GeoDataFrame( - {"NAMEOFAREA": [name], "geometry": [row["geometry"]]}, - crs=f"EPSG:{DEFAULT_EPSG}", - ) - filename = f"{folder_name}/{name}.geojson" - single_gdf.to_file(filename, driver="GeoJSON") - - -def buffer_polygon(coords, buffer_size_nm): - """Buffer a polygon by a distance in nautical miles.""" - gdf = gpd.GeoDataFrame(geometry=[Polygon(coords)], crs=f"EPSG:{DEFAULT_EPSG}") - gdf = gdf.to_crs(epsg=METRIC_EPSG) - buffered = gdf.buffer( - distance=buffer_size_nm * BUFFER_MULTIPLIER, single_sided=True, join_style=2 - ) - return buffered.to_crs(epsg=DEFAULT_EPSG) - - -def read_coords(filename): - """Read coordinates from a GeoJSON file.""" - with open(filename, "r") as file: - geojson_data = json.load(file) - if not geojson_data.get("features"): - raise ValueError("GeoJSON file has no features") - coords = [] - for feature in geojson_data["features"]: - if feature["geometry"]["type"] != "Polygon": - raise ValueError( - f"Unsupported geometry type: {feature['geometry']['type']}" - ) - for polygon in feature["geometry"]["coordinates"]: - for coord in polygon: - if not isinstance(coord, list) or len(coord) != 2: - raise ValueError(f"Invalid coordinate format: {coord}") - coords.append(coord) - return coords - - -def insert_tmas_to_db(geo_df, conn_params=DB_PARAMS): - """Insert TMA GeoJSON features into PostgreSQL database.""" - conn = None - try: - conn = psycopg.connect(**conn_params) - cursor = conn.cursor() - - insert_query = """ - INSERT INTO tmas ( - name_of_area, geometry, wef, type_of_area, position_indicator, - date_time_of_chg, name_of_operator, origin, location, - upper_limit, lower_limit, comment_1, comment_2, quality, - crc_id, crc_pos, crc_tot, msid, idnr, mi_style, updated_at - ) VALUES ( - %s, ST_SetSRID(ST_GeomFromGeoJSON(%s), 4326), %s, %s, %s, %s, %s, %s, %s, - %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP - ) - ON CONFLICT (name_of_area, wef) - DO UPDATE SET - geometry = EXCLUDED.geometry, - type_of_area = EXCLUDED.type_of_area, - position_indicator = EXCLUDED.position_indicator, - date_time_of_chg = EXCLUDED.date_time_of_chg, - name_of_operator = EXCLUDED.name_of_operator, - origin = EXCLUDED.origin, - location = EXCLUDED.location, - upper_limit = EXCLUDED.upper_limit, - lower_limit = EXCLUDED.lower_limit, - comment_1 = EXCLUDED.comment_1, - comment_2 = EXCLUDED.comment_2, - quality = EXCLUDED.quality, - crc_id = EXCLUDED.crc_id, - crc_pos = EXCLUDED.crc_pos, - crc_tot = EXCLUDED.crc_tot, - msid = EXCLUDED.msid, - idnr = EXCLUDED.idnr, - mi_style = EXCLUDED.mi_style, - updated_at = CURRENT_TIMESTAMP - """ - - for _, row in geo_df.iterrows(): - name = clean_file_name(row["NAMEOFAREA"]) - if "TMA_" in name: - continue - - geometry_series = gpd.GeoSeries( - [row["geometry"]], crs=f"EPSG:{DEFAULT_EPSG}" - ) - geometry_json = geometry_series.to_json() - feature_geometry = json.loads(geometry_json)["features"][0]["geometry"] - - properties = row.to_dict() - wef = properties.get("WEF") - if wef: - try: - wef = datetime.strptime(wef, "%Y-%m-%d").date() - except ValueError: - wef = None - - cursor.execute( - insert_query, - ( - name, - json.dumps(feature_geometry), - wef, - properties.get("TYPEOFAREA"), - properties.get("POSITIONINDICATOR"), - properties.get("DATETIMEOFCHG"), - properties.get("NAMEOFOPERATOR"), - properties.get("ORIGIN"), - properties.get("LOCATION"), - properties.get("UPPER"), - properties.get("LOWER"), - properties.get("COMMENT_1"), - properties.get("COMMENT_2"), - properties.get("QUALITY"), - properties.get("CRC_ID"), - properties.get("CRC_POS"), - properties.get("CRC_TOT"), - properties.get("MSID"), - properties.get("IDNR"), - properties.get("MI_STYLE"), - ), - ) - - conn.commit() - cursor.close() - except Exception as e: - if conn: - conn.rollback() - raise RuntimeError(f"Failed to insert TMAs into database: {e}") - finally: - if conn: - conn.close() - - -def get_latest_airac_date(conn_params): - """Get the latest AIRAC effective date from the database.""" - conn = None - try: - conn = psycopg.connect(**conn_params) - cursor = conn.cursor() - cursor.execute("SELECT MAX(wef) FROM tmas") - latest_wef = cursor.fetchone()[0] - cursor.close() - return latest_wef - except Exception as e: - raise RuntimeError(f"Failed to fetch latest AIRAC date: {e}") - finally: - if conn: - conn.close() - - -def is_airac_current(conn_params, airac_date): - """Check if the provided AIRAC date is the latest in the database.""" - latest_wef = get_latest_airac_date(conn_params) - if not latest_wef: - return False - try: - airac_date = datetime.strptime(airac_date, "%Y-%m-%d").date() - return airac_date >= latest_wef - except ValueError: - raise ValueError("Invalid AIRAC date format, expected YYYY-MM-DD") From 0247296251dfcb1e8cb22e9e98e02253cafdc86c Mon Sep 17 00:00:00 2001 From: Christopher Vestman Date: Thu, 29 May 2025 20:49:27 +0200 Subject: [PATCH 2/4] fix: allow user coords and db coords, closes #7 --- pyproject.toml | 3 +- src/coord_buffer/utils.py | 106 ++++++++++++++++++++++++++++++++++++++ uv.lock | 11 ++++ 3 files changed, 119 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9934844..da52ee1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "ruff>=0.11.7", "shapely==2.0.3", "six==1.16.0", + "tabulate>=0.9.0", "tenacity>=9.1.2", "tzdata==2024.1", "urllib3==2.2.3", @@ -39,4 +40,4 @@ dev = [ [project.scripts] coord-buffer = "coord_buffer.cli:main" [tool.uv] -package = true \ No newline at end of file +package = true diff --git a/src/coord_buffer/utils.py b/src/coord_buffer/utils.py index 59aaf8e..b4b9fc5 100644 --- a/src/coord_buffer/utils.py +++ b/src/coord_buffer/utils.py @@ -1,7 +1,15 @@ +import json import logging import re import unicodedata +import geopandas as gpd +import psycopg +from shapely.geometry import Polygon +from tabulate import tabulate + +from coord_buffer.config import BUFFER_MULTIPLIER, DB_PARAMS, DEFAULT_EPSG, METRIC_EPSG + logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -12,3 +20,101 @@ def clean_file_name(name): name = "".join(c for c in name if not unicodedata.combining(c)) name = re.sub(r"\s+", "_", name) return name.upper() + + +def dms_to_dd_coords(coord): + if not re.match(r"^\d{6}[NSEW]$", coord): + raise ValueError(f"Invalid DMS format: {coord}") + degrees, minutes, seconds = int(coord[:2]), int(coord[2:4]), int(coord[4:6]) + direction = coord[6] + dd = degrees + minutes / 60 + seconds / 3600 + return dd if direction in ["N", "E"] else -dd + + +def dd_to_dms(coord): + degrees = int(abs(coord)) + minutes = (abs(coord) - degrees) * 60 + seconds = (minutes - int(minutes)) * 60 + return f"{degrees:02d}{int(minutes):02d}{int(seconds):02d}" + + +def to_dms_coords(coord): + lat, lon = coord + lat_dir = "N" if lat >= 0 else "S" + lon_dir = "E" if lon >= 0 else "W" + return f"{dd_to_dms(lat)}{lat_dir} 0{dd_to_dms(lon)}{lon_dir}" + + +def to_wgs84(geo_df): + return geo_df.to_crs(epsg=DEFAULT_EPSG) + + +def buffer_polygon(coords, buffer_size_nm): + """Buffer a polygon by a distance in nautical miles.""" + gdf = gpd.GeoDataFrame(geometry=[Polygon(coords)], crs=f"EPSG:{DEFAULT_EPSG}") + gdf = gdf.to_crs(epsg=METRIC_EPSG) + buffered = gdf.buffer( + distance=buffer_size_nm * BUFFER_MULTIPLIER, single_sided=True, join_style=2 + ) + return buffered.to_crs(epsg=DEFAULT_EPSG) + + +def read_coords(filename): + """Read coordinates from a GeoJSON file.""" + with open(filename, "r") as file: + geojson_data = json.load(file) + if not geojson_data.get("features"): + raise ValueError("GeoJSON file has no features") + coords = [] + for feature in geojson_data["features"]: + if feature["geometry"]["type"] != "Polygon": + raise ValueError( + f"Unsupported geometry type: {feature['geometry']['type']}" + ) + for polygon in feature["geometry"]["coordinates"]: + for coord in polygon: + if not isinstance(coord, list) or len(coord) != 2: + raise ValueError(f"Invalid coordinate format: {coord}") + coords.append(coord) + return coords + + +def list_coords_from_db(): + query = """ + SELECT typeofarea, msid, nameofarea, positionindicator + FROM aip_data + ORDER BY typeofarea, nameofarea, positionindicator; + """ + with psycopg.connect(**DB_PARAMS) as conn: + with conn.cursor() as cur: + cur.execute(query) + rows = cur.fetchall() + if not rows: + raise ValueError("No geometries found") + + print( + tabulate( + rows, + headers=["Type of Area", "MSID", "Name of Area", "ICAO"], + tablefmt="pretty", + ) + ) + return rows + + +def read_coords_from_db(msid): + query = """ + SELECT ST_AsGeoJSON(geom) as geojson + FROM aip_data + where msid = %s; + """ + with psycopg.connect(**DB_PARAMS) as conn: + with conn.cursor() as cur: + cur.execute(query, (msid,)) + rows = cur.fetchall() + if not rows: + raise ValueError("Error: No geometry found for the given MSID") + + geojson_str = rows[0][0] + geojson = json.loads(geojson_str) + return geojson["coordinates"][0] diff --git a/uv.lock b/uv.lock index 7ce31cb..4a77602 100644 --- a/uv.lock +++ b/uv.lock @@ -115,6 +115,7 @@ dependencies = [ { name = "ruff" }, { name = "shapely" }, { name = "six" }, + { name = "tabulate" }, { name = "tenacity" }, { name = "tzdata" }, { name = "urllib3" }, @@ -149,6 +150,7 @@ requires-dist = [ { name = "ruff", specifier = ">=0.11.7" }, { name = "shapely", specifier = "==2.0.3" }, { name = "six", specifier = "==1.16.0" }, + { name = "tabulate", specifier = ">=0.9.0" }, { name = "tenacity", specifier = ">=9.1.2" }, { name = "tzdata", specifier = "==2024.1" }, { name = "urllib3", specifier = "==2.2.3" }, @@ -497,6 +499,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d9/5a/e7c31adbe875f2abbb91bd84cf2dc52d792b5a01506781dbcf25c91daf11/six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254", size = 11053 }, ] +[[package]] +name = "tabulate" +version = "0.9.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ec/fe/802052aecb21e3797b8f7902564ab6ea0d60ff8ca23952079064155d1ae1/tabulate-0.9.0.tar.gz", hash = "sha256:0095b12bf5966de529c0feb1fa08671671b3368eec77d7ef7ab114be2c068b3c", size = 81090 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/40/44/4a5f08c96eb108af5cb50b41f76142f0afa346dfa99d5296fe7202a11854/tabulate-0.9.0-py3-none-any.whl", hash = "sha256:024ca478df22e9340661486f85298cff5f6dcdba14f3813e8830015b9ed1948f", size = 35252 }, +] + [[package]] name = "tenacity" version = "9.1.2" From c08d41086d9ef9a3e6fefa28eb5ea68e79c96b83 Mon Sep 17 00:00:00 2001 From: Christopher Vestman Date: Thu, 29 May 2025 20:51:13 +0200 Subject: [PATCH 3/4] docs: Update README --- README.md | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index fe2966a..8b26679 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,22 @@ # Coords Buffer -Fetches TMA geojson data from lfv echarts, creates files and add a option \ -to create a buffer around a TMA. +Fetches geojson data based on LFV echarts and returns new buffer coordiantes based on user input distance (nautical miles). -Add a buffer (in nautical miles) around a TMA. +## Usage -## Dependencies +```sh +uv run coord-buffer -h +usage: coord-buffer [-h] [-l] [--msid MSID] [-f INPUT_FILE] [-b BUFFER] -Geopandas +Creates a specified buffer around user specified area. -Shapely +options: + -h, --help show this help message and exit + -l, --list Prints list of available geometries and their + id. + --msid MSID Get coords for the selected geometries. + -f INPUT_FILE, --input_file INPUT_FILE + Path to a GeoJSON file with coordinates + -b BUFFER, --buffer BUFFER + Buffer size in NM (default: 0) +``` \ No newline at end of file From 921c22694b79365c526f288f1618ded9f0ea2ceb Mon Sep 17 00:00:00 2001 From: Christopher Vestman Date: Thu, 29 May 2025 21:21:15 +0200 Subject: [PATCH 4/4] fix(tests): update tests --- tests/test_cli.py | 107 +++++----------- tests/test_coords.py | 17 --- tests/test_processor.py | 272 ---------------------------------------- 3 files changed, 30 insertions(+), 366 deletions(-) delete mode 100644 tests/test_coords.py delete mode 100644 tests/test_processor.py diff --git a/tests/test_cli.py b/tests/test_cli.py index 51905cb..c220baf 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -13,7 +13,8 @@ def mock_args(): args = MagicMock() args.input_file = None args.buffer = 0 - args.check_airac = None + args.msid = None + args.list = False return args @@ -27,109 +28,61 @@ def mock_geodataframe(): return gdf -def test_no_arguments_fetches_tmas(mock_args, mock_geodataframe): - """Test that no arguments triggers TMA fetching.""" +def test_list_argument_triggers_db_listing(mock_args): + mock_args.list = True with ( patch("coord_buffer.cli.parse_args", return_value=mock_args), - patch("coord_buffer.cli.fetch_tmas", return_value=b"{}") as mock_fetch, - patch( - "coord_buffer.cli.gpd.read_file", return_value=mock_geodataframe - ) as mock_read, - patch( - "coord_buffer.cli.to_wgs84", return_value=mock_geodataframe - ) as mock_to_wgs84, - patch("coord_buffer.cli.create_geojson_files") as mock_create, - patch("coord_buffer.cli.insert_tmas_to_db") as mock_insert, - patch("coord_buffer.cli.os.makedirs") as mock_makedirs, - patch( - "coord_buffer.cli.os.path.exists", return_value=False - ), # Simulate folder not existing + patch("coord_buffer.cli.list_coords_from_db") as mock_list, ): main() - mock_fetch.assert_called_once() - mock_read.assert_called_once() - mock_to_wgs84.assert_called_once() - mock_create.assert_called_once() - mock_insert.assert_called_once() - mock_makedirs.assert_called_once_with("POLYGONES") + mock_list.assert_called_once() -def test_file_not_exists(mock_args): - """Test that a non-existent input file raises SystemExit.""" - mock_args.input_file = "nonexistent.geojson" +def test_msid_argument_triggers_db_read(mock_args, mock_geodataframe): + mock_args.msid = "123" + mock_args.buffer = 10 with ( patch("coord_buffer.cli.parse_args", return_value=mock_args), - patch("coord_buffer.cli.os.path.isfile", return_value=False), + patch( + "coord_buffer.cli.read_coords_from_db", return_value=[[0, 0], [1, 1]] + ) as mock_read_db, + patch( + "coord_buffer.cli.buffer_polygon", return_value=mock_geodataframe + ) as mock_buffer, + patch("coord_buffer.cli.to_dms_coords") as mock_dms, ): - with pytest.raises(SystemExit): - main() + main() + mock_read_db.assert_called_once_with("123") + mock_buffer.assert_called_once() + mock_dms.assert_called() -def test_valid_file_processing(mock_args, mock_geodataframe): - """Test processing a valid input file.""" - mock_args.input_file = "valid.geojson" +def test_input_file_argument_triggers_file_read(mock_args, mock_geodataframe): + mock_args.input_file = "test.geojson" mock_args.buffer = 5 with ( patch("coord_buffer.cli.parse_args", return_value=mock_args), - patch("coord_buffer.cli.os.path.isfile", return_value=True), patch( "coord_buffer.cli.read_coords", return_value=[[0, 0], [1, 1]] - ) as mock_read, + ) as mock_read_file, patch( "coord_buffer.cli.buffer_polygon", return_value=mock_geodataframe ) as mock_buffer, patch("coord_buffer.cli.to_dms_coords") as mock_dms, ): main() - mock_read.assert_called_once_with("valid.geojson") + mock_read_file.assert_called_once_with("test.geojson") mock_buffer.assert_called_once_with([[0, 0], [1, 1]], 5) mock_dms.assert_called() - assert mock_dms.call_count == 2 -def test_dms_output(mock_args): - """Test DMS output for known coordinates.""" - mock_args.input_file = "valid.geojson" - mock_args.buffer = 0 - coords_df = pd.DataFrame({"x": [12.582222222222223], "y": [1.0]}) - gdf = MagicMock(spec=gpd.GeoDataFrame) - gdf.get_coordinates.return_value = coords_df +def test_exception_handling_logs_error(mock_args): + mock_args.input_file = "test.geojson" with ( patch("coord_buffer.cli.parse_args", return_value=mock_args), - patch("coord_buffer.cli.os.path.isfile", return_value=True), - patch("coord_buffer.cli.read_coords", return_value=[[0, 0]]) as mock_read, - patch("coord_buffer.cli.buffer_polygon", return_value=gdf) as mock_buffer, - patch("coord_buffer.cli.to_dms_coords") as mock_dms, + patch("coord_buffer.cli.read_coords", side_effect=Exception("Boom!")), + patch("coord_buffer.cli.logger.error") as mock_log, ): main() - mock_read.assert_called_once_with("valid.geojson") - mock_buffer.assert_called_once_with([[0, 0]], 0) - mock_dms.assert_called_with([1.0, 12.582222222222223]) - - -def test_check_airac_current(mock_args): - """Test checking AIRAC date is current.""" - mock_args.check_airac = "2025-05-15" - with ( - patch("coord_buffer.cli.parse_args", return_value=mock_args), - patch("coord_buffer.cli.is_airac_current", return_value=True), - patch("coord_buffer.cli.logger.info") as mock_logger, - ): - with pytest.raises(SystemExit) as exc: - main() - assert exc.value.code == 0 - mock_logger.assert_called_once_with( - "AIRAC date 2025-05-15 is current or newer than the latest in the database" - ) - - -def test_check_airac_outdated(mock_args): - """Test checking outdated AIRAC date.""" - mock_args.check_airac = "2025-04-17" - with ( - patch("coord_buffer.cli.parse_args", return_value=mock_args), - patch("coord_buffer.cli.is_airac_current", return_value=False), - ): - with pytest.raises(SystemExit) as exc: - main() - assert exc.value.code == 1 + mock_log.assert_called_once() + assert "Boom!" in str(mock_log.call_args[0][0]) diff --git a/tests/test_coords.py b/tests/test_coords.py deleted file mode 100644 index f04d522..0000000 --- a/tests/test_coords.py +++ /dev/null @@ -1,17 +0,0 @@ -import pytest - -from coord_buffer.coords import dms_to_dd_coords, to_dms_coords - - -def test_dms_to_dd_coords(): - """Test DMS to decimal degrees conversion.""" - assert abs(dms_to_dd_coords("123456N") - 12.582222222222223) < 1e-10 - assert abs(dms_to_dd_coords("123456S") - (-12.582222222222223)) < 1e-10 - with pytest.raises(ValueError): - dms_to_dd_coords("invalid") - - -def test_to_dms_coords(): - """Test decimal degrees to DMS string conversion.""" - assert to_dms_coords([12.582222222222223, 1.0]) == "123456N 0010000E" - assert to_dms_coords([-12.582222222222223, -1.0]) == "123456S 0010000W" diff --git a/tests/test_processor.py b/tests/test_processor.py deleted file mode 100644 index 9e9dc14..0000000 --- a/tests/test_processor.py +++ /dev/null @@ -1,272 +0,0 @@ -import json -from datetime import datetime -from unittest.mock import MagicMock, patch - -import geopandas as gpd -import psycopg -import pytest -from shapely.geometry import Polygon - -from coord_buffer.coords import to_wgs84 -from coord_buffer.processor import ( - buffer_polygon, - create_geojson_files, - get_latest_airac_date, - insert_tmas_to_db, - is_airac_current, - read_coords, -) - - -@pytest.fixture -def mock_geodataframe(): - """Fixture for a mock GeoDataFrame.""" - gdf = gpd.GeoDataFrame( - { - "NAMEOFAREA": ["Test Area"], - "TYPEOFAREA": ["TMAS"], - "POSITIONINDICATOR": ["TEST"], - "WEF": ["2025-05-15"], - "DATETIMEOFCHG": [None], - "NAMEOFOPERATOR": [None], - "ORIGIN": ["AIP"], - "LOCATION": ["Test Location"], - "UPPER": ["FL 95"], - "LOWER": ["4500"], - "COMMENT_1": ["C"], - "COMMENT_2": [None], - "QUALITY": ["ROUTINE"], - "CRC_ID": [None], - "CRC_POS": [None], - "CRC_TOT": [None], - "MSID": [4194], - "IDNR": [21636], - "MI_STYLE": [None], - "geometry": [Polygon([(0, 0), (1, 1), (1, 0), (0, 0)])], - }, - crs="EPSG:4326", - ) - return gdf - - -def test_to_wgs84(mock_geodataframe): - """Test conversion to WGS84 CRS.""" - with patch.object(mock_geodataframe, "to_crs") as mock_to_crs: - mock_to_crs.return_value = mock_geodataframe - result = to_wgs84(mock_geodataframe) - mock_to_crs.assert_called_once_with(epsg=4326) - assert result is mock_geodataframe - - -def test_create_geojson_files(tmp_path, mock_geodataframe): - """Test creating GeoJSON files.""" - folder = tmp_path / "output" - with ( - patch("coord_buffer.processor.clean_file_name", return_value="TEST_AREA"), - patch("coord_buffer.processor.os.makedirs") as mock_makedirs, - patch.object(gpd.GeoDataFrame, "to_file") as mock_to_file, - ): - create_geojson_files(mock_geodataframe, str(folder)) - mock_makedirs.assert_called_once_with(str(folder)) - mock_to_file.assert_called_once_with( - str(folder / "TEST_AREA.geojson"), driver="GeoJSON" - ) - - -def test_create_geojson_files_skips_tma(mock_geodataframe): - """Test skipping TMA_ prefixed areas.""" - mock_geodataframe["NAMEOFAREA"] = ["TMA_Skip"] - with ( - patch("coord_buffer.processor.clean_file_name", return_value="TMA_SKIP"), - patch("coord_buffer.processor.os.makedirs"), - patch.object(gpd.GeoDataFrame, "to_file") as mock_to_file, - ): - create_geojson_files(mock_geodataframe, "output") - mock_to_file.assert_not_called() - - -def test_insert_tmas_to_db(mock_geodataframe): - """Test inserting TMAs into database.""" - conn_params = { - "dbname": "test", - "user": "test", - "password": "test", - "host": "localhost", - "port": "5432", - } - with patch("coord_buffer.processor.psycopg.connect") as mock_connect: - mock_conn = MagicMock() - mock_cursor = MagicMock() - mock_connect.return_value = mock_conn - mock_conn.cursor.return_value = mock_cursor - - insert_tmas_to_db(mock_geodataframe, conn_params) - - mock_connect.assert_called_once_with(**conn_params) - mock_cursor.execute.assert_called_once() - mock_conn.commit.assert_called_once() - mock_cursor.close.assert_called_once() - mock_conn.close.assert_called_once() - - -def test_insert_tmas_to_db_skips_tma(mock_geodataframe): - """Test skipping TMA_ prefixed areas.""" - mock_geodataframe["NAMEOFAREA"] = ["TMA_Skip"] - conn_params = { - "dbname": "test", - "user": "test", - "password": "test", - "host": "localhost", - "port": "5432", - } - with patch("coord_buffer.processor.psycopg.connect") as mock_connect: - mock_conn = MagicMock() - mock_cursor = MagicMock() - mock_connect.return_value = mock_conn - mock_conn.cursor.return_value = mock_cursor - - insert_tmas_to_db(mock_geodataframe, conn_params) - - mock_cursor.execute.assert_not_called() - - -def test_insert_tmas_to_db_connection_error(mock_geodataframe): - """Test handling database connection error.""" - conn_params = { - "dbname": "test", - "user": "test", - "password": "test", - "host": "localhost", - "port": "5432", - } - with patch( - "coord_buffer.processor.psycopg.connect", - side_effect=psycopg.Error("Connection failed"), - ): - with pytest.raises(RuntimeError, match="Failed to insert TMAs into database"): - insert_tmas_to_db(mock_geodataframe, conn_params) - - -def test_buffer_polygon(): - """Test buffering a polygon.""" - coords = [[0, 0], [1, 1], [1, 0], [0, 0]] - gdf = gpd.GeoDataFrame(geometry=[Polygon(coords)], crs="EPSG:4326") - with ( - patch("geopandas.GeoDataFrame.to_crs") as mock_gdf_to_crs, - patch("geopandas.GeoSeries.to_crs") as mock_gs_to_crs, - patch.object(gpd.GeoDataFrame, "buffer") as mock_buffer, - ): - mock_gdf_to_crs.return_value = gdf - mock_gs_to_crs.return_value = gdf.geometry - mock_buffer.return_value = gdf.geometry - result = buffer_polygon(coords, 5) - mock_gdf_to_crs.assert_called_once_with(epsg=3006) - mock_gs_to_crs.assert_called_once_with(epsg=4326) - mock_buffer.assert_called_once_with( - distance=5 * 1852, single_sided=True, join_style=2 - ) - assert isinstance(result, gpd.GeoSeries) - - -def test_read_coords_valid(tmp_path): - """Test reading valid GeoJSON coordinates.""" - geojson = { - "type": "FeatureCollection", - "features": [ - { - "type": "Feature", - "geometry": { - "type": "Polygon", - "coordinates": [[[0, 0], [1, 1], [1, 0], [0, 0]]], - }, - } - ], - } - file = tmp_path / "test.geojson" - file.write_text(json.dumps(geojson)) - coords = read_coords(file) - assert coords == [[0, 0], [1, 1], [1, 0], [0, 0]] - - -def test_read_coords_no_features(tmp_path): - """Test GeoJSON with no features.""" - geojson = {"type": "FeatureCollection", "features": []} - file = tmp_path / "empty.geojson" - file.write_text(json.dumps(geojson)) - with pytest.raises(ValueError, match="GeoJSON file has no features"): - read_coords(file) - - -def test_read_coords_invalid_geometry(tmp_path): - """Test GeoJSON with invalid geometry type.""" - geojson = { - "type": "FeatureCollection", - "features": [ - {"type": "Feature", "geometry": {"type": "Point", "coordinates": [0, 0]}} - ], - } - file = tmp_path / "invalid.geojson" - file.write_text(json.dumps(geojson)) - with pytest.raises(ValueError, match="Unsupported geometry type: Point"): - read_coords(file) - - -def test_read_coords_invalid_coords(tmp_path): - """Test GeoJSON with invalid coordinate format.""" - geojson = { - "type": "FeatureCollection", - "features": [ - { - "type": "Feature", - "geometry": { - "type": "Polygon", - "coordinates": [[[0, 0], [1], [1, 0], [0, 0]]], - }, - } - ], - } - file = tmp_path / "invalid_coords.geojson" - file.write_text(json.dumps(geojson)) - with pytest.raises(ValueError, match="Invalid coordinate format:"): - read_coords(file) - - -def test_get_latest_airac_date(): - """Test retrieving the latest AIRAC date.""" - conn_params = { - "dbname": "test", - "user": "test", - "password": "test", - "host": "localhost", - "port": "5432", - } - with patch("coord_buffer.processor.psycopg.connect") as mock_connect: - mock_conn = MagicMock() - mock_cursor = MagicMock() - mock_connect.return_value = mock_conn - mock_conn.cursor.return_value = mock_cursor - mock_cursor.fetchone.return_value = [datetime(2025, 5, 15).date()] - - latest_wef = get_latest_airac_date(conn_params) - mock_cursor.execute.assert_called_once_with("SELECT MAX(wef) FROM tmas") - assert latest_wef == datetime(2025, 5, 15).date() - - -def test_is_airac_current(): - """Test checking if an AIRAC date is current.""" - conn_params = { - "dbname": "test", - "user": "test", - "password": "test", - "host": "localhost", - "port": "5432", - } - with patch( - "coord_buffer.processor.get_latest_airac_date", - return_value=datetime(2025, 5, 15).date(), - ): - assert is_airac_current(conn_params, "2025-05-15") is True - assert is_airac_current(conn_params, "2025-06-12") is True - assert is_airac_current(conn_params, "2025-04-17") is False - with pytest.raises(ValueError, match="Invalid AIRAC date format"): - is_airac_current(conn_params, "invalid")