From 5bb95204f2ffff79aed1766efeba6e81d3db5bef Mon Sep 17 00:00:00 2001
From: Adrien Carpentier
Date: Tue, 2 Sep 2025 14:33:00 +0200
Subject: [PATCH 1/5] fix: fix CRUD method types and add a return_as_dict param
 for Check.update()

---
 udata_hydra/db/check.py    | 11 +++++++++--
 udata_hydra/db/resource.py | 32 +++++++++++++++++++++++---------
 2 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/udata_hydra/db/check.py b/udata_hydra/db/check.py
index 844d7fb0..14c92883 100644
--- a/udata_hydra/db/check.py
+++ b/udata_hydra/db/check.py
@@ -125,9 +125,16 @@ async def insert(cls, data: dict, returning: str = "id") -> dict:
         return last_check_dict
 
     @classmethod
-    async def update(cls, check_id: int, data: dict) -> Record | None:
+    async def update(
+        cls, check_id: int, data: dict, return_as_dict: bool = False
+    ) -> Record | dict | None:
         """Update a check in DB with new data and return the check id in DB"""
-        return await update_table_record(table_name="checks", record_id=check_id, data=data)
+        check: Record | None = await update_table_record(
+            table_name="checks", record_id=check_id, data=data
+        )
+        if return_as_dict:
+            return dict(check) if check else None
+        return check
 
     @classmethod
     async def delete(cls, check_id: int) -> int:
diff --git a/udata_hydra/db/resource.py b/udata_hydra/db/resource.py
index 2f3d6940..93541d16 100644
--- a/udata_hydra/db/resource.py
+++ b/udata_hydra/db/resource.py
@@ -151,8 +151,8 @@ def get_excluded_clause() -> str:
             ]
         )
 
-    @staticmethod
-    async def get_stuck_resources() -> list[str]:
+    @classmethod
+    async def get_stuck_resources(cls) -> list[Record]:
         """Some resources end up being stuck in a not null status forever,
         we want to get them back on track.
         This returns all resource ids of such stuck resources.
@@ -167,11 +167,25 @@ async def get_stuck_resources() -> list[str]:
         WHERE ca.status IS NOT NULL AND c.created_at < '{threshold}';"""
         pool = await context.pool()
         async with pool.acquire() as connection:
-            rows = await connection.fetch(q)
-            return [str(r["resource_id"]) for r in rows] if rows else []
+            return await connection.fetch(q)
 
-    @classmethod
-    async def clean_up_statuses(cls):
-        stuck_resources: list[str] = await cls.get_stuck_resources()
-        for rid in stuck_resources:
-            await cls.update(rid, {"status": None})
+    @staticmethod
+    async def clean_up_statuses() -> int:
+        """Reset status to None for all stuck resources in a single query."""
+        threshold = datetime.now(timezone.utc) - timedelta(seconds=config.STUCK_THRESHOLD_SECONDS)
+
+        pool = await context.pool()
+        async with pool.acquire() as connection:
+            # Update all stuck resources in a single query
+            q = """
+                UPDATE catalog
+                SET status = NULL, status_since = $1
+                WHERE resource_id IN (
+                    SELECT ca.resource_id
+                    FROM checks c
+                    JOIN catalog ca ON c.id = ca.last_check
+                    WHERE ca.status IS NOT NULL AND c.created_at < $2
+                )
+            """
+            result = await connection.execute(q, datetime.now(timezone.utc), threshold)
+            return int(result.split()[-1])  # execute() returns a status string like "UPDATE 42"

From 9799347252f20a4145f0e45534a6e98662077a72 Mon Sep 17 00:00:00 2001
From: Adrien Carpentier
Date: Tue, 2 Sep 2025 14:34:57 +0200
Subject: [PATCH 2/5] feat: use return_as_dict

---
 udata_hydra/analysis/csv.py     | 7 +++++--
 udata_hydra/analysis/geojson.py | 5 ++++-
 udata_hydra/db/check.py         | 4 ++--
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py
index 907c5e66..28ada447 100644
--- a/udata_hydra/analysis/csv.py
+++ b/udata_hydra/analysis/csv.py
@@ -113,7 +113,9 @@ async def analyse_csv(
     table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
     timer.mark("download-file")

-    check = await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})
+    check = await Check.update(
+        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, return_as_dict=True
+    )

     # Launch csv-detective against given file
     try:
@@ -156,7 +158,7 @@ async def analyse_csv(
             resource_id=resource_id,
             debug_insert=debug_insert,
         )
-        check = await Check.update(check["id"], {"parsing_table": table_name})
+        check = await Check.update(check["id"], {"parsing_table": table_name}, return_as_dict=True)
         timer.mark("csv-to-db")

         try:
@@ -200,6 +202,7 @@
             {
                 "parsing_finished_at": datetime.now(timezone.utc),
             },
+            return_as_dict=True,
         )
         await csv_to_db_index(table_name, csv_inspection, check)

diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py
index 7243a161..caabbf8e 100644
--- a/udata_hydra/analysis/geojson.py
+++ b/udata_hydra/analysis/geojson.py
@@ -63,7 +63,9 @@ async def analyse_geojson(
     )
     timer.mark("download-file")

-    check = await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})
+    check = await Check.update(
+        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, return_as_dict=True
+    )

     # Convert to PMTiles
     try:
@@ -88,6 +90,7 @@ async def analyse_geojson(
                 "pmtiles_url": pmtiles_url,
                 "pmtiles_size": pmtiles_size,
             },
+            return_as_dict=True,
         )

     except (ParseException, IOException) as e:
diff --git a/udata_hydra/db/check.py b/udata_hydra/db/check.py
index 14c92883..f776f2ae 100644
--- a/udata_hydra/db/check.py
+++ b/udata_hydra/db/check.py
@@ -137,8 +137,8 @@ async def update(
         return check

     @classmethod
-    async def delete(cls, check_id: int) -> int:
+    async def delete(cls, check_id: int) -> None:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """DELETE FROM checks WHERE id = $1"""
-            return await connection.fetch(q, check_id)
+            await connection.execute(q, check_id)

From 7154d09ed19c379f934153423d0b75186b563bae Mon Sep 17 00:00:00 2001
From: Adrien Carpentier
Date: Tue, 2 Sep 2025 14:51:43 +0200
Subject: [PATCH 3/5] fix: fix more types

---
 udata_hydra/analysis/csv.py     | 8 ++++----
 udata_hydra/analysis/geojson.py | 4 ++--
 udata_hydra/db/__init__.py      | 2 +-
 udata_hydra/db/check.py         | 9 +++------
 udata_hydra/utils/errors.py     | 3 +--
 5 files changed, 11 insertions(+), 15 deletions(-)

diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py
index 28ada447..7893a6c9 100644
--- a/udata_hydra/analysis/csv.py
+++ b/udata_hydra/analysis/csv.py
@@ -115,7 +115,7 @@ async def analyse_csv(

     check = await Check.update(
         check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, return_as_dict=True
-    )
+    )  # type: ignore

     # Launch csv-detective against given file
     try:
@@ -158,7 +158,7 @@ async def analyse_csv(
             resource_id=resource_id,
             debug_insert=debug_insert,
         )
-        check = await Check.update(check["id"], {"parsing_table": table_name}, return_as_dict=True)
+        check = await Check.update(check["id"], {"parsing_table": table_name}, return_as_dict=True)  # type: ignore
         timer.mark("csv-to-db")

         try:
@@ -203,7 +203,7 @@ async def analyse_csv(
                 "parsing_finished_at": datetime.now(timezone.utc),
             },
             return_as_dict=True,
-        )
+        )  # type: ignore
         await csv_to_db_index(table_name, csv_inspection, check)

     except (ParseException, IOException) as e:
@@ -438,7 +438,7 @@ async def csv_to_db(
         await db.execute(q, *data.values())


-async def csv_to_db_index(table_name: str, inspection: dict, check: Record) -> None:
+async def csv_to_db_index(table_name: str, inspection: dict, check: dict) -> None:
     """Store meta info about a converted CSV table in `DATABASE_URL_CSV.tables_index`"""
     db = await context.pool("csv")
     q = "INSERT INTO tables_index(parsing_table, csv_detective, resource_id, url) VALUES($1, $2, $3, $4)"
diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py
index caabbf8e..90346503 100644
--- a/udata_hydra/analysis/geojson.py
+++ b/udata_hydra/analysis/geojson.py
@@ -65,7 +65,7 @@ async def analyse_geojson(

     check = await Check.update(
         check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, return_as_dict=True
-    )
+    )  # type: ignore

     # Convert to PMTiles
     try:
@@ -91,7 +91,7 @@ async def analyse_geojson(
                 "pmtiles_size": pmtiles_size,
             },
             return_as_dict=True,
-        )
+        )  # type: ignore

     except (ParseException, IOException) as e:
         await handle_parse_exception(e, None, check)
diff --git a/udata_hydra/db/__init__.py b/udata_hydra/db/__init__.py
index 78db2ee8..ef1099f7 100644
--- a/udata_hydra/db/__init__.py
+++ b/udata_hydra/db/__init__.py
@@ -38,7 +38,7 @@ def compute_update_query(table_name: str, data: dict, returning: str = "*") -> str:
     """


-async def update_table_record(table_name: str, record_id: int, data: dict) -> Record | None:
+async def update_table_record(table_name: str, record_id: int, data: dict) -> Record:
     data = convert_dict_values_to_json(data)
     q = compute_update_query(table_name, data)
     pool = await context.pool()
diff --git a/udata_hydra/db/check.py b/udata_hydra/db/check.py
index f776f2ae..aa1c9753 100644
--- a/udata_hydra/db/check.py
+++ b/udata_hydra/db/check.py
@@ -125,15 +125,12 @@ async def insert(cls, data: dict, returning: str = "id") -> dict:
         return last_check_dict

     @classmethod
-    async def update(
-        cls, check_id: int, data: dict, return_as_dict: bool = False
-    ) -> Record | dict | None:
-        """Update a check in DB with new data and return the check id in DB"""
-        check: Record | None = await update_table_record(
+    async def update(cls, check_id: int, data: dict, return_as_dict: bool = False) -> Record | dict:
+        check: Record = await update_table_record(
             table_name="checks", record_id=check_id, data=data
         )
         if return_as_dict:
-            return dict(check) if check else None
+            return dict(check)
         return check

     @classmethod
diff --git a/udata_hydra/utils/errors.py b/udata_hydra/utils/errors.py
index d6564d6e..91fe9a59 100644
--- a/udata_hydra/utils/errors.py
+++ b/udata_hydra/utils/errors.py
@@ -2,7 +2,6 @@
 from datetime import datetime, timezone

 import sentry_sdk
-from asyncpg import Record

 from udata_hydra import context
 from udata_hydra.db.check import Check
@@ -111,7 +110,7 @@ class IOException(ExceptionWithSentryDetails):

 async def handle_parse_exception(
-    e: IOException | ParseException, table_name: str | None, check: Record | None
+    e: IOException | ParseException, table_name: str | None, check: dict | None
 ) -> None:
     """Specific IO/ParseException handling. Store error in :check: if in a check context.
     Also cleanup :table_name: if needed."""
     if table_name is not None:

From d02c9927547b2d479dcea965a0cd5d6bb6c48f60 Mon Sep 17 00:00:00 2001
From: Adrien Carpentier
Date: Tue, 9 Sep 2025 17:31:46 +0200
Subject: [PATCH 4/5] docs: update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e3ef567..4ad379b6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 - Dynamic version in user agent [#328](https://github.com/datagouv/hydra/pull/328) and [#337](https://github.com/datagouv/hydra/pull/337)
 - Add performance tests for csv to geojson and geojson to pmtiles conversion on geographical big data, and CLI commands to convert geographical files locally [#319](https://github.com/datagouv/hydra/pull/319)
 - Allow crawling of non datagouv URLs with CLI [#312](https://github.com/datagouv/hydra/pull/312)
+- Fix CRUD method return types and reduce DB queries when cleaning up statuses [#331](https://github.com/datagouv/hydra/pull/331)

 ## 2.4.1 (2025-09-03)

From 48594ed7c108b55ecdfd497fe53a41b1932849bf Mon Sep 17 00:00:00 2001
From: Adrien Carpentier
Date: Thu, 11 Sep 2025 11:01:25 +0200
Subject: [PATCH 5/5] feat: use as_dict in all Check CRUD methods

---
 tests/conftest.py                          |  2 +-
 tests/test_crawl/test_crawl.py             |  4 +-
 udata_hydra/analysis/csv.py                |  6 +-
 udata_hydra/analysis/geojson.py            |  4 +-
 udata_hydra/cli.py                         | 28 ++++-----
 udata_hydra/crawl/preprocess_check_data.py | 12 ++--
 udata_hydra/db/__init__.py                 |  2 +-
 udata_hydra/db/check.py                    | 71 ++++++++++++++--------
 udata_hydra/db/resource.py                 | 21 +------
 udata_hydra/routes/checks.py               | 20 +++---
 10 files changed, 85 insertions(+), 85 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index f721c511..e4fa5fc1 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -284,7 +284,7 @@ async def _fake_check(
         "geojson_url": "https://example.org/file.geojson" if pmtiles_url else None,
         "geojson_size": 1024 if geojson_url else None,
     }
-    check: dict = await Check.insert(data=data, returning="*")
+    check: dict = await Check.insert(data=data, returning="*", as_dict=True)
     data["id"] = check["id"]
     if check.get("dataset_id"):
         data["dataset_id"] = check["dataset_id"]
diff --git a/tests/test_crawl/test_crawl.py b/tests/test_crawl/test_crawl.py
index 010e61c4..8136ca27 100644
--- a/tests/test_crawl/test_crawl.py
+++ b/tests/test_crawl/test_crawl.py
@@ -687,10 +687,10 @@ async def test_wrong_url_in_catalog(
     if url_changed:
         r = await Resource.get(resource_id=RESOURCE_ID, column_name="url")
         assert r["url"] == new_url
-        check = await Check.get_by_resource_id(RESOURCE_ID)
+        check = await Check.get_by_resource_id(RESOURCE_ID, as_dict=True)
         assert check.get("parsing_finished_at")
     else:
-        check = await Check.get_by_resource_id(RESOURCE_ID)
+        check = await Check.get_by_resource_id(RESOURCE_ID, as_dict=True)
         assert check["status"] == 404
diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py
index 58be3f4f..b3983f7d 100644
--- a/udata_hydra/analysis/csv.py
+++ b/udata_hydra/analysis/csv.py
@@ -114,7 +114,7 @@ async def analyse_csv(
     timer.mark("download-file")

     check = await Check.update(
-        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, return_as_dict=True
+        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, as_dict=True
     )  # type: ignore

     # Launch csv-detective against given file
     try:
@@ -158,7 +158,7 @@ async def analyse_csv(
             resource_id=resource_id,
             debug_insert=debug_insert,
         )
-        check = await Check.update(check["id"], {"parsing_table": table_name}, return_as_dict=True)  # type: ignore
+        check = await Check.update(check["id"], {"parsing_table": table_name}, as_dict=True)  # type: ignore
         timer.mark("csv-to-db")

         try:
@@ -202,7 +202,7 @@ async def analyse_csv(
             {
                 "parsing_finished_at": datetime.now(timezone.utc),
             },
-            return_as_dict=True,
+            as_dict=True,
         )  # type: ignore
         await csv_to_db_index(table_name, csv_inspection, check)
diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py
index 66dd0e3f..ba48ffc1 100644
--- a/udata_hydra/analysis/geojson.py
+++ b/udata_hydra/analysis/geojson.py
@@ -68,7 +68,7 @@ async def analyse_geojson(
     timer.mark("download-file")

     check = await Check.update(
-        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, return_as_dict=True
+        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, as_dict=True
     )  # type: ignore

     # Convert to PMTiles
     try:
@@ -95,7 +95,7 @@ async def analyse_geojson(
                 "pmtiles_url": pmtiles_url,
                 "pmtiles_size": pmtiles_size,
             },
-            return_as_dict=True,
+            as_dict=True,
         )  # type: ignore

     except (ParseException, IOException) as e:
diff --git a/udata_hydra/cli.py b/udata_hydra/cli.py
index 53c0f86d..7e4dad95 100644
--- a/udata_hydra/cli.py
+++ b/udata_hydra/cli.py
@@ -185,7 +185,7 @@ async def check_resource(resource_id: str, method: str = "get", force_analysis:
 @cli(name="analyse-resource")
 async def analyse_resource_cli(resource_id: str):
     """Trigger a resource analysis, mainly useful for local debug (with breakpoints)"""
-    check: Record | None = await Check.get_by_resource_id(resource_id)
+    check: Record | None = await Check.get_by_resource_id(resource_id)  # type: ignore
     if not check:
         log.error("Could not find a check linked to the specified resource ID")
         return
@@ -210,21 +210,19 @@ async def analyse_csv_cli(

     # Try to get check from check_id
     if check_id:
-        record = await Check.get_by_id(int(check_id), with_deleted=True)
-        check = dict(record) if record else None
+        check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True)  # type: ignore

     # Try to get check from URL
     if not check and url:
-        records = await Check.get_by_url(url)
-        if records:
-            if len(records) > 1:
+        checks: list[Record] | None = await Check.get_by_url(url)  # type: ignore
+        if checks:
+            if len(checks) > 1:
                 log.warning(f"Multiple checks found for URL {url}, using the latest one")
-            check = dict(records[0])
+            check = checks[0]

     # Try to get check from resource_id
     if not check and resource_id:
-        record = await Check.get_by_resource_id(resource_id)
-        check = dict(record) if record else None
+        check: Record | None = await Check.get_by_resource_id(resource_id)  # type: ignore

     # We cannot get a check, it's an external URL analysis, we need to create a temporary check
     if not check and url:
@@ -239,13 +237,13 @@ async def analyse_csv_cli(
                 "timeout": False,
             },
             returning="*",
-        )
+        )  # type: ignore

     elif not check:
         log.error("Could not find a check for the specified parameters")
         return

-    await analyse_csv(check=check, debug_insert=debug_insert)
+    await analyse_csv(check=dict(check), debug_insert=debug_insert)
     log.info("CSV analysis completed")

     if url and tmp_resource_id:
@@ -259,7 +257,7 @@ async def analyse_csv_cli(
         await csv_pool.execute(f"DELETE FROM tables_index WHERE parsing_table='{table_hash}'")

         # Clean up the temporary resource and temporary check from catalog
-        check = await Check.get_by_resource_id(tmp_resource_id)
+        check: Record | None = await Check.get_by_resource_id(tmp_resource_id)  # type: ignore
         if check:
             await Check.delete(check["id"])
         await Resource.delete(resource_id=tmp_resource_id, hard_delete=True)
@@ -284,14 +282,14 @@ async def analyse_geojson_cli(
     assert check_id or url or resource_id
     check = None
     if check_id:
-        check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True)
+        check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True)  # type: ignore
     if not check and url:
-        checks: list[Record] | None = await Check.get_by_url(url)
+        checks: list[Record] | None = await Check.get_by_url(url)  # type: ignore
         if checks and len(checks) > 1:
             log.warning(f"Multiple checks found for URL {url}, using the latest one")
         check = checks[0] if checks else None
     if not check and resource_id:
-        check: Record | None = await Check.get_by_resource_id(resource_id)
+        check: Record | None = await Check.get_by_resource_id(resource_id)  # type: ignore
     if not check:
         if check_id:
             log.error("Could not retrieve the specified check")
diff --git a/udata_hydra/crawl/preprocess_check_data.py b/udata_hydra/crawl/preprocess_check_data.py
index 0805d1e8..c5bf7169 100644
--- a/udata_hydra/crawl/preprocess_check_data.py
+++ b/udata_hydra/crawl/preprocess_check_data.py
@@ -1,8 +1,6 @@
 import json
 from datetime import datetime, timezone

-from asyncpg import Record
-
 from udata_hydra.crawl.calculate_next_check import calculate_next_check_date
 from udata_hydra.crawl.helpers import get_content_type_from_header, is_valid_status
 from udata_hydra.db.check import Check
@@ -26,14 +24,14 @@ async def preprocess_check_data(dataset_id: str, check_data: dict) -> tuple[dict

     check_data["resource_id"] = str(check_data["resource_id"])

-    last_check: dict | None = None
-    last_check_record: Record | None = await Check.get_by_resource_id(check_data["resource_id"])
-    if last_check_record:
-        last_check = dict(last_check_record)
+    last_check: dict | None = await Check.get_by_resource_id(
+        check_data["resource_id"], as_dict=True
+    )  # type: ignore

     has_changed: bool = await has_check_changed(check_data, last_check)
     check_data["next_check_at"] = calculate_next_check_date(has_changed, last_check, None)
-    new_check: dict = await Check.insert(data=check_data, returning="*")
+
+    new_check: dict = await Check.insert(data=check_data, returning="*", as_dict=True)  # type: ignore

     if has_changed:
         queue.enqueue(
diff --git a/udata_hydra/db/__init__.py b/udata_hydra/db/__init__.py
index ef1099f7..78db2ee8 100644
--- a/udata_hydra/db/__init__.py
+++ b/udata_hydra/db/__init__.py
@@ -38,7 +38,7 @@ def compute_update_query(table_name: str, data: dict, returning: str = "*") -> str:
     """


-async def update_table_record(table_name: str, record_id: int, data: dict) -> Record:
+async def update_table_record(table_name: str, record_id: int, data: dict) -> Record | None:
     data = convert_dict_values_to_json(data)
     q = compute_update_query(table_name, data)
     pool = await context.pool()
diff --git a/udata_hydra/db/check.py b/udata_hydra/db/check.py
index aa1c9753..ce6179af 100644
--- a/udata_hydra/db/check.py
+++ b/udata_hydra/db/check.py
@@ -13,8 +13,24 @@
 class Check:
     """Represents a check in the "checks" DB table"""

+    @staticmethod
+    def _convert_to_dict_if_needed(result: Record | None, as_dict: bool) -> Record | dict | None:
+        if as_dict and result:
+            return dict(result)
+        return result
+
+    @staticmethod
+    def _convert_list_to_dict_if_needed(
+        results: list[Record], as_dict: bool
+    ) -> list[Record] | list[dict]:
+        if as_dict:
+            return [dict(result) for result in results]
+        return results
+
     @classmethod
-    async def get_by_id(cls, check_id: int, with_deleted: bool = False) -> Record | None:
+    async def get_by_id(
+        cls, check_id: int, with_deleted: bool = False, as_dict: bool = False
+    ) -> Record | dict | None:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """
@@ -24,12 +40,13 @@ async def get_by_id(cls, check_id: int, with_deleted: bool = False) -> Record |
             """
             if not with_deleted:
                 q += " AND catalog.deleted = FALSE"
-            return await connection.fetchrow(q, check_id)
+            result = await connection.fetchrow(q, check_id)
+            return cls._convert_to_dict_if_needed(result, as_dict)

     @classmethod
     async def get_by_resource_id(
-        cls, resource_id: str, with_deleted: bool = False
-    ) -> Record | None:
+        cls, resource_id: str, with_deleted: bool = False, as_dict: bool = False
+    ) -> Record | dict | None:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """
@@ -39,10 +56,11 @@ async def get_by_resource_id(
             """
             if not with_deleted:
                 q += " AND catalog.deleted = FALSE"
-            return await connection.fetchrow(q, resource_id)
+            result = await connection.fetchrow(q, resource_id)
+            return cls._convert_to_dict_if_needed(result, as_dict)

     @classmethod
-    async def get_by_url(cls, url: str) -> list[Record]:
+    async def get_by_url(cls, url: str, as_dict: bool = False) -> list[Record] | list[dict]:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """
@@ -50,12 +68,13 @@ async def get_by_url(cls, url: str) -> list[Record]:
                 WHERE url = $1
                 ORDER BY created_at DESC
             """
-            return await connection.fetch(q, url)
+            results = await connection.fetch(q, url)
+            return cls._convert_list_to_dict_if_needed(results, as_dict)

     @classmethod
     async def get_latest(
-        cls, url: str | None = None, resource_id: str | None = None
-    ) -> Record | None:
+        cls, url: str | None = None, resource_id: str | None = None, as_dict: bool = False
+    ) -> Record | dict | None:
         column: str = "url" if url else "resource_id"
         pool = await context.pool()
         async with pool.acquire() as connection:
@@ -66,10 +85,13 @@ async def get_latest(
                 WHERE catalog.{column} = $1
                 AND checks.id = catalog.last_check
             """
-            return await connection.fetchrow(q, url or resource_id)
+            result = await connection.fetchrow(q, url or resource_id)
+            return cls._convert_to_dict_if_needed(result, as_dict)

     @classmethod
-    async def get_all(cls, url: str | None = None, resource_id: str | None = None) -> list[Record]:
+    async def get_all(
+        cls, url: str | None = None, resource_id: str | None = None, as_dict: bool = False
+    ) -> list[Record] | list[dict]:
         column: str = "url" if url else "resource_id"
         pool = await context.pool()
         async with pool.acquire() as connection:
@@ -81,12 +103,13 @@ async def get_all(cls, url: str | None = None, resource_id: str | None = None) -
                 AND catalog.{column} = checks.{column}
             ORDER BY created_at DESC
             """
-            return await connection.fetch(q, url or resource_id)
+            results = await connection.fetch(q, url or resource_id)
+            return cls._convert_list_to_dict_if_needed(results, as_dict)

     @classmethod
     async def get_group_by_for_date(
-        cls, column: str, date: date, page_size: int = 20
-    ) -> list[Record]:
+        cls, column: str, date: date, page_size: int = 20, as_dict: bool = False
+    ) -> list[Record] | list[dict]:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = f"""
@@ -97,22 +120,20 @@ async def get_group_by_for_date(
                 ORDER BY count desc
                 LIMIT $2
             """
-            return await connection.fetch(q, date, page_size)
+            results = await connection.fetch(q, date, page_size)
+            return cls._convert_list_to_dict_if_needed(results, as_dict)

     @classmethod
-    async def insert(cls, data: dict, returning: str = "id") -> dict:
+    async def insert(cls, data: dict, returning: str = "id", as_dict: bool = True) -> Record | dict:
         """
         Insert a new check in DB, associate it with the resource and return the check dict, optionally associated with the resource dataset_id.
         This uses the info from the last check of the same resource.
-
-        Note: Returns dict instead of Record because this method performs additional operations beyond simple insertion (joins with catalog table, adds dataset_id).
         """
         json_data = convert_dict_values_to_json(data)
         q1: str = compute_insert_query(table_name="checks", data=json_data, returning=returning)
         pool = await context.pool()
         async with pool.acquire() as connection:
             last_check: Record = await connection.fetchrow(q1, *json_data.values())
-            last_check_dict = dict(last_check)
             q2 = (
                 """UPDATE catalog SET last_check = $1 WHERE resource_id = $2 RETURNING dataset_id"""
             )
@@ -121,17 +142,17 @@ async def insert(cls, data: dict, returning: str = "id") -> dict:
             )
             # Add the dataset_id arg to the check response, if we can, and if it's asked
             if returning in ["*", "dataset_id"] and updated_resource:
+                last_check_dict = dict(last_check)
                 last_check_dict["dataset_id"] = updated_resource["dataset_id"]
-        return last_check_dict
+                return last_check_dict if as_dict else last_check
+            return dict(last_check) if as_dict else last_check

     @classmethod
-    async def update(cls, check_id: int, data: dict, return_as_dict: bool = False) -> Record | dict:
-        check: Record = await update_table_record(
+    async def update(cls, check_id: int, data: dict, as_dict: bool = False) -> Record | dict | None:
+        check: Record | None = await update_table_record(
             table_name="checks", record_id=check_id, data=data
         )
-        if return_as_dict:
-            return dict(check)
-        return check
+        return cls._convert_to_dict_if_needed(check, as_dict)

     @classmethod
     async def delete(cls, check_id: int) -> None:
diff --git a/udata_hydra/db/resource.py b/udata_hydra/db/resource.py
index 8846f466..4468c631 100644
--- a/udata_hydra/db/resource.py
+++ b/udata_hydra/db/resource.py
@@ -156,27 +156,10 @@ def get_excluded_clause() -> str:
         ]
     )

-    @classmethod
-    async def get_stuck_resources(cls) -> list[Record]:
-        """Some resources end up being stuck in a not null status forever,
-        we want to get them back on track.
-        This returns all resource ids of such stuck resources.
- """ - threshold = ( - datetime.now(timezone.utc) - timedelta(seconds=config.STUCK_THRESHOLD_SECONDS) - ).strftime("%Y-%m-%d %H:%M:%S") - q = f"""SELECT ca.resource_id - FROM checks c - JOIN catalog ca - ON c.id = ca.last_check - WHERE ca.status IS NOT NULL AND c.created_at < '{threshold}';""" - pool = await context.pool() - async with pool.acquire() as connection: - return await connection.fetch(q) - @staticmethod async def clean_up_statuses() -> int: - """Reset status to None for all stuck resources in a single query.""" + """Some resources end up being stuck in a not null status forever, + Reset status to None for all those stuck resources in a single query.""" threshold = datetime.now(timezone.utc) - timedelta(seconds=config.STUCK_THRESHOLD_SECONDS) pool = await context.pool() diff --git a/udata_hydra/routes/checks.py b/udata_hydra/routes/checks.py index 88981240..ad0f9cdb 100644 --- a/udata_hydra/routes/checks.py +++ b/udata_hydra/routes/checks.py @@ -16,22 +16,22 @@ async def get_latest_check(request: web.Request) -> web.Response: """Get the latest check for a given URL or resource_id""" url, resource_id = get_request_params(request, params_names=["url", "resource_id"]) - data: Record | None = await Check.get_latest(url, resource_id) + data: dict | None = await Check.get_latest(url, resource_id, as_dict=True) # type: ignore if not data: raise web.HTTPNotFound() if data["deleted"]: raise web.HTTPGone() - return web.json_response(CheckSchema().dump(dict(data))) + return web.json_response(CheckSchema().dump(data)) async def get_all_checks(request: web.Request) -> web.Response: url, resource_id = get_request_params(request, params_names=["url", "resource_id"]) - data: list | None = await Check.get_all(url, resource_id) - if not data: + checks: list | None = await Check.get_all(url, resource_id, as_dict=True) + if not checks: raise web.HTTPNotFound() - return web.json_response([CheckSchema().dump(dict(r)) for r in data]) + return web.json_response([CheckSchema().dump(c) for c in checks]) async def get_checks_aggregate(request: web.Request) -> web.Response: @@ -49,11 +49,11 @@ async def get_checks_aggregate(request: web.Request) -> web.Response: column: str = request.query.get("group_by") if not column: raise web.HTTPBadRequest(text="Missing mandatory 'group_by' param.") - data: list | None = await Check.get_group_by_for_date(column, created_at_date) - if not data: + checks: list | None = await Check.get_group_by_for_date(column, created_at_date, as_dict=True) + if not checks: raise web.HTTPNotFound() - return web.json_response([CheckGroupBy().dump(dict(r)) for r in data]) + return web.json_response([CheckGroupBy().dump(c) for c in checks]) async def create_check(request: web.Request) -> web.Response: @@ -89,8 +89,8 @@ async def create_check(request: web.Request) -> web.Response: ) context.monitor().refresh(status) - check: Record | None = await Check.get_latest(url, resource_id) + check: dict | None = await Check.get_latest(url, resource_id, as_dict=True) # type: ignore if not check: raise web.HTTPBadRequest(text=f"Check not created, status: {status}") - return web.json_response(CheckSchema().dump(dict(check)), status=201) + return web.json_response(CheckSchema().dump(check), status=201)