diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e3ef567..4ad379b6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 - Dynamic version in user agent [#328](https://github.com/datagouv/hydra/pull/328) and [#337](https://github.com/datagouv/hydra/pull/337)
 - Add performance tests for csv to geojson and geojson to pmtiles conversion on geographical big data, and CLI commands to convert geographical files locally [#319](https://github.com/datagouv/hydra/pull/319)
 - Allow crawling of non datagouv URLs with CLI [#312](https://github.com/datagouv/hydra/pull/312)
+- Fix CRUD return types and reduce DB queries when cleaning up statuses [#331](https://github.com/datagouv/hydra/pull/331)

 ## 2.4.1 (2025-09-03)

diff --git a/tests/conftest.py b/tests/conftest.py
index f721c511..e4fa5fc1 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -284,7 +284,7 @@ async def _fake_check(
         "geojson_url": "https://example.org/file.geojson" if pmtiles_url else None,
         "geojson_size": 1024 if geojson_url else None,
     }
-    check: dict = await Check.insert(data=data, returning="*")
+    check: dict = await Check.insert(data=data, returning="*", as_dict=True)
     data["id"] = check["id"]
     if check.get("dataset_id"):
         data["dataset_id"] = check["dataset_id"]
diff --git a/tests/test_crawl/test_crawl.py b/tests/test_crawl/test_crawl.py
index 010e61c4..8136ca27 100644
--- a/tests/test_crawl/test_crawl.py
+++ b/tests/test_crawl/test_crawl.py
@@ -687,10 +687,10 @@ async def test_wrong_url_in_catalog(
     if url_changed:
         r = await Resource.get(resource_id=RESOURCE_ID, column_name="url")
         assert r["url"] == new_url
-        check = await Check.get_by_resource_id(RESOURCE_ID)
+        check = await Check.get_by_resource_id(RESOURCE_ID, as_dict=True)
        assert check.get("parsing_finished_at")
     else:
-        check = await Check.get_by_resource_id(RESOURCE_ID)
+        check = await Check.get_by_resource_id(RESOURCE_ID, as_dict=True)
         assert check["status"] == 404
diff --git a/udata_hydra/analysis/csv.py b/udata_hydra/analysis/csv.py
index f84751ae..b3983f7d 100644
--- a/udata_hydra/analysis/csv.py
+++ b/udata_hydra/analysis/csv.py
@@ -113,7 +113,9 @@ async def analyse_csv(
     table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
     timer.mark("download-file")

-    check = await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})
+    check = await Check.update(
+        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, as_dict=True
+    )  # type: ignore

     # Launch csv-detective against given file
     try:
@@ -156,7 +158,7 @@ async def analyse_csv(
             resource_id=resource_id,
             debug_insert=debug_insert,
         )
-        check = await Check.update(check["id"], {"parsing_table": table_name})
+        check = await Check.update(check["id"], {"parsing_table": table_name}, as_dict=True)  # type: ignore
         timer.mark("csv-to-db")

         try:
@@ -200,7 +202,8 @@
             {
                 "parsing_finished_at": datetime.now(timezone.utc),
             },
-        )
+            as_dict=True,
+        )  # type: ignore
         await csv_to_db_index(table_name, csv_inspection, check)

     except (ParseException, IOException) as e:
@@ -442,7 +445,7 @@ async def csv_to_db(
         await db.execute(q, *data.values())


-async def csv_to_db_index(table_name: str, inspection: dict, check: Record) -> None:
+async def csv_to_db_index(table_name: str, inspection: dict, check: dict) -> None:
     """Store meta info about a converted CSV table in `DATABASE_URL_CSV.tables_index`"""
     db = await context.pool("csv")
     q = "INSERT INTO tables_index(parsing_table, csv_detective, resource_id, url) VALUES($1, $2, $3, $4)"
diff --git a/udata_hydra/analysis/geojson.py b/udata_hydra/analysis/geojson.py
index 57938958..ba48ffc1 100644
--- a/udata_hydra/analysis/geojson.py
+++ b/udata_hydra/analysis/geojson.py
@@ -67,7 +67,9 @@ async def analyse_geojson(
     )
     timer.mark("download-file")

-    check = await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})
+    check = await Check.update(
+        check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, as_dict=True
+    )  # type: ignore

     # Convert to PMTiles
     try:
@@ -93,7 +95,8 @@ async def analyse_geojson(
                 "pmtiles_url": pmtiles_url,
                 "pmtiles_size": pmtiles_size,
             },
-        )
+            as_dict=True,
+        )  # type: ignore

     except (ParseException, IOException) as e:
         check = await handle_parse_exception(e, None, check)
diff --git a/udata_hydra/cli.py b/udata_hydra/cli.py
index 53c0f86d..7e4dad95 100644
--- a/udata_hydra/cli.py
+++ b/udata_hydra/cli.py
@@ -185,7 +185,7 @@ async def check_resource(resource_id: str, method: str = "get", force_analysis:
 @cli(name="analyse-resource")
 async def analyse_resource_cli(resource_id: str):
     """Trigger a resource analysis, mainly useful for local debug (with breakpoints)"""
-    check: Record | None = await Check.get_by_resource_id(resource_id)
+    check: Record | None = await Check.get_by_resource_id(resource_id)  # type: ignore
     if not check:
         log.error("Could not find a check linked to the specified resource ID")
         return
@@ -210,21 +210,19 @@ async def analyse_csv_cli(
     # Try to get check from check_id
     if check_id:
-        record = await Check.get_by_id(int(check_id), with_deleted=True)
-        check = dict(record) if record else None
+        check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True)  # type: ignore

     # Try to get check from URL
     if not check and url:
-        records = await Check.get_by_url(url)
-        if records:
-            if len(records) > 1:
+        checks: list[Record] | None = await Check.get_by_url(url)  # type: ignore
+        if checks:
+            if len(checks) > 1:
                 log.warning(f"Multiple checks found for URL {url}, using the latest one")
-            check = dict(records[0])
+            check = checks[0]

     # Try to get check from resource_id
     if not check and resource_id:
-        record = await Check.get_by_resource_id(resource_id)
-        check = dict(record) if record else None
+        check: Record | None = await Check.get_by_resource_id(resource_id)  # type: ignore

     # We cannot get a check, it's an external URL analysis, we need to create a temporary check
     if not check and url:
@@ -239,13 +237,13 @@
                 "timeout": False,
             },
             returning="*",
-        )
+        )  # type: ignore
     elif not check:
         log.error("Could not find a check for the specified parameters")
         return

-    await analyse_csv(check=check, debug_insert=debug_insert)
+    await analyse_csv(check=dict(check), debug_insert=debug_insert)
     log.info("CSV analysis completed")

     if url and tmp_resource_id:
@@ -259,7 +257,7 @@
         await csv_pool.execute(f"DELETE FROM tables_index WHERE parsing_table='{table_hash}'")

         # Clean up the temporary resource and temporary check from catalog
-        check = await Check.get_by_resource_id(tmp_resource_id)
+        check: Record | None = await Check.get_by_resource_id(tmp_resource_id)  # type: ignore
         if check:
             await Check.delete(check["id"])
         await Resource.delete(resource_id=tmp_resource_id, hard_delete=True)
@@ -284,14 +282,14 @@
     assert check_id or url or resource_id
     check = None
     if check_id:
-        check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True)
+        check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True)  # type: ignore
     if not check and url:
-        checks: list[Record] | None = await Check.get_by_url(url)
+        checks: list[Record] | None = await Check.get_by_url(url)  # type: ignore
         if checks and len(checks) > 1:
             log.warning(f"Multiple checks found for URL {url}, using the latest one")
         check = checks[0] if checks else None
     if not check and resource_id:
-        check: Record | None = await Check.get_by_resource_id(resource_id)
+        check: Record | None = await Check.get_by_resource_id(resource_id)  # type: ignore
     if not check:
         if check_id:
             log.error("Could not retrieve the specified check")
diff --git a/udata_hydra/crawl/preprocess_check_data.py b/udata_hydra/crawl/preprocess_check_data.py
index 0805d1e8..c5bf7169 100644
--- a/udata_hydra/crawl/preprocess_check_data.py
+++ b/udata_hydra/crawl/preprocess_check_data.py
@@ -1,8 +1,6 @@
 import json
 from datetime import datetime, timezone

-from asyncpg import Record
-
 from udata_hydra.crawl.calculate_next_check import calculate_next_check_date
 from udata_hydra.crawl.helpers import get_content_type_from_header, is_valid_status
 from udata_hydra.db.check import Check
@@ -26,14 +24,14 @@ async def preprocess_check_data(dataset_id: str, check_data: dict) -> tuple[dict
     check_data["resource_id"] = str(check_data["resource_id"])

-    last_check: dict | None = None
-    last_check_record: Record | None = await Check.get_by_resource_id(check_data["resource_id"])
-    if last_check_record:
-        last_check = dict(last_check_record)
+    last_check: dict | None = await Check.get_by_resource_id(
+        check_data["resource_id"], as_dict=True
+    )  # type: ignore
     has_changed: bool = await has_check_changed(check_data, last_check)
     check_data["next_check_at"] = calculate_next_check_date(has_changed, last_check, None)
-    new_check: dict = await Check.insert(data=check_data, returning="*")
+
+    new_check: dict = await Check.insert(data=check_data, returning="*", as_dict=True)  # type: ignore

     if has_changed:
         queue.enqueue(
diff --git a/udata_hydra/db/check.py b/udata_hydra/db/check.py
index 844d7fb0..ce6179af 100644
--- a/udata_hydra/db/check.py
+++ b/udata_hydra/db/check.py
@@ -13,8 +13,24 @@ class Check:
     """Represents a check in the "checks" DB table"""

+    @staticmethod
+    def _convert_to_dict_if_needed(result: Record | None, as_dict: bool) -> Record | dict | None:
+        if as_dict and result:
+            return dict(result)
+        return result
+
+    @staticmethod
+    def _convert_list_to_dict_if_needed(
+        results: list[Record], as_dict: bool
+    ) -> list[Record] | list[dict]:
+        if as_dict:
+            return [dict(result) for result in results]
+        return results
+
     @classmethod
-    async def get_by_id(cls, check_id: int, with_deleted: bool = False) -> Record | None:
+    async def get_by_id(
+        cls, check_id: int, with_deleted: bool = False, as_dict: bool = False
+    ) -> Record | dict | None:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """
@@ -24,12 +40,13 @@ async def get_by_id(cls, check_id: int, with_deleted: bool = False) -> Record |
             """
             if not with_deleted:
                 q += " AND catalog.deleted = FALSE"
-            return await connection.fetchrow(q, check_id)
+            result = await connection.fetchrow(q, check_id)
+            return cls._convert_to_dict_if_needed(result, as_dict)

     @classmethod
     async def get_by_resource_id(
-        cls, resource_id: str, with_deleted: bool = False
-    ) -> Record | None:
+        cls, resource_id: str, with_deleted: bool = False, as_dict: bool = False
+    ) -> Record | dict | None:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """
@@ -39,10 +56,11 @@ async def get_by_resource_id(
             """
             if not with_deleted:
                 q += " AND catalog.deleted = FALSE"
-            return await connection.fetchrow(q, resource_id)
+            result = await connection.fetchrow(q, resource_id)
+            return cls._convert_to_dict_if_needed(result, as_dict)

     @classmethod
-    async def get_by_url(cls, url: str) -> list[Record]:
+    async def get_by_url(cls, url: str, as_dict: bool = False) -> list[Record] | list[dict]:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """
@@ -50,12 +68,13 @@ async def get_by_url(cls, url: str) -> list[Record]:
                 WHERE url = $1
                 ORDER BY created_at DESC
             """
-            return await connection.fetch(q, url)
+            results = await connection.fetch(q, url)
+            return cls._convert_list_to_dict_if_needed(results, as_dict)

     @classmethod
     async def get_latest(
-        cls, url: str | None = None, resource_id: str | None = None
-    ) -> Record | None:
+        cls, url: str | None = None, resource_id: str | None = None, as_dict: bool = False
+    ) -> Record | dict | None:
         column: str = "url" if url else "resource_id"
         pool = await context.pool()
         async with pool.acquire() as connection:
@@ -66,10 +85,13 @@ async def get_latest(
                 WHERE catalog.{column} = $1
                 AND checks.id = catalog.last_check
             """
-            return await connection.fetchrow(q, url or resource_id)
+            result = await connection.fetchrow(q, url or resource_id)
+            return cls._convert_to_dict_if_needed(result, as_dict)

     @classmethod
-    async def get_all(cls, url: str | None = None, resource_id: str | None = None) -> list[Record]:
+    async def get_all(
+        cls, url: str | None = None, resource_id: str | None = None, as_dict: bool = False
+    ) -> list[Record] | list[dict]:
         column: str = "url" if url else "resource_id"
         pool = await context.pool()
         async with pool.acquire() as connection:
@@ -81,12 +103,13 @@ async def get_all(cls, url: str | None = None, resource_id: str | None = None) -
                 AND catalog.{column} = checks.{column}
                 ORDER BY created_at DESC
             """
-            return await connection.fetch(q, url or resource_id)
+            results = await connection.fetch(q, url or resource_id)
+            return cls._convert_list_to_dict_if_needed(results, as_dict)

     @classmethod
     async def get_group_by_for_date(
-        cls, column: str, date: date, page_size: int = 20
-    ) -> list[Record]:
+        cls, column: str, date: date, page_size: int = 20, as_dict: bool = False
+    ) -> list[Record] | list[dict]:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = f"""
@@ -97,22 +120,20 @@ async def get_group_by_for_date(
                 ORDER BY count desc
                 LIMIT $2
             """
-            return await connection.fetch(q, date, page_size)
+            results = await connection.fetch(q, date, page_size)
+            return cls._convert_list_to_dict_if_needed(results, as_dict)

     @classmethod
-    async def insert(cls, data: dict, returning: str = "id") -> dict:
+    async def insert(cls, data: dict, returning: str = "id", as_dict: bool = True) -> Record | dict:
         """
         Insert a new check in DB, associate it with the resource and return the check dict,
         optionally associated with the resource dataset_id.
         This uses the info from the last check of the same resource.
-
-        Note: Returns dict instead of Record because this method performs additional operations
-        beyond simple insertion (joins with catalog table, adds dataset_id).
         """
         json_data = convert_dict_values_to_json(data)
         q1: str = compute_insert_query(table_name="checks", data=json_data, returning=returning)
         pool = await context.pool()
         async with pool.acquire() as connection:
             last_check: Record = await connection.fetchrow(q1, *json_data.values())
-            last_check_dict = dict(last_check)
             q2 = (
                 """UPDATE catalog SET last_check = $1 WHERE resource_id = $2 RETURNING dataset_id"""
             )
@@ -121,17 +142,21 @@
             )
             # Add the dataset_id arg to the check response, if we can, and if it's asked
             if returning in ["*", "dataset_id"] and updated_resource:
+                last_check_dict = dict(last_check)
                 last_check_dict["dataset_id"] = updated_resource["dataset_id"]
-            return last_check_dict
+                return last_check_dict if as_dict else last_check
+            return dict(last_check) if as_dict else last_check

     @classmethod
-    async def update(cls, check_id: int, data: dict) -> Record | None:
-        """Update a check in DB with new data and return the check id in DB"""
-        return await update_table_record(table_name="checks", record_id=check_id, data=data)
+    async def update(cls, check_id: int, data: dict, as_dict: bool = False) -> Record | dict | None:
+        check: Record | None = await update_table_record(
+            table_name="checks", record_id=check_id, data=data
+        )
+        return cls._convert_to_dict_if_needed(check, as_dict)

     @classmethod
-    async def delete(cls, check_id: int) -> int:
+    async def delete(cls, check_id: int) -> None:
         pool = await context.pool()
         async with pool.acquire() as connection:
             q = """DELETE FROM checks WHERE id = $1"""
-            return await connection.fetch(q, check_id)
+            await connection.execute(q, check_id)
diff --git a/udata_hydra/db/resource.py b/udata_hydra/db/resource.py
index 371f920c..4468c631 100644
--- a/udata_hydra/db/resource.py
+++ b/udata_hydra/db/resource.py
@@ -157,26 +157,23 @@ def get_excluded_clause() -> str:
         )

     @staticmethod
-    async def get_stuck_resources() -> list[str]:
+    async def clean_up_statuses() -> int:
         """Some resources end up being stuck in a not null status forever,
-        we want to get them back on track.
-        This returns all resource ids of such stuck resources.
-        """
-        threshold = (
-            datetime.now(timezone.utc) - timedelta(seconds=config.STUCK_THRESHOLD_SECONDS)
-        ).strftime("%Y-%m-%d %H:%M:%S")
-        q = f"""SELECT ca.resource_id
-            FROM checks c
-            JOIN catalog ca
-            ON c.id = ca.last_check
-            WHERE ca.status IS NOT NULL AND c.created_at < '{threshold}';"""
+        so we reset status to None for all those stuck resources in a single query."""
+        threshold = datetime.now(timezone.utc) - timedelta(seconds=config.STUCK_THRESHOLD_SECONDS)
+
         pool = await context.pool()
         async with pool.acquire() as connection:
-            rows = await connection.fetch(q)
-            return [str(r["resource_id"]) for r in rows] if rows else []
-
-    @classmethod
-    async def clean_up_statuses(cls):
-        stuck_resources: list[str] = await cls.get_stuck_resources()
-        for rid in stuck_resources:
-            await cls.update(rid, {"status": None})
+            # Update all stuck resources in a single query
+            q = """
+                UPDATE catalog
+                SET status = NULL, status_since = $1
+                WHERE resource_id IN (
+                    SELECT ca.resource_id
+                    FROM checks c
+                    JOIN catalog ca ON c.id = ca.last_check
+                    WHERE ca.status IS NOT NULL AND c.created_at < $2
+                )
+            """
+            result: str = await connection.execute(q, datetime.now(timezone.utc), threshold)
+            return int(result.split()[-1])  # asyncpg returns a command tag like "UPDATE 42"
diff --git a/udata_hydra/routes/checks.py b/udata_hydra/routes/checks.py
index 88981240..ad0f9cdb 100644
--- a/udata_hydra/routes/checks.py
+++ b/udata_hydra/routes/checks.py
@@ -16,22 +16,22 @@ async def get_latest_check(request: web.Request) -> web.Response:
     """Get the latest check for a given URL or resource_id"""
     url, resource_id = get_request_params(request, params_names=["url", "resource_id"])
-    data: Record | None = await Check.get_latest(url, resource_id)
+    data: dict | None = await Check.get_latest(url, resource_id, as_dict=True)  # type: ignore
     if not data:
         raise web.HTTPNotFound()
     if data["deleted"]:
         raise web.HTTPGone()

-    return web.json_response(CheckSchema().dump(dict(data)))
+    return web.json_response(CheckSchema().dump(data))


 async def get_all_checks(request: web.Request) -> web.Response:
     url, resource_id = get_request_params(request, params_names=["url", "resource_id"])
-    data: list | None = await Check.get_all(url, resource_id)
-    if not data:
+    checks: list | None = await Check.get_all(url, resource_id, as_dict=True)
+    if not checks:
         raise web.HTTPNotFound()

-    return web.json_response([CheckSchema().dump(dict(r)) for r in data])
+    return web.json_response([CheckSchema().dump(c) for c in checks])


 async def get_checks_aggregate(request: web.Request) -> web.Response:
@@ -49,11 +49,11 @@ async def get_checks_aggregate(request: web.Request) -> web.Response:
     column: str = request.query.get("group_by")
     if not column:
         raise web.HTTPBadRequest(text="Missing mandatory 'group_by' param.")
-    data: list | None = await Check.get_group_by_for_date(column, created_at_date)
-    if not data:
+    checks: list | None = await Check.get_group_by_for_date(column, created_at_date, as_dict=True)
+    if not checks:
         raise web.HTTPNotFound()

-    return web.json_response([CheckGroupBy().dump(dict(r)) for r in data])
+    return web.json_response([CheckGroupBy().dump(c) for c in checks])


 async def create_check(request: web.Request) -> web.Response:
@@ -89,8 +89,8 @@ async def create_check(request: web.Request) -> web.Response:
     )
     context.monitor().refresh(status)

-    check: Record | None = await Check.get_latest(url, resource_id)
+    check: dict | None = await Check.get_latest(url, resource_id, as_dict=True)  # type: ignore
     if not check:
         raise web.HTTPBadRequest(text=f"Check not created, status: {status}")

-    return web.json_response(CheckSchema().dump(dict(check)), status=201)
+    return web.json_response(CheckSchema().dump(check), status=201)
diff --git a/udata_hydra/utils/errors.py b/udata_hydra/utils/errors.py
index c61353cf..148b0c28 100644
--- a/udata_hydra/utils/errors.py
+++ b/udata_hydra/utils/errors.py
@@ -2,7 +2,6 @@
 from datetime import datetime, timezone

 import sentry_sdk
-from asyncpg import Record

 from udata_hydra import context
 from udata_hydra.db.check import Check
@@ -111,8 +110,8 @@ class IOException(ExceptionWithSentryDetails):


 async def handle_parse_exception(
-    e: IOException | ParseException, table_name: str | None, check: Record | None
-) -> Record | None:
+    e: IOException | ParseException, table_name: str | None, check: dict | None
+) -> dict | None:
     """Specific IO/ParseException handling. Store error in :check: if in a check context.
     Also cleanup :table_name: if needed."""
     if table_name is not None and (check and not check.get("parsing_table")):
         # only deleting the table if we have not successfully completed csv_to_db
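For reviewers trying the change out locally, here is a minimal, hypothetical caller-side sketch (not part of the patch; it assumes a configured udata-hydra environment and database). It only uses `Check.get_by_resource_id` as introduced above: the default still returns an `asyncpg.Record` (or `None`), while `as_dict=True` opts into a plain dict so call sites no longer need `dict(record)`.

```python
from udata_hydra.db.check import Check


async def latest_status(resource_id: str) -> int | None:
    # Default behaviour is unchanged: an asyncpg.Record (or None) comes back
    record = await Check.get_by_resource_id(resource_id)

    # With as_dict=True the same row comes back as a plain dict,
    # so .get() and mutation work without an explicit dict(record)
    check = await Check.get_by_resource_id(resource_id, as_dict=True)
    return check.get("status") if check else None
```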
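On the `clean_up_statuses` side, asyncpg's `Connection.execute()` returns the PostgreSQL command tag as a string rather than an integer, so the affected-row count has to be parsed out of it. A standalone sketch of that parsing (the helper name is illustrative, not from the codebase):

```python
def rows_affected(command_tag: str) -> int:
    """Parse a PostgreSQL command tag such as "UPDATE 42" into a row count."""
    return int(command_tag.split()[-1])


assert rows_affected("UPDATE 42") == 42
assert rows_affected("DELETE 0") == 0
```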