From 357ed61d73c42d2b2482f7fda00ff5ca4082199f Mon Sep 17 00:00:00 2001 From: Darren <3921919+pendingintent@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:13:46 -0500 Subject: [PATCH 1/7] Fixed bc property population and bc label,description population --- src/soa_builder/web/app.py | 330 +++++++++++++++++-------------------- 1 file changed, 153 insertions(+), 177 deletions(-) diff --git a/src/soa_builder/web/app.py b/src/soa_builder/web/app.py index cf1f79f..5634029 100644 --- a/src/soa_builder/web/app.py +++ b/src/soa_builder/web/app.py @@ -103,6 +103,7 @@ from .utils import ( get_cdisc_api_key as _get_cdisc_api_key, get_concepts_override as _get_concepts_override, + get_next_alias_code_uid as _get_next_alias_code_uid, get_next_code_uid as _get_next_code_uid, get_next_concept_uid as _get_next_concept_uid, load_epoch_type_options, @@ -1843,7 +1844,7 @@ async def lifespan(app: FastAPI): ) def _run_enrichment_pool(_rows=_unenriched): - with ThreadPoolExecutor(max_workers=4) as _pool: + with ThreadPoolExecutor(max_workers=1) as _pool: for _concept_code, _soa_id in _rows: _pool.submit(_enrich_code_bg, _concept_code, _soa_id) @@ -2462,18 +2463,33 @@ def _lookup_and_save_dss(soa_id: int, activity_id: int, concept_code: str) -> No pass # silent failure — DSS column remains unset; user can assign manually -def _populate_bc_properties_bg( - soa_id: int, activity_id: int, concept_code: str -) -> None: - """Background task: populate biomedical_concept_property from DSS variables. - - Prefers activity_concept.dss_href (set by manual or auto DSS assignment) over - fresh step-1 discovery, so the exact selected DSS drives property population. - UIDs are immutable — existing rows are skipped. All writes in one transaction. +async def _populate_bc_properties_bg(soa_id: int, activity_id: int, concept_code: str): """ - import os - import requests as _requests + Background task: fetch DSS variables, parse them, insert BiomedicalConceptProperty rows. + Restructured to only hold database lock during quick writes, not during slow API calls. + """ + # Step 1: Quick read - fetch activity concept data (NO WRITE LOCK YET) + conn = _connect() + cur = conn.cursor() + cur.execute( + """ + SELECT ac.concept_uid, ac.dss_href + FROM activity_concept ac + WHERE ac.soa_id = ? AND ac.activity_id = ? AND ac.concept_code = ? + """, + (soa_id, activity_id, concept_code), + ) + row = cur.fetchone() + conn.close() # Close connection immediately after read + if not row: + return + + concept_uid, dss_href = row + if not dss_href: + return + + # Step 2: Slow API calls (NO DATABASE CONNECTION HELD) api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( "CDISC_SUBSCRIPTION_KEY" ) @@ -2486,170 +2502,130 @@ def _populate_bc_properties_bg( headers["api-key"] = api_key try: - # Read existing dss_href and bc_uid from DB before any API call - _conn = _connect() - _cur = _conn.cursor() - _cur.execute( - "SELECT concept_uid, dss_href FROM activity_concept" - " WHERE activity_id=? AND concept_code=? AND soa_id=?", - (activity_id, concept_code, soa_id), - ) - _ac = _cur.fetchone() - _conn.close() - bc_uid = _ac[0] if _ac else None - dss_href = (_ac[1] if _ac else None) or None - if not bc_uid: - return - - if dss_href: - # Use the already-known href — skip step-1 - if dss_href.startswith("/"): - dss_href = "https://api.library.cdisc.org/api/cosmos/v2" + dss_href - else: - # Step 1: discover DSS href for this concept - list_url = ( - "https://api.library.cdisc.org/api/cosmos/v2/mdr/specializations" - "/datasetspecializations?biomedicalconcept=" + concept_code - ) - r1 = _requests.get(list_url, headers=headers, timeout=15) - if r1.status_code != 200: - return - data1 = r1.json() - sdtm_links = data1["_links"]["datasetSpecializations"]["sdtm"] - if not sdtm_links: - return - dss_href = sdtm_links[0]["href"] - if dss_href.startswith("/"): - dss_href = "https://api.library.cdisc.org/api/cosmos/v2" + dss_href - - # Step 2: fetch DSS detail - r2 = _requests.get(dss_href, headers=headers, timeout=15) - if r2.status_code != 200: + resp = requests.get(dss_href, headers=headers, timeout=15) + if resp.status_code != 200: return - raw = r2.json() + dss_data = resp.json() + except (requests.RequestException, ValueError) as e: + print(f"API error fetching DSS for {concept_code}: {e}") + return - variables = raw.get("variables") or [] - if not variables: - return + # Step 3: Parse response data (NO DATABASE CONNECTION) + variables = dss_data.get("variables", []) + if not variables: + return - pkg_href = (raw.get("_links") or {}).get("parentPackage") or {} - pkg_href = pkg_href.get("href", "") if isinstance(pkg_href, dict) else "" - try: - code_system_version = pkg_href.split("/")[5] - except Exception: - code_system_version = "" + # Extract parent package info for code system + try: + dss_code_system = dss_data["_links"]["parentPackage"]["href"] + dss_code_system_version = dss_code_system.split("/")[5] + except (KeyError, TypeError, IndexError): + dss_code_system = "" + dss_code_system_version = "" + + # Prepare all insert data in memory (no DB connection held during API work) + properties_to_insert = [] + for var in variables: + var_concept_id = var.get("dataElementConceptId", "") + name = var.get("name", "") + label = var.get("label", name) + is_required = var.get("mandatoryVariable", False) + datatype = var.get("dataType", "") + + properties_to_insert.append( + { + "concept_uid": concept_uid, + "name": name, + "label": label, + "is_required": is_required, + "datatype": datatype, + "var_concept_id": var_concept_id, + } + ) - # Step 3: persist — single write-locked transaction - conn = _connect() - conn.isolation_level = None # manual transaction management - cur = conn.cursor() - try: - cur.execute("BEGIN IMMEDIATE") # acquire write lock before UID reads + # Step 4: NOW acquire write lock and do fast inserts + conn = _connect() + conn.isolation_level = None # manual transaction management + cur = conn.cursor() + try: + cur.execute("BEGIN IMMEDIATE") - for var in variables: - var_concept_id = var.get("dataElementConceptId") - if not var_concept_id: - continue - var_name = var.get("name") - var_required = var.get("mandatoryVariable") - var_datatype = var.get("dataType") + # Delete existing properties for this concept + cur.execute( + "DELETE FROM biomedical_concept_property" + " WHERE biomedical_concept_uid = ? AND soa_id = ?", + (concept_uid, soa_id), + ) - # skip if this named property already exists for this BC — UIDs are immutable - cur.execute( - "SELECT id FROM biomedical_concept_property" - " WHERE soa_id=? AND biomedical_concept_uid=? AND name=?", - (soa_id, bc_uid, var_name), - ) - if cur.fetchone(): - continue + # Insert all new properties, creating code + alias_code rows per property + for prop in properties_to_insert: + # code row: stores the CDISC dataElementConceptId + code_uid = _get_next_code_uid(cur, soa_id) + cur.execute( + "INSERT INTO code" + " (soa_id, code_uid, code, code_system, code_system_version, decode)" + " VALUES (?,?,?,?,?,?)", + ( + soa_id, + code_uid, + prop["var_concept_id"], + dss_code_system, + dss_code_system_version, + prop["name"], + ), + ) - # always create a new code row for this property (never reuse) - cur.execute( - "SELECT code_uid FROM code WHERE soa_id=? AND code_uid LIKE 'Code_%'" - " UNION" - " SELECT code_uid FROM code_association WHERE soa_id=? AND code_uid LIKE 'Code_%'", - (soa_id, soa_id), - ) - existing_codes = [x[0] for x in cur.fetchall() if x[0]] - code_n = ( - max((int(x.split("_")[1]) for x in existing_codes), default=0) + 1 - ) - code_uid = f"Code_{code_n}" - cur.execute( - "INSERT INTO code" - " (soa_id, code_uid, code, code_system, code_system_version, decode)" - " VALUES (?,?,?,?,?,?)", - ( - soa_id, - code_uid, - var_concept_id, - pkg_href, - code_system_version, - var_name, - ), - ) + # alias_code row pointing at the code row + alias_uid = _get_next_alias_code_uid(cur, soa_id) + cur.execute( + "INSERT INTO alias_code (soa_id, alias_code_uid, standard_code)" + " VALUES (?,?,?)", + (soa_id, alias_uid, code_uid), + ) - # always create a new alias_code row for this property (never reuse) - cur.execute( - "SELECT alias_code_uid FROM alias_code" - " WHERE soa_id=? AND alias_code_uid LIKE 'AliasCode_%'", - (soa_id,), - ) - existing_aliases = [x[0] for x in cur.fetchall() if x[0]] - alias_n = ( - max((int(x.split("_")[1]) for x in existing_aliases), default=0) + 1 - ) - alias_uid = f"AliasCode_{alias_n}" - cur.execute( - "INSERT INTO alias_code (soa_id, alias_code_uid, standard_code)" - " VALUES (?,?,?)", - (soa_id, alias_uid, code_uid), - ) + # generate monotonic BiomedicalConceptProperty_N uid + cur.execute( + "SELECT MAX(CAST(SUBSTR(biomedical_concept_property_uid, 27) AS INTEGER))" + " FROM biomedical_concept_property" + " WHERE soa_id=?" + " AND biomedical_concept_property_uid LIKE 'BiomedicalConceptProperty_%'", + (soa_id,), + ) + n = (cur.fetchone()[0] or 0) + 1 + bcp_uid = f"BiomedicalConceptProperty_{n}" - # generate monotonic BiomedicalConceptProperty_N uid - cur.execute( - "SELECT biomedical_concept_property_uid FROM biomedical_concept_property" - " WHERE soa_id=? AND biomedical_concept_property_uid" - " LIKE 'BiomedicalConceptProperty_%'", - (soa_id,), - ) - existing_uids = [r[0] for r in cur.fetchall() if r[0]] - n = max((int(u.split("_")[1]) for u in existing_uids), default=0) + 1 - bcp_uid = f"BiomedicalConceptProperty_{n}" + cur.execute( + "INSERT INTO biomedical_concept_property" + " (soa_id, biomedical_concept_uid, biomedical_concept_property_uid," + " name, label, isRequired, datatype, code)" + " VALUES (?,?,?,?,?,?,?,?)", + ( + soa_id, + prop["concept_uid"], + bcp_uid, + prop["name"], + prop["label"], + prop["is_required"], + prop["datatype"], + alias_uid, + ), + ) - cur.execute( - "INSERT INTO biomedical_concept_property" - " (soa_id, biomedical_concept_uid, biomedical_concept_property_uid," - " name, label, isRequired, datatype, code)" - " VALUES (?,?,?,?,?,?,?,?)", - ( - soa_id, - bc_uid, - bcp_uid, - var_name, - var_name, - var_required, - var_datatype, - alias_uid, - ), - ) + cur.execute("COMMIT") - cur.execute("COMMIT") - except Exception as _exc: - try: - cur.execute("ROLLBACK") - except Exception: - pass - logger.warning( - "_populate_bc_properties_bg: soa_id=%s concept=%s failed: %s", - soa_id, - concept_code, - _exc, - ) - finally: - conn.close() - except Exception: - pass # steps 1/2 network errors already printed above + except Exception as exc: + try: + cur.execute("ROLLBACK") + except Exception: + pass + logger.warning( + "_populate_bc_properties_bg: soa_id=%s concept=%s failed: %s", + soa_id, + concept_code, + exc, + ) + finally: + conn.close() def _upsert_code(cur, soa_id: int, concept_code: str): @@ -2879,6 +2855,7 @@ def _enrich_biomedical_concept_bg(concept_code: str, soa_id: int) -> None: if api_key: headers["Authorization"] = f"Bearer {api_key}" headers["api-key"] = api_key + conn = None try: url = ( "https://api.library.cdisc.org/api/cosmos/v2/mdr/bc/biomedicalconcepts/" @@ -2893,23 +2870,22 @@ def _enrich_biomedical_concept_bg(concept_code: str, soa_id: int) -> None: conn = _connect() cur = conn.cursor() cur.execute( - "SELECT ac.alias_code_uid FROM alias_code ac " - "JOIN code c ON ac.standard_code = c.code_uid " - "WHERE c.soa_id=? AND c.code=?", - (soa_id, concept_code), - ) - row = cur.fetchone() - if not row: - conn.close() - return - cur.execute( - "UPDATE biomedical_concept SET label=?, description=? WHERE code=? AND soa_id=?", - (label, description, row[0], soa_id), + """ + UPDATE biomedical_concept SET label=?, description=? + WHERE soa_id=? + AND biomedical_concept_uid IN ( + SELECT concept_uid FROM activity_concept + WHERE soa_id=? AND concept_code=? AND concept_uid IS NOT NULL + ) + """, + (label, description, soa_id, soa_id, concept_code), ) conn.commit() - conn.close() except Exception: pass + finally: + if conn: + conn.close() def _cleanup_orphaned_concept_rows(cur, soa_id: int, removed_pairs) -> None: From b2c33cd6d0414120d200e9b595203eac65156906 Mon Sep 17 00:00:00 2001 From: Darren <3921919+pendingintent@users.noreply.github.com> Date: Thu, 5 Mar 2026 16:07:23 -0500 Subject: [PATCH 2/7] Query most values from database tables with minimal API request; performance improvements included --- src/usdm/generate_biomedical_concepts.py | 374 +++++++++-------------- 1 file changed, 137 insertions(+), 237 deletions(-) diff --git a/src/usdm/generate_biomedical_concepts.py b/src/usdm/generate_biomedical_concepts.py index 68012dc..f49aee6 100644 --- a/src/usdm/generate_biomedical_concepts.py +++ b/src/usdm/generate_biomedical_concepts.py @@ -1,54 +1,17 @@ #!/usr/bin/env python3 -# Prefer absolute import; fallback to adding src/ to sys.path when run directly +from concurrent.futures import ThreadPoolExecutor from typing import Optional, List, Dict, Any import functools import os import requests -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - -try: - from soa_builder.web.utils import ( - _get_biomedical_concept_ids as _get_biomedical_concept_ids, - ) -except ImportError: - import sys - from pathlib import Path - - sys.path.insert(0, str(Path(__file__).parent.parent)) - from soa_builder.web.utils import ( - _get_biomedical_concept_ids as _get_biomedical_concept_ids, - ) - -try: - from soa_builder.web.utils import _nz as _nz -except ImportError: - import sys - from pathlib import Path - - sys.path.insert(0, str(Path(__file__).parent.parent)) - from soa_builder.web.utils import _nz - +from soa_builder.web.utils import _connect # GLobal API URL prefix URL_PREFIX = "https://api.library.cdisc.org/api/cosmos/v2/" -@functools.lru_cache(maxsize=256) -def _get_concept_by_code(concept_code: str) -> Optional[Dict[str, Any]]: - """Fetch full concept data for concept_code from the CDISC Library API.""" - - url = URL_PREFIX + "mdr/bc/biomedicalconcepts/" + concept_code +def _build_api_headers() -> dict: api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( "CDISC_SUBSCRIPTION_KEY" ) @@ -59,153 +22,107 @@ def _get_concept_by_code(concept_code: str) -> Optional[Dict[str, Any]]: if api_key: headers["Authorization"] = f"Bearer {api_key}" headers["api-key"] = api_key + return headers + +@functools.lru_cache(maxsize=128) +def _fetch_dss_variable_map(dss_href: str) -> Dict[str, List[str]]: + """Fetch a DSS href once and return {variable_name: valueList}. Cached per href.""" try: - resp = requests.get(url, headers=headers, timeout=15) + resp = requests.get(dss_href, headers=_build_api_headers(), timeout=15) if resp.status_code != 200: - return None + return {} data = resp.json() - if not isinstance(data, dict): - return None - code = data.get("conceptId") - title = data.get("title") or data.get("name") or data.get("label") or code - return {"code": str(code), "title": str(title), "href": url, "raw": data} - except Exception: - return None + return {v["name"]: v.get("valueList", []) for v in data.get("variables", [])} + except (requests.RequestException, ValueError) as e: + print(f"Error fetching DSS variable map: {e}") + return {} -def _get_dss_url_from_concept(concept_code: str) -> str: - """Helper to fetch url for dataset specialization using biomedical concept code.""" - - url = ( - URL_PREFIX - + "mdr/specializations/datasetspecializations?biomedicalconcept=" - + concept_code - ) - api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( - "CDISC_SUBSCRIPTION_KEY" +@functools.lru_cache(maxsize=256) +def _get_dss_response_codes( + biomedical_concept_uid: str, variable_name: str, soa_id: int +) -> List[str]: + """Return responseCodes for a single DSS variable. Cached per (bc_uid, name, soa_id).""" + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT dss_href FROM activity_concept WHERE concept_uid=? AND soa_id=?", + (biomedical_concept_uid, soa_id), ) - subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") or api_key - headers: dict = {"Accept": "application/json"} - if subscription_key: - headers["Ocp-Apim-Subscription-Key"] = subscription_key - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - headers["api-key"] = api_key - try: - resp = requests.get(url, headers=headers, timeout=15) - if resp.status_code != 200: - return None - data = resp.json() - if not isinstance(data, dict): - return None - sdtm_links = data["_links"]["datasetSpecializations"]["sdtm"] or None - href = sdtm_links[0]["href"] - if href.startswith("/"): - href = "https://api.library.cdisc.org/api/cosmos/v2" + href - return href - except Exception: - return None + row = cur.fetchone() + conn.close() + if not row or not row[0]: + return [] + return _fetch_dss_variable_map(row[0]).get(variable_name, []) @functools.lru_cache(maxsize=256) -def _get_dss_by_url(url: str) -> Optional[Dict[str, Any]]: - """Helper to return the raw response from request to the DSS Library API.""" - url = url - api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( - "CDISC_SUBSCRIPTION_KEY" - ) - subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") or api_key - headers: dict = {"Accept": "application/json"} - if subscription_key: - headers["Ocp-Apim-Subscription-Key"] = subscription_key - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - headers["api-key"] = api_key +def _get_biomedical_concept_synonyms(concept_code: str) -> List[str]: + """Fetch the synonyms of a biomedical concept using the CDISC API. Cached per code.""" + url = URL_PREFIX + "mdr/bc/biomedicalconcepts/" + concept_code try: - resp = requests.get(url, headers=headers, timeout=15) + resp = requests.get(url, headers=_build_api_headers(), timeout=15) if resp.status_code != 200: - return None + return [] data = resp.json() - if not isinstance(data, dict): - return None - return {"raw": data} - except Exception: - return None + return data.get("synonyms", []) + except (requests.RequestException, ValueError) as e: + print(f"Error fetching biomedical concept synonyms: {e}") + return [] -def _get_bc_properties( - bc_raw_data: Dict[str, Any], dss_raw_data: Dict[str, Any] -) -> Optional[Dict[str, Any]]: - """Helper to construct the USDM JSON properties attribute for biomedical concept - - - id: "BiomedicalConceptProperty_{}", - - extensionAttributes": [], - - name: "", - - label: "", - - isRequired: None, - - isEnabled": None, - - datatype": "", - - responseCodes": {}, - - code: { - - id - - extensionAttributes - - code - - codeSystem - - codeSystemVersion - - decode - - instanceType: "Code" - }, - - notes": [], - "instanceType": "BiomedicalConceptProperty" - """ - try: - concept_ids = [ - dec["conceptId"] - for dec in bc_raw_data["raw"]["dataElementConcepts"] - if "conceptId" in dec - ] - except: - concept_ids = [] +def _get_biomedical_concept_properties( + soa_id: int, biomedical_concept_uid: str +) -> Optional[Dict[str, any]]: + """Fetch biomedical concept properties from the database using the BiomedicalConcept_{}.""" + conn = _connect() + cur = conn.cursor() + cur.execute( + """ + SELECT + p.biomedical_concept_property_uid id, + p.name name, + p.label label, + p.isRequired isRequired, + p.datatype datatype, + p.biomedical_concept_uid biomedical_concept_uid, + a.alias_code_uid alias_code_uid, + a.standard_code standard_code, + c.code code, + c.code_system code_system, + c.code_system_version code_system_version, + c.decode decode + FROM biomedical_concept_property p + INNER JOIN alias_code a ON p.code = a.alias_code_uid AND p.soa_id = a.soa_id + INNER JOIN code c ON a.standard_code = c.code_uid AND a.soa_id = c.soa_id + WHERE p.soa_id = ? AND p.biomedical_concept_uid = ? + ORDER BY p.id; + """, + (soa_id, biomedical_concept_uid), + ) + rows = cur.fetchall() + cur.close() + conn.close() out: List[Dict[str, Any]] = [] - try: - dss_vars = dss_raw_data["raw"]["variables"] - except Exception: - dss_vars = [] - - try: - dss_code_system = dss_raw_data["raw"]["_links"]["parentPackage"]["href"] - dss_code_system_version = dss_code_system.split("/")[5] - except: - dss_code_system = "" - dss_code_system_version = "" - - # dss_url = _get_dss_url_from_concept(bc_raw_data["code"]) - for idx, concept in enumerate(concept_ids): - # print("idx: " + str(idx) + ", " + concept) - id = "BiomedicalConceptProperty_" + str(idx) - dss_var = dss_vars[idx] if idx < len(dss_vars) else {} - if "shortName" in bc_raw_data["raw"]["dataElementConcepts"][idx]: - name = bc_raw_data["raw"]["dataElementConcepts"][idx]["shortName"] - else: - name = "" - if "shortName" in bc_raw_data["raw"]["dataElementConcepts"][idx]: - label = bc_raw_data["raw"]["dataElementConcepts"][idx]["shortName"] - else: - label = "" - isRequired = dss_var.get("mandatoryVariable", "") - isEnabled = "" - if "dataType" in bc_raw_data["raw"]["dataElementConcepts"][idx]: - datatype = bc_raw_data["raw"]["dataElementConcepts"][idx]["dataType"] - else: - datatype = "" - responseCodes = dss_var.get("valueList", []) - decode = dss_var.get("name", "") - code = bc_raw_data["raw"]["dataElementConcepts"][idx]["conceptId"] - notes = [] - instanceType = "BiomedicalConceptProperty" + for r in rows: + id = r[0] + name = r[1] + label = r[2] + isRequired = bool(r[3]) + datatype = r[4] + bc_uid = r[5] + alias_code_uid = r[6] + standard_code = r[7] + code = r[8] + code_system = r[9] + code_system_version = r[10] + decode = r[11] + + isEnabled = None + response_codes = _get_dss_response_codes(bc_uid, name, soa_id) property = { "id": id, @@ -214,24 +131,25 @@ def _get_bc_properties( "isRequired": isRequired, "isEnabled": isEnabled, "datatype": datatype, - "responseCodes": responseCodes, + "responseCodes": response_codes, "code": { - "id": "AliasCode_" + str(idx), + "id": alias_code_uid, "extensionAttributes": [], "standardCode": { - "id": "Code_{}", + "id": standard_code, "extensionAttributes": [], "code": code, - "codeSystem": dss_code_system, # parentPackage/href of the DSS - "codeSystemVersion": dss_code_system_version, + "codeSystem": code_system, + "codeSystemVersion": code_system_version, "decode": decode, "instanceType": "Code", }, }, - "notes": notes, - "instanceType": instanceType, + "notes": [], + "instanceType": "BiomedicalConceptProperty", } out.append(property) + return out @@ -254,87 +172,69 @@ def build_usdm_biomedical_concepts(soa_id: int) -> List[Dict[str, Any]]: cur = conn.cursor() cur.execute( """ - SELECT concept_uid,concept_code,concept_title,dss_href FROM activity_concept WHERE soa_id=? - ORDER BY COALESCE(concept_uid, 'zzz') + SELECT + bc.biomedical_concept_uid id, + bc.name name, + bc.label label, + bc.code alias_code, + ac.concept_code concept_code, + ac.dss_href reference, + c.code_uid code_uid, + c.code_system code_system, + c.code_system_version code_system_version, + c.decode decode + FROM biomedical_concept bc + INNER JOIN activity_concept ac ON bc.biomedical_concept_uid = ac.concept_uid AND bc.soa_id = ac.soa_id + INNER JOIN alias_code a ON bc.code = a.alias_code_uid AND bc.soa_id = a.soa_id + INNER JOIN code c ON a.standard_code = c.code_uid AND a.soa_id = c.soa_id + WHERE bc.soa_id = ? + ORDER BY bc.id; """, (soa_id,), ) rows = cur.fetchall() conn.close() - code_system = ( - "https://api.library.cdisc.org/api/cosmos/v2/mdr/bc/biomedicalconcepts/" - ) + # Prefetch all synonyms in parallel — one API call per concept, all concurrent + concept_codes = [r[4] for r in rows] + with ThreadPoolExecutor(max_workers=8) as pool: + synonyms_list = list(pool.map(_get_biomedical_concept_synonyms, concept_codes)) + synonyms_map = dict(zip(concept_codes, synonyms_list)) out: List[Dict[str, Any]] = [] - for idx, r in enumerate(rows): - concept_uid = r[0] - concept_code = r[1] - concept_title = r[2] - dss_href = r[3] - - bc_raw_data = _get_concept_by_code(concept_code) - if bc_raw_data is None: - bc_raw_data = {"code": concept_code, "raw": {}} - bc = bc_raw_data - # Use stored dss_href when available; fall back to live lookup - if dss_href: - dss_raw_data = _get_dss_by_url(dss_href) - else: - dss_url = _get_dss_url_from_concept(concept_code) - dss_raw_data = _get_dss_by_url(dss_url) - if dss_raw_data is None: - dss_raw_data = {"raw": {}} - try: - concept_ids = [ - dec["conceptId"] - for dec in bc["raw"]["dataElementConcepts"] - if "conceptId" in dec - ] - except: - concept_ids = [] + for r in rows: + id = r[0] + name = r[1] + label = r[2] + alias_code = r[3] + concept_code = r[4] + reference = r[5] + code_uid = r[6] + code_system = r[7] + code_system_version = r[8] + decode = r[9] - # print(concept_ids) - - try: - synonyms = bc["raw"]["synonyms"] - except: - synonyms = [] - - try: - reference = bc["raw"]["_links"]["parentPackage"]["href"] - version = reference.split("/")[4] - except: - reference = "" - version = "" - try: - label = bc["raw"]["_links"]["self"]["title"] - except: - label = "" - try: - name = bc["raw"]["shortName"] - except: - name = "" + synonyms = synonyms_map[concept_code] biomedical_concept = { - "id": concept_uid, + "id": id, "extensionAttributes": [], "name": name, "label": label, "synonyms": synonyms, - "reference": reference + "/" + bc["code"], - "properties": _get_bc_properties(bc_raw_data, dss_raw_data), + "reference": reference, + "properties": _get_biomedical_concept_properties(soa_id, id), "code": { - "id": "AliasCode_{}", + "id": alias_code, "extensionAttributes": [], "standardCode": { - "id": "Code_{}", + "id": code_uid, "extensionAttributes": [], - "code": bc["code"], - "codeSystem": "https://api.library.cdisc.org/api/cosmos/v2", - "codeSystemVersion": version, - "decode": _nz(concept_title), + "code": concept_code, + "codeSystem": code_system, + "codeSystemVersion": code_system_version, + "decode": decode, "instanceType": "Code", }, "standardCodeAliases": [], From 99169acb3bc27dc784f4ae50410ac781fd133f6d Mon Sep 17 00:00:00 2001 From: Darren <3921919+pendingintent@users.noreply.github.com> Date: Fri, 6 Mar 2026 11:08:43 -0500 Subject: [PATCH 3/7] Refactored usdm generators and extracted helper functions to usdm_utils.py; added caching for API functions --- src/soa_builder/web/utils.py | 14 - src/usdm/generate_activities.py | 42 +- src/usdm/generate_arms.py | 71 +- src/usdm/generate_biomedical_concepts.py | 154 +---- src/usdm/generate_elements.py | 75 +- src/usdm/generate_encounters.py | 131 +--- src/usdm/generate_schedule_timelines.py | 119 +--- .../generate_scheduled_activity_instances.py | 44 +- .../generate_scheduled_decision_instances.py | 46 +- src/usdm/generate_study_cells.py | 44 +- src/usdm/generate_study_epochs.py | 65 +- src/usdm/generate_study_timings.py | 44 +- src/usdm/generate_usdm.py | 42 +- src/usdm/usdm_utils.py | 643 ++++++++++++++++++ 14 files changed, 711 insertions(+), 823 deletions(-) create mode 100644 src/usdm/usdm_utils.py diff --git a/src/soa_builder/web/utils.py b/src/soa_builder/web/utils.py index b715f69..a7875a7 100644 --- a/src/soa_builder/web/utils.py +++ b/src/soa_builder/web/utils.py @@ -48,20 +48,6 @@ # USDM JSON generator helper -def _get_biomedical_concept_ids(soa_id: int, activity_uid: int) -> List[str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT concept_uid from activity_concept where soa_id=? and activity_uid=?", - ( - soa_id, - activity_uid, - ), - ) - rows = cur.fetchall() - conn.close() - bc_uids = [r[0] for r in rows] or [] - return bc_uids def redirect_url_from_referer(request: Request, fallback: str) -> str: diff --git a/src/usdm/generate_activities.py b/src/usdm/generate_activities.py index 23ef3b7..16d2d6d 100755 --- a/src/usdm/generate_activities.py +++ b/src/usdm/generate_activities.py @@ -1,40 +1,8 @@ #!/usr/bin/env python3 -# Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - -try: - from soa_builder.web.utils import ( - _get_biomedical_concept_ids as _get_biomedical_concept_ids, - ) -except ImportError: - import sys - from pathlib import Path - - sys.path.insert(0, str(Path(__file__).parent.parent)) - from soa_builder.web.utils import ( - _get_biomedical_concept_ids as _get_biomedical_concept_ids, - ) - -try: - from soa_builder.web.utils import _nz as _nz -except ImportError: - import sys - from pathlib import Path - - sys.path.insert(0, str(Path(__file__).parent.parent)) - from soa_builder.web.utils import _nz +from typing import List, Dict, Any +from soa_builder.web.db import _connect +from soa_builder.web.utils import _nz +from .usdm_utils import _get_biomedical_concept_ids def build_usdm_activities(soa_id: int) -> List[Dict[str, Any]]: @@ -83,7 +51,7 @@ def build_usdm_activities(soa_id: int) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] for i, r in enumerate(rows): - id, activity_uid, name, label, description = r[0], r[1], r[2], r[3], r[4] + _, activity_uid, name, label, description = r[0], r[1], r[2], r[3], r[4] aid = activity_uid prev_id = id_by_index.get(i - 1) next_id = id_by_index.get(i + 1) diff --git a/src/usdm/generate_arms.py b/src/usdm/generate_arms.py index 6bcdf48..0b9443b 100644 --- a/src/usdm/generate_arms.py +++ b/src/usdm/generate_arms.py @@ -1,69 +1,8 @@ #!/usr/bin/env python3 -# Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any, Tuple - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_type_code_tuple(soa_id: int, code_uid: str) -> Tuple[str, str, str, str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT DISTINCT c.codelist_table, p.code,p.cdisc_submission_value,p.dataset_date " - "FROM code_association c INNER JOIN protocol_terminology p ON c.codelist_code = p.codelist_code " - "AND c.code = p.code WHERE c.soa_id=? AND c.code_uid=?", - ( - soa_id, - code_uid, - ), - ) - rows = cur.fetchall() - conn.close() - code_system = [r[0] for r in rows] - code_code = [r[1] for r in rows] - code_decode = [r[2] for r in rows] - code_system_version = [r[3] for r in rows] - - return code_code, code_decode, code_system, code_system_version - - -def _get_data_origin_type_tuple( - soa_id: int, code_uid: str -) -> Tuple[str, str, str, str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT DISTINCT c.codelist_table,d.code,d.cdisc_submission_value,d.dataset_date " - "FROM code_association c INNER JOIN ddf_terminology d ON c.codelist_code = d.codelist_code " - "AND c.code = d.code WHERE c.soa_id=? AND c.code_uid=?", - ( - soa_id, - code_uid, - ), - ) - rows = cur.fetchall() - conn.close() - code_system = [r[0] for r in rows] - code_code = [r[1] for r in rows] - code_decode = [r[2] for r in rows] - code_system_version = [r[3] for r in rows] - - return code_code, code_decode, code_system, code_system_version +from typing import List, Dict, Any +from soa_builder.web.utils import _nz +from soa_builder.web.db import _connect +from .usdm_utils import _get_type_code_tuple, _get_data_origin_type_tuple def build_usdm_arms(soa_id: int) -> List[Dict[str, Any]]: @@ -110,7 +49,7 @@ def build_usdm_arms(soa_id: int) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] for i, r in enumerate(rows): - id, arm_uid, name, label, description, type, data_origin_type = ( + _, arm_uid, name, label, description, type, data_origin_type = ( r[0], r[1], r[2], diff --git a/src/usdm/generate_biomedical_concepts.py b/src/usdm/generate_biomedical_concepts.py index f49aee6..fcd7dfa 100644 --- a/src/usdm/generate_biomedical_concepts.py +++ b/src/usdm/generate_biomedical_concepts.py @@ -1,156 +1,12 @@ #!/usr/bin/env python3 from concurrent.futures import ThreadPoolExecutor -from typing import Optional, List, Dict, Any -import functools -import os -import requests +from typing import List, Dict, Any from soa_builder.web.utils import _connect - -# GLobal API URL prefix -URL_PREFIX = "https://api.library.cdisc.org/api/cosmos/v2/" - - -def _build_api_headers() -> dict: - api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( - "CDISC_SUBSCRIPTION_KEY" - ) - subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") or api_key - headers: dict = {"Accept": "application/json"} - if subscription_key: - headers["Ocp-Apim-Subscription-Key"] = subscription_key - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - headers["api-key"] = api_key - return headers - - -@functools.lru_cache(maxsize=128) -def _fetch_dss_variable_map(dss_href: str) -> Dict[str, List[str]]: - """Fetch a DSS href once and return {variable_name: valueList}. Cached per href.""" - try: - resp = requests.get(dss_href, headers=_build_api_headers(), timeout=15) - if resp.status_code != 200: - return {} - data = resp.json() - return {v["name"]: v.get("valueList", []) for v in data.get("variables", [])} - except (requests.RequestException, ValueError) as e: - print(f"Error fetching DSS variable map: {e}") - return {} - - -@functools.lru_cache(maxsize=256) -def _get_dss_response_codes( - biomedical_concept_uid: str, variable_name: str, soa_id: int -) -> List[str]: - """Return responseCodes for a single DSS variable. Cached per (bc_uid, name, soa_id).""" - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT dss_href FROM activity_concept WHERE concept_uid=? AND soa_id=?", - (biomedical_concept_uid, soa_id), - ) - row = cur.fetchone() - conn.close() - if not row or not row[0]: - return [] - return _fetch_dss_variable_map(row[0]).get(variable_name, []) - - -@functools.lru_cache(maxsize=256) -def _get_biomedical_concept_synonyms(concept_code: str) -> List[str]: - """Fetch the synonyms of a biomedical concept using the CDISC API. Cached per code.""" - url = URL_PREFIX + "mdr/bc/biomedicalconcepts/" + concept_code - try: - resp = requests.get(url, headers=_build_api_headers(), timeout=15) - if resp.status_code != 200: - return [] - data = resp.json() - return data.get("synonyms", []) - except (requests.RequestException, ValueError) as e: - print(f"Error fetching biomedical concept synonyms: {e}") - return [] - - -def _get_biomedical_concept_properties( - soa_id: int, biomedical_concept_uid: str -) -> Optional[Dict[str, any]]: - """Fetch biomedical concept properties from the database using the BiomedicalConcept_{}.""" - - conn = _connect() - cur = conn.cursor() - cur.execute( - """ - SELECT - p.biomedical_concept_property_uid id, - p.name name, - p.label label, - p.isRequired isRequired, - p.datatype datatype, - p.biomedical_concept_uid biomedical_concept_uid, - a.alias_code_uid alias_code_uid, - a.standard_code standard_code, - c.code code, - c.code_system code_system, - c.code_system_version code_system_version, - c.decode decode - FROM biomedical_concept_property p - INNER JOIN alias_code a ON p.code = a.alias_code_uid AND p.soa_id = a.soa_id - INNER JOIN code c ON a.standard_code = c.code_uid AND a.soa_id = c.soa_id - WHERE p.soa_id = ? AND p.biomedical_concept_uid = ? - ORDER BY p.id; - """, - (soa_id, biomedical_concept_uid), - ) - rows = cur.fetchall() - cur.close() - conn.close() - out: List[Dict[str, Any]] = [] - - for r in rows: - id = r[0] - name = r[1] - label = r[2] - isRequired = bool(r[3]) - datatype = r[4] - bc_uid = r[5] - alias_code_uid = r[6] - standard_code = r[7] - code = r[8] - code_system = r[9] - code_system_version = r[10] - decode = r[11] - - isEnabled = None - response_codes = _get_dss_response_codes(bc_uid, name, soa_id) - - property = { - "id": id, - "name": name, - "label": label, - "isRequired": isRequired, - "isEnabled": isEnabled, - "datatype": datatype, - "responseCodes": response_codes, - "code": { - "id": alias_code_uid, - "extensionAttributes": [], - "standardCode": { - "id": standard_code, - "extensionAttributes": [], - "code": code, - "codeSystem": code_system, - "codeSystemVersion": code_system_version, - "decode": decode, - "instanceType": "Code", - }, - }, - "notes": [], - "instanceType": "BiomedicalConceptProperty", - } - out.append(property) - - return out +from .usdm_utils import ( + _get_biomedical_concept_synonyms as _get_biomedical_concept_synonyms, + _get_biomedical_concept_properties as _get_biomedical_concept_properties, +) def build_usdm_biomedical_concepts(soa_id: int) -> List[Dict[str, Any]]: diff --git a/src/usdm/generate_elements.py b/src/usdm/generate_elements.py index 70377c6..e6338fe 100644 --- a/src/usdm/generate_elements.py +++ b/src/usdm/generate_elements.py @@ -1,75 +1,8 @@ #!/usr/bin/env python3 -# Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any, Tuple - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_transition_start_rule( - soa_id: int, transition_rule_uid: Optional[str] -) -> Optional[Dict[str, Any]]: - if not transition_rule_uid: - return None - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT tr.name, tr.label, tr.description, tr.text FROM transition_rule tr WHERE soa_id=? AND transition_rule_uid=?", - (soa_id, transition_rule_uid), - ) - row = cur.fetchone() - conn.close() - if not row: - return None - return { - "id": transition_rule_uid, - "extensionAttributes": [], - "name": row[0] or None, - "label": row[1] or None, - "description": row[2] or None, - "text": row[3] or None, - "instanceType": "TransitionRule", - } - - -def _get_transition_end_rule( - soa_id: int, transition_rule_uid: Optional[str] -) -> Optional[Dict[str, Any]]: - if not transition_rule_uid: - return None - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT tr.name, tr.label, tr.description, tr.text FROM transition_rule tr WHERE soa_id=? AND transition_rule_uid=?", - (soa_id, transition_rule_uid), - ) - row = cur.fetchone() - conn.close() - if not row: - return None - return { - "id": transition_rule_uid, - "extensionAttributes": [], - "name": row[0] or None, - "label": row[1] or None, - "description": row[2] or None, - "text": row[3] or None, - "instanceType": "TransitionRule", - } +from typing import List, Dict, Any +from soa_builder.web.db import _connect +from soa_builder.web.utils import _nz +from .usdm_utils import _get_transition_end_rule, _get_transition_start_rule def build_usdm_elements(soa_id: int) -> List[Dict[str, Any]]: diff --git a/src/usdm/generate_encounters.py b/src/usdm/generate_encounters.py index 47d4c95..b645ca7 100644 --- a/src/usdm/generate_encounters.py +++ b/src/usdm/generate_encounters.py @@ -1,99 +1,19 @@ #!/usr/bin/env python3 -# Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any, Tuple - -try: - from soa_builder.web.app import _connect # reuse existing DB connector - from soa_builder.web.utils import get_encounter_environment_sv - from soa_builder.web.utils import get_submission_value_for_code -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - from soa_builder.web.utils import get_encounter_environment_sv - from soa_builder.web.utils import get_submission_value_for_code - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_timing_name(soa_id: int, timing_id: Optional[int]) -> str: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT timing_uid FROM timing WHERE id=? AND soa_id=?", - ( - timing_id, - soa_id, - ), - ) - row = cur.fetchone() - conn.close() - timing_uid = row[0] if (row and row[0] is not None) else None - - return timing_uid - - -def _get_transition_start_rule( - soa_id: int, transition_rule_uid: Optional[str] -) -> Optional[Dict[str, Any]]: - if not transition_rule_uid: - return None - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT tr.name, tr.label, tr.description, tr.text FROM transition_rule tr WHERE soa_id=? AND transition_rule_uid=?", - (soa_id, transition_rule_uid), - ) - row = cur.fetchone() - conn.close() - if not row: - return None - return { - "id": transition_rule_uid, - "extensionAttributes": [], - "name": row[0] or None, - "label": row[1] or None, - "description": row[2] or None, - "text": row[3] or None, - "instanceType": "TransitionRule", - } - - -def _get_transition_end_rule( - soa_id: int, transition_rule_uid: Optional[str] -) -> Optional[Dict[str, Any]]: - if not transition_rule_uid: - return None - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT tr.name, tr.label, tr.description, tr.text FROM transition_rule tr WHERE soa_id=? AND transition_rule_uid=?", - (soa_id, transition_rule_uid), - ) - row = cur.fetchone() - conn.close() - if not row: - return None - return { - "id": transition_rule_uid, - "extensionAttributes": [], - "name": row[0] or None, - "label": row[1] or None, - "description": row[2] or None, - "text": row[3] or None, - "instanceType": "TransitionRule", - } - - +from typing import List, Dict, Any, Tuple +from soa_builder.web.utils import get_submission_value_for_code, _nz +from soa_builder.web.db import _connect +from .usdm_utils import ( + _get_timing_name, + _get_transition_start_rule, + _get_transition_end_rule, + _get_code_tuple, +) + + +# Override the definition in usdm_utils.py +# Encounters are currently storing type codes in the ddf_terminology table def _get_type_code_tuple(soa_id: int, code_uid: str) -> Tuple[str, str, str, str]: + """Fetch type codes for ENCOUNTERS only. These values are stored in the ddf_terminology table.""" conn = _connect() cur = conn.cursor() cur.execute( @@ -115,25 +35,6 @@ def _get_type_code_tuple(soa_id: int, code_uid: str) -> Tuple[str, str, str, str return code_code, code_decode, code_system, code_system_version -def _get_code_tuple(soa_id: int, code_uid: str) -> Tuple[str, str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT DISTINCT c.codelist_table,c.code " - "FROM code_association c WHERE c.soa_id=? AND c.code_uid=?", - ( - soa_id, - code_uid, - ), - ) - rows = cur.fetchall() - conn.close() - code_system = [r[0] for r in rows] - code = [r[1] for r in rows] - - return code, code_system - - def build_usdm_encounters(soa_id: int) -> List[Dict[str, Any]]: """ Build USDM Encounters-Output objects for the given SOA @@ -192,7 +93,7 @@ def build_usdm_encounters(soa_id: int) -> List[Dict[str, Any]]: ( name, label, - order_index, + _, encounter_uid, description, type, @@ -308,7 +209,7 @@ def build_usdm_encounters(soa_id: int) -> List[Dict[str, Any]]: "id": type, "extensionAttributes": [], "code": t_code[0], - "codeSystem": "db://" + t_codeSystem[0], + "codeSystem": t_codeSystem[0], "codeSystemVersion": t_codeSystemVersion[0], "decode": t_decode[0], "instanceType": "Code", diff --git a/src/usdm/generate_schedule_timelines.py b/src/usdm/generate_schedule_timelines.py index 46ff34a..d61a2d3 100644 --- a/src/usdm/generate_schedule_timelines.py +++ b/src/usdm/generate_schedule_timelines.py @@ -1,113 +1,13 @@ #!/usr/bin/env python3 -# Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any, Callable, Set +from typing import List, Dict, Any import logging - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _timing_uids_for_timeline(soa_id: int, schedule_timeline_uid: str) -> Set[str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - """ - SELECT t.timing_uid FROM schedule_timelines s INNER JOIN timing t - ON s.schedule_timeline_uid = t.member_of_timeline AND s.soa_id = t.soa_id - WHERE s.soa_id=? AND s.schedule_timeline_uid=? ORDER BY t.timing_uid - """, - ( - soa_id, - schedule_timeline_uid, - ), - ) - rows = cur.fetchall() - conn.close() - return {r[0] for r in rows if r and r[0]} - - -def _load_generate_study_timings(): - """Return the timing builder from usdm.generate_study_timings (tries several names).""" - try: - import usdm.generate_study_timings as gst - except Exception: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - import usdm.generate_study_timings as gst - for name in ( - "build_usdm_study_timings", - "build_usdm_timings", - "generate_study_timings", - ): - fn = getattr(gst, name, None) - if callable(fn): - return fn - raise ImportError("usdm.generate_study_timings missing expected builder function") - - -def _load_generate_study_instances(): - """Return the instances builder from usdm.generate_scheduled_activity_instances.""" - try: - import usdm.generate_scheduled_activity_instances as gsai - except Exception: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - import usdm.generate_scheduled_activity_instances as gsai - for name in ( - "build_usdm_instances", - "generate_scheduled_activity_instances", - ): - fn = getattr(gsai, name, None) - if callable(fn): - return fn - raise ImportError( - "usdm.generate_scheduled_activity_instances missing expected builder function" - ) - - -def _load_generate_decision_instances(): - """Return the decision instances builder from usdm.generate_scheduled_decision_instances.""" - try: - import usdm.generate_scheduled_decision_instances as gsdi - except Exception: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - import usdm.generate_scheduled_decision_instances as gsdi - fn = getattr(gsdi, "build_usdm_decision_instances", None) - if callable(fn): - return fn - raise ImportError( - "usdm.generate_scheduled_decision_instances missing build_usdm_decision_instances" - ) - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None +from soa_builder.web.db import _connect +from soa_builder.web.utils import _nz +from .usdm_utils import ( + _load_generate_study_timings, + _load_generate_study_instances, + _load_generate_decision_instances, +) generate_study_timings = _load_generate_study_timings() @@ -218,7 +118,7 @@ def build_usdm_schedule_timelines(soa_id: int) -> List[Dict[str, Any]]: mainTimeline, entryCondition, entryId, - exitId, + _, ) = ( r[0], r[1], @@ -255,7 +155,6 @@ def build_usdm_schedule_timelines(soa_id: int) -> List[Dict[str, Any]]: if __name__ == "__main__": import argparse import json - import logging import sys logger = logging.getLogger("usdm.generate_schedule_timelines") diff --git a/src/usdm/generate_scheduled_activity_instances.py b/src/usdm/generate_scheduled_activity_instances.py index 971181e..1cac13d 100644 --- a/src/usdm/generate_scheduled_activity_instances.py +++ b/src/usdm/generate_scheduled_activity_instances.py @@ -1,43 +1,10 @@ #!/usr/bin/env python3 # Prefer absolute import; fallback to adding src/ to sys.path when run directly from typing import Optional, List, Dict, Any - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_activity_ids(soa_id: int, encounter_uid: str) -> List[str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT a.activity_uid from activity a " - "INNER JOIN matrix_cells m ON a.id = m.activity_id AND a.soa_id = m.soa_id " - "INNER JOIN visit v ON m.visit_id = v.id AND m.soa_id = v.soa_id " - "INNER JOIN instances i ON v.encounter_uid = i.encounter_uid AND v.soa_id = i.soa_id " - "WHERE i.soa_id=? and i.encounter_uid=?", - ( - soa_id, - encounter_uid, - ), - ) - rows = cur.fetchall() - conn.close() - activity_uids = [r[0] for r in rows] or [] - return activity_uids +import sys +from soa_builder.web.db import _connect +from soa_builder.web.utils import _nz +from .usdm_utils import _get_activity_ids def build_usdm_instances( @@ -86,7 +53,7 @@ def build_usdm_instances( for i, r in enumerate(rows): ( - id, + _, instance_uid, name, label, @@ -132,7 +99,6 @@ def build_usdm_instances( import argparse import json import logging - import sys logger = logging.getLogger("usdm.generate_instances") diff --git a/src/usdm/generate_scheduled_decision_instances.py b/src/usdm/generate_scheduled_decision_instances.py index 04706b6..70710a2 100644 --- a/src/usdm/generate_scheduled_decision_instances.py +++ b/src/usdm/generate_scheduled_decision_instances.py @@ -1,49 +1,9 @@ #!/usr/bin/env python3 # Prefer absolute import; fallback to adding src/ to sys.path when run directly from typing import Optional, List, Dict, Any - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_condition_assignments( - soa_id: int, decision_instance_uid: str -) -> List[Dict[str, Any]]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT condition_assignment_uid, condition, condition_target_uid " - "FROM condition_assignment " - "WHERE soa_id=? AND decision_instance_uid=? " - "ORDER BY order_index, id", - (soa_id, decision_instance_uid), - ) - rows = cur.fetchall() - conn.close() - return [ - { - "id": r[0], - "extensionAttributes": [], - "condition": r[1], - "conditionTargetId": r[2], - "instanceType": "Condition", - } - for r in rows - ] +from soa_builder.web.db import _connect +from soa_builder.web.utils import _nz +from .usdm_utils import _get_condition_assignments def build_usdm_decision_instances( diff --git a/src/usdm/generate_study_cells.py b/src/usdm/generate_study_cells.py index 48e1b20..f5aca77 100644 --- a/src/usdm/generate_study_cells.py +++ b/src/usdm/generate_study_cells.py @@ -1,46 +1,8 @@ #!/usr/bin/env python3 # Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_element_ids(soa_id: int, study_cell_uid: str) -> List[str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT element_uid from study_cell WHERE soa_id=? AND study_cell_uid=? ORDER BY element_uid", - ( - soa_id, - study_cell_uid, - ), - ) - rows = cur.fetchall() - conn.close() - # Deduplicate and preserve stable order - element_uids = [r[0] for r in rows] or [] - seen = set() - ordered_unique: List[str] = [] - for uid in element_uids: - if uid and uid not in seen: - seen.add(uid) - ordered_unique.append(uid) - return ordered_unique +from typing import List, Dict, Any +from soa_builder.web.db import _connect +from .usdm_utils import _get_element_ids def build_usdm_study_cells(soa_id: int) -> List[Dict[str, Any]]: diff --git a/src/usdm/generate_study_epochs.py b/src/usdm/generate_study_epochs.py index 5b3a9cb..7186b95 100644 --- a/src/usdm/generate_study_epochs.py +++ b/src/usdm/generate_study_epochs.py @@ -1,64 +1,9 @@ #!/usr/bin/env python3 # Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any, Tuple -from urllib.parse import urlparse - - -import os -import requests - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_epoch_code_values(soa_id: int, epoch_type: str, code: str) -> Tuple[ - str, - str, - str, -]: - logger = logging.getLogger("usdm.generate_epochs") - url = "https://library.cdisc.org/api/mdr/ct/packages/sdtmct-2025-09-26/codelists/C99079" - headers: dict[str, str] = {"Accept": "application/json"} - subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") - api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( - "CDISC_SUBSCRIPTION_KEY" - ) - unified_key = subscription_key or api_key - if unified_key: - headers["Ocp-Apim-Subscription-Key"] = unified_key - if api_key: - headers["Authorization"] = f"Bearer {api_key}" - headers["api-key"] = api_key - - resp = requests.get(url, headers=headers, timeout=10) - if resp.status_code != 200: - logger.exception("No response from {} for code {}".format(url, epoch_type)) - else: - content = resp.json() - parsed_url = urlparse(url) - code_system = parsed_url.scheme + "://" + parsed_url.netloc - code_system_version = parsed_url.path.split("/", 7)[5] - - top_terms = content.get("terms") - for term in top_terms: - if term.get("conceptId") == code: - decode = term.get("submissionValue") - - return code_system, code_system_version, decode +from typing import List, Dict, Any +from soa_builder.web.db import _connect +from soa_builder.web.utils import _nz +from .usdm_utils import _get_epoch_code_values def build_usdm_epochs(soa_id: int) -> List[Dict[str, Any]]: @@ -113,7 +58,7 @@ def build_usdm_epochs(soa_id: int) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] for i, r in enumerate(rows): - id, epoch_uid, name, label, description, epoch_type, code = ( + _, epoch_uid, name, label, description, epoch_type, code = ( r[0], r[1], r[2], diff --git a/src/usdm/generate_study_timings.py b/src/usdm/generate_study_timings.py index 1f10e64..209bf5b 100644 --- a/src/usdm/generate_study_timings.py +++ b/src/usdm/generate_study_timings.py @@ -1,45 +1,9 @@ #!/usr/bin/env python3 # Prefer absolute import; fallback to adding src/ to sys.path when run directly -from typing import Optional, List, Dict, Any, Tuple - -try: - from soa_builder.web.app import _connect # reuse existing DB connector -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore - - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_timing_code_values(soa_id: int, code_uid: str) -> Tuple[str, str, str, str]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT DISTINCT c.codelist_table,d.code,d.cdisc_submission_value,d.dataset_date " - "FROM code_association c INNER JOIN ddf_terminology d ON c.codelist_code = d.codelist_code " - "AND c.code = d.code WHERE c.soa_id=? AND c.code_uid=?", - ( - soa_id, - code_uid, - ), - ) - rows = cur.fetchall() - conn.close() - code_system = [r[0] for r in rows] - code_code = [r[1] for r in rows] - code_decode = [r[2] for r in rows] - code_system_version = [r[3] for r in rows] - - return code_code, code_decode, code_system, code_system_version +from typing import Optional, List, Dict, Any +from soa_builder.web.db import _connect +from soa_builder.web.utils import _nz +from .usdm_utils import _get_timing_code_values def build_usdm_timings( diff --git a/src/usdm/generate_usdm.py b/src/usdm/generate_usdm.py index 245bdcd..175f1c2 100644 --- a/src/usdm/generate_usdm.py +++ b/src/usdm/generate_usdm.py @@ -5,22 +5,10 @@ Produces a Study-Output → StudyVersion-Output → InterventionalStudyDesign-Output hierarchy, populating sub-entities from the existing per-entity generators. """ -from typing import Optional, List, Dict, Any +from typing import List, Dict, Any import logging - -logger = logging.getLogger("usdm.generate_usdm") - -try: - from soa_builder.web.app import _connect -except ImportError: - import sys - from pathlib import Path - - here = Path(__file__).resolve() - src_dir = here.parents[2] / "src" - if src_dir.exists() and str(src_dir) not in sys.path: - sys.path.insert(0, str(src_dir)) - from soa_builder.web.app import _connect # type: ignore +from .usdm_utils import _get_soa_metadata +from soa_builder.web.utils import _nz from usdm.generate_activities import build_usdm_activities from usdm.generate_arms import build_usdm_arms @@ -31,29 +19,7 @@ from usdm.generate_study_epochs import build_usdm_epochs from usdm.generate_biomedical_concepts import build_usdm_biomedical_concepts - -def _nz(s: Optional[str]) -> Optional[str]: - s = (s or "").strip() - return s or None - - -def _get_soa_metadata(soa_id: int) -> Dict[str, Optional[str]]: - conn = _connect() - cur = conn.cursor() - cur.execute( - "SELECT name, study_id, study_label, study_description FROM soa WHERE id=?", - (soa_id,), - ) - row = cur.fetchone() - conn.close() - if row is None: - raise ValueError(f"No SOA found with id={soa_id}") - return { - "name": row[0], - "study_id": row[1], - "study_label": row[2], - "study_description": row[3], - } +logger = logging.getLogger("usdm.generate_usdm") def build_usdm(soa_id: int) -> Dict[str, Any]: diff --git a/src/usdm/usdm_utils.py b/src/usdm/usdm_utils.py new file mode 100644 index 0000000..ada8c43 --- /dev/null +++ b/src/usdm/usdm_utils.py @@ -0,0 +1,643 @@ +# utils for usdm generators +import os +import functools +import requests +import logging +from urllib.parse import urlparse +from typing import List, Dict, Optional, Any, Tuple +from soa_builder.web.db import _connect +from soa_builder.web.utils import get_latest_sdtm_ct_href as _get_latest_sdtm_ct_href + +URL_PREFIX = "https://api.library.cdisc.org/api/cosmos/v2/" + + +# Generic helper functions for USDM generator scripts +def _build_api_headers() -> dict: + api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( + "CDISC_SUBSCRIPTION_KEY" + ) + subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") or api_key + headers: dict = {"Accept": "application/json"} + if subscription_key: + headers["Ocp-Apim-Subscription-Key"] = subscription_key + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + headers["api-key"] = api_key + return headers + + +# Generic function to return submission value for provided codelist_code and code +@functools.lru_cache(maxsize=256) +def get_submission_value_for_code(soa_id: int, codelist_code: str, code_uid: str): + """Resolve the environmental setting submission value via CDISC Library.""" + if not code_uid: + return None + + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT code FROM code_association WHERE soa_id=? AND code_uid=?", + (soa_id, code_uid), + ) + row = cur.fetchone() + conn.close() + if not row: + return None + target_code = str(row[0]).strip() + + package_slug = _get_latest_sdtm_ct_href() + if not package_slug: + return None + + url = ( + f"https://library.cdisc.org/api/mdr/ct/packages/" + f"{package_slug}/codelists/{codelist_code}" + ) + + headers: dict[str, str] = {"Accept": "application/json"} + subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") + api_key = os.environ.get("CDISC_API_KEY") or subscription_key + if subscription_key: + headers["Ocp-Apim-Subscription-Key"] = subscription_key + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + headers["api-key"] = api_key + + def _match_term(term: dict[str, Any]) -> str | None: + term_id = next( + ( + term.get(field) + for field in ( + "conceptId", + "concept_id", + "code", + "termCode", + "term_code", + ) + if term.get(field) + ), + None, + ) + if term_id and str(term_id).lower() == target_code.lower(): + submission = term.get("submissionValue") or term.get( + "cdisc_submission_value" + ) + if submission: + return str(submission).strip() + return None + + def _extract_terms(data: Any) -> List[dict]: + if isinstance(data, list): + return [t for t in data if isinstance(t, dict)] + if isinstance(data, dict): + if isinstance(data.get("terms"), list): + return [t for t in data["terms"] if isinstance(t, dict)] + embedded = data.get("_embedded", {}) + if isinstance(embedded, dict) and isinstance(embedded.get("terms"), list): + return [t for t in embedded["terms"] if isinstance(t, dict)] + return [] + + try: + resp = requests.get(url, headers=headers, timeout=10) + if resp.status_code != 200: + return None + payload = resp.json() or {} + except Exception: + return None + + for term in _extract_terms(payload): + submission = _match_term(term) + if submission: + return submission + + term_links = payload.get("_links", {}).get("terms") or [] + if isinstance(term_links, dict): + term_links = [term_links] + + for link in term_links: + href = link.get("href") + if not href: + continue + if href.startswith("/"): + href = f"https://library.cdisc.org{href}" + try: + term_resp = requests.get(href, headers=headers, timeout=10) + if term_resp.status_code != 200: + continue + term_data = term_resp.json() or {} + except Exception: + continue + submission = _match_term(term_data if isinstance(term_data, dict) else {}) + if submission: + return submission + + return None + + +# Helper functions for populating biomedical concepts +@functools.lru_cache(maxsize=128) +def _fetch_dss_variable_map(dss_href: str) -> Dict[str, List[str]]: + """Fetch a DSS href once and return {variable_name: valueList}. Cached per href.""" + try: + resp = requests.get(dss_href, headers=_build_api_headers(), timeout=15) + if resp.status_code != 200: + return {} + data = resp.json() + return {v["name"]: v.get("valueList", []) for v in data.get("variables", [])} + except (requests.RequestException, ValueError) as e: + print(f"Error fetching DSS variable map: {e}") + return {} + + +@functools.lru_cache(maxsize=256) +def _get_dss_response_codes( + biomedical_concept_uid: str, variable_name: str, soa_id: int +) -> List[str]: + """Return responseCodes for a single DSS variable. Cached per (bc_uid, name, soa_id).""" + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT dss_href FROM activity_concept WHERE concept_uid=? AND soa_id=?", + (biomedical_concept_uid, soa_id), + ) + row = cur.fetchone() + conn.close() + if not row or not row[0]: + return [] + return _fetch_dss_variable_map(row[0]).get(variable_name, []) + + +@functools.lru_cache(maxsize=256) +def _get_biomedical_concept_synonyms(concept_code: str) -> List[str]: + """Fetch the synonyms of a biomedical concept using the CDISC API. Cached per code.""" + url = URL_PREFIX + "mdr/bc/biomedicalconcepts/" + concept_code + try: + resp = requests.get(url, headers=_build_api_headers(), timeout=15) + if resp.status_code != 200: + return [] + data = resp.json() + return data.get("synonyms", []) + except (requests.RequestException, ValueError) as e: + print(f"Error fetching biomedical concept synonyms: {e}") + return [] + + +@functools.lru_cache(maxsize=256) +def _get_biomedical_concept_properties( + soa_id: int, biomedical_concept_uid: str +) -> Optional[Dict[str, any]]: + """Fetch biomedical concept properties from the database using the BiomedicalConcept_{}.""" + + conn = _connect() + cur = conn.cursor() + cur.execute( + """ + SELECT + p.biomedical_concept_property_uid id, + p.name name, + p.label label, + p.isRequired isRequired, + p.datatype datatype, + p.biomedical_concept_uid biomedical_concept_uid, + a.alias_code_uid alias_code_uid, + a.standard_code standard_code, + c.code code, + c.code_system code_system, + c.code_system_version code_system_version, + c.decode decode + FROM biomedical_concept_property p + INNER JOIN alias_code a ON p.code = a.alias_code_uid AND p.soa_id = a.soa_id + INNER JOIN code c ON a.standard_code = c.code_uid AND a.soa_id = c.soa_id + WHERE p.soa_id = ? AND p.biomedical_concept_uid = ? + ORDER BY p.id; + """, + (soa_id, biomedical_concept_uid), + ) + rows = cur.fetchall() + cur.close() + conn.close() + out: List[Dict[str, Any]] = [] + + for r in rows: + id = r[0] + name = r[1] + label = r[2] + isRequired = bool(r[3]) + datatype = r[4] + bc_uid = r[5] + alias_code_uid = r[6] + standard_code = r[7] + code = r[8] + code_system = r[9] + code_system_version = r[10] + decode = r[11] + + isEnabled = None + response_codes = _get_dss_response_codes(bc_uid, name, soa_id) + + property = { + "id": id, + "name": name, + "label": label, + "isRequired": isRequired, + "isEnabled": isEnabled, + "datatype": datatype, + "responseCodes": response_codes, + "code": { + "id": alias_code_uid, + "extensionAttributes": [], + "standardCode": { + "id": standard_code, + "extensionAttributes": [], + "code": code, + "codeSystem": code_system, + "codeSystemVersion": code_system_version, + "decode": decode, + "instanceType": "Code", + }, + }, + "notes": [], + "instanceType": "BiomedicalConceptProperty", + } + out.append(property) + + return out + + +# Helper for Activities +@functools.lru_cache(maxsize=256) +def _get_biomedical_concept_ids(soa_id: int, activity_uid: int) -> List[str]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT concept_uid from activity_concept where soa_id=? and activity_uid=?", + ( + soa_id, + activity_uid, + ), + ) + rows = cur.fetchall() + conn.close() + bc_uids = [r[0] for r in rows] or [] + return bc_uids + + +# Helper for Arms +def _get_type_code_tuple(soa_id: int, code_uid: str) -> Tuple[str, str, str, str]: + """Fetch type codes for ARMS only. These values are stored in the protocol_terminology table.""" + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT DISTINCT c.codelist_table, p.code,p.cdisc_submission_value,p.dataset_date " + "FROM code_association c INNER JOIN protocol_terminology p ON c.codelist_code = p.codelist_code " + "AND c.code = p.code WHERE c.soa_id=? AND c.code_uid=?", + ( + soa_id, + code_uid, + ), + ) + rows = cur.fetchall() + conn.close() + code_system = [r[0] for r in rows] + code_code = [r[1] for r in rows] + code_decode = [r[2] for r in rows] + code_system_version = [r[3] for r in rows] + + return code_code, code_decode, code_system, code_system_version + + +def _get_data_origin_type_tuple( + soa_id: int, code_uid: str +) -> Tuple[str, str, str, str]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT DISTINCT c.codelist_table,d.code,d.cdisc_submission_value,d.dataset_date " + "FROM code_association c INNER JOIN ddf_terminology d ON c.codelist_code = d.codelist_code " + "AND c.code = d.code WHERE c.soa_id=? AND c.code_uid=?", + ( + soa_id, + code_uid, + ), + ) + rows = cur.fetchall() + conn.close() + code_system = [r[0] for r in rows] + code_code = [r[1] for r in rows] + code_decode = [r[2] for r in rows] + code_system_version = [r[3] for r in rows] + + return code_code, code_decode, code_system, code_system_version + + +# Helpders for Elements +def _get_transition_end_rule( + soa_id: int, transition_rule_uid: Optional[str] +) -> Optional[Dict[str, Any]]: + if not transition_rule_uid: + return None + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT tr.name, tr.label, tr.description, tr.text FROM transition_rule tr WHERE soa_id=? AND transition_rule_uid=?", + (soa_id, transition_rule_uid), + ) + row = cur.fetchone() + conn.close() + if not row: + return None + return { + "id": transition_rule_uid, + "extensionAttributes": [], + "name": row[0] or None, + "label": row[1] or None, + "description": row[2] or None, + "text": row[3] or None, + "instanceType": "TransitionRule", + } + + +def _get_transition_start_rule( + soa_id: int, transition_rule_uid: Optional[str] +) -> Optional[Dict[str, Any]]: + if not transition_rule_uid: + return None + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT tr.name, tr.label, tr.description, tr.text FROM transition_rule tr WHERE soa_id=? AND transition_rule_uid=?", + (soa_id, transition_rule_uid), + ) + row = cur.fetchone() + conn.close() + if not row: + return None + return { + "id": transition_rule_uid, + "extensionAttributes": [], + "name": row[0] or None, + "label": row[1] or None, + "description": row[2] or None, + "text": row[3] or None, + "instanceType": "TransitionRule", + } + + +def _get_timing_name(soa_id: int, timing_id: Optional[int]) -> str: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT timing_uid FROM timing WHERE id=? AND soa_id=?", + ( + timing_id, + soa_id, + ), + ) + row = cur.fetchone() + conn.close() + timing_uid = row[0] if (row and row[0] is not None) else None + + return timing_uid + + +def _get_code_tuple(soa_id: int, code_uid: str) -> Tuple[str, str]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT DISTINCT c.codelist_table,c.code " + "FROM code_association c WHERE c.soa_id=? AND c.code_uid=?", + ( + soa_id, + code_uid, + ), + ) + rows = cur.fetchall() + conn.close() + code_system = [r[0] for r in rows] + code = [r[1] for r in rows] + + return code, code_system + + +# Helper functions for study timing +def _load_generate_study_timings(): + """Return the timing builder from usdm.generate_study_timings (tries several names).""" + try: + import usdm.generate_study_timings as gst + except Exception: + import sys + from pathlib import Path + + here = Path(__file__).resolve() + src_dir = here.parents[2] / "src" + if src_dir.exists() and str(src_dir) not in sys.path: + sys.path.insert(0, str(src_dir)) + import usdm.generate_study_timings as gst + for name in ( + "build_usdm_study_timings", + "build_usdm_timings", + "generate_study_timings", + ): + fn = getattr(gst, name, None) + if callable(fn): + return fn + raise ImportError("usdm.generate_study_timings missing expected builder function") + + +def _load_generate_study_instances(): + """Return the instances builder from usdm.generate_scheduled_activity_instances.""" + try: + import usdm.generate_scheduled_activity_instances as gsai + except Exception: + import sys + from pathlib import Path + + here = Path(__file__).resolve() + src_dir = here.parents[2] / "src" + if src_dir.exists() and str(src_dir) not in sys.path: + sys.path.insert(0, str(src_dir)) + import usdm.generate_scheduled_activity_instances as gsai + for name in ( + "build_usdm_instances", + "generate_scheduled_activity_instances", + ): + fn = getattr(gsai, name, None) + if callable(fn): + return fn + raise ImportError( + "usdm.generate_scheduled_activity_instances missing expected builder function" + ) + + +def _load_generate_decision_instances(): + """Return the decision instances builder from usdm.generate_scheduled_decision_instances.""" + try: + import usdm.generate_scheduled_decision_instances as gsdi + except Exception: + import sys + from pathlib import Path + + here = Path(__file__).resolve() + src_dir = here.parents[2] / "src" + if src_dir.exists() and str(src_dir) not in sys.path: + sys.path.insert(0, str(src_dir)) + import usdm.generate_scheduled_decision_instances as gsdi + fn = getattr(gsdi, "build_usdm_decision_instances", None) + if callable(fn): + return fn + raise ImportError( + "usdm.generate_scheduled_decision_instances missing build_usdm_decision_instances" + ) + + +# Helpers for Scheduled Activity Instances +def _get_activity_ids(soa_id: int, encounter_uid: str) -> List[str]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT a.activity_uid from activity a " + "INNER JOIN matrix_cells m ON a.id = m.activity_id AND a.soa_id = m.soa_id " + "INNER JOIN visit v ON m.visit_id = v.id AND m.soa_id = v.soa_id " + "INNER JOIN instances i ON v.encounter_uid = i.encounter_uid AND v.soa_id = i.soa_id " + "WHERE i.soa_id=? and i.encounter_uid=?", + ( + soa_id, + encounter_uid, + ), + ) + rows = cur.fetchall() + conn.close() + activity_uids = [r[0] for r in rows] or [] + return activity_uids + + +# Helpers for Scheduled Decision Instances +def _get_condition_assignments( + soa_id: int, decision_instance_uid: str +) -> List[Dict[str, Any]]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT condition_assignment_uid, condition, condition_target_uid " + "FROM condition_assignment " + "WHERE soa_id=? AND decision_instance_uid=? " + "ORDER BY order_index, id", + (soa_id, decision_instance_uid), + ) + rows = cur.fetchall() + conn.close() + return [ + { + "id": r[0], + "extensionAttributes": [], + "condition": r[1], + "conditionTargetId": r[2], + "instanceType": "Condition", + } + for r in rows + ] + + +# Helpers for Study Cells +def _get_element_ids(soa_id: int, study_cell_uid: str) -> List[str]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT element_uid from study_cell WHERE soa_id=? AND study_cell_uid=? ORDER BY element_uid", + ( + soa_id, + study_cell_uid, + ), + ) + rows = cur.fetchall() + conn.close() + # Deduplicate and preserve stable order + element_uids = [r[0] for r in rows] or [] + seen = set() + ordered_unique: List[str] = [] + for uid in element_uids: + if uid and uid not in seen: + seen.add(uid) + ordered_unique.append(uid) + return ordered_unique + + +# Helpers for Study Epochs +@functools.lru_cache(maxsize=256) +def _get_epoch_code_values(soa_id: int, epoch_type: str, code: str) -> Tuple[ + str, + str, + str, +]: + logger = logging.getLogger("usdm.generate_epochs") + url = "https://library.cdisc.org/api/mdr/ct/packages/sdtmct-2025-09-26/codelists/C99079" + headers: dict[str, str] = {"Accept": "application/json"} + subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") + api_key = os.environ.get("CDISC_API_KEY") or os.environ.get( + "CDISC_SUBSCRIPTION_KEY" + ) + unified_key = subscription_key or api_key + if unified_key: + headers["Ocp-Apim-Subscription-Key"] = unified_key + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + headers["api-key"] = api_key + + resp = requests.get(url, headers=headers, timeout=10) + if resp.status_code != 200: + logger.exception("No response from {} for code {}".format(url, epoch_type)) + else: + content = resp.json() + parsed_url = urlparse(url) + code_system = parsed_url.scheme + "://" + parsed_url.netloc + code_system_version = parsed_url.path.split("/", 7)[5] + + top_terms = content.get("terms") + for term in top_terms: + if term.get("conceptId") == code: + decode = term.get("submissionValue") + + return code_system, code_system_version, decode + + +# Helpers for Study Timings +def _get_timing_code_values(soa_id: int, code_uid: str) -> Tuple[str, str, str, str]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT DISTINCT c.codelist_table,d.code,d.cdisc_submission_value,d.dataset_date " + "FROM code_association c INNER JOIN ddf_terminology d ON c.codelist_code = d.codelist_code " + "AND c.code = d.code WHERE c.soa_id=? AND c.code_uid=?", + ( + soa_id, + code_uid, + ), + ) + rows = cur.fetchall() + conn.close() + code_system = [r[0] for r in rows] + code_code = [r[1] for r in rows] + code_decode = [r[2] for r in rows] + code_system_version = [r[3] for r in rows] + + return code_code, code_decode, code_system, code_system_version + + +# Helper to return study metadata +def _get_soa_metadata(soa_id: int) -> Dict[str, Optional[str]]: + conn = _connect() + cur = conn.cursor() + cur.execute( + "SELECT name, study_id, study_label, study_description FROM soa WHERE id=?", + (soa_id,), + ) + row = cur.fetchone() + conn.close() + if row is None: + raise ValueError(f"No SOA found with id={soa_id}") + return { + "name": row[0], + "study_id": row[1], + "study_label": row[2], + "study_description": row[3], + } From 72eee58a124b4569daaf22c83202498811bf54ce Mon Sep 17 00:00:00 2001 From: Darren <3921919+pendingintent@users.noreply.github.com> Date: Fri, 6 Mar 2026 11:45:42 -0500 Subject: [PATCH 4/7] Address copilot-reported review issues --- src/soa_builder/web/app.py | 53 +++++++++- src/usdm/generate_biomedical_concepts.py | 2 +- src/usdm/generate_encounters.py | 2 + src/usdm/usdm_utils.py | 118 ++++++++++++++++++----- 4 files changed, 146 insertions(+), 29 deletions(-) diff --git a/src/soa_builder/web/app.py b/src/soa_builder/web/app.py index 5634029..2c98063 100644 --- a/src/soa_builder/web/app.py +++ b/src/soa_builder/web/app.py @@ -1844,7 +1844,9 @@ async def lifespan(app: FastAPI): ) def _run_enrichment_pool(_rows=_unenriched): - with ThreadPoolExecutor(max_workers=1) as _pool: + with ThreadPoolExecutor( + max_workers=1 + ) as _pool: # concurrency was reduced for rate-limiting for _concept_code, _soa_id in _rows: _pool.submit(_enrich_code_bg, _concept_code, _soa_id) @@ -2463,7 +2465,7 @@ def _lookup_and_save_dss(soa_id: int, activity_id: int, concept_code: str) -> No pass # silent failure — DSS column remains unset; user can assign manually -async def _populate_bc_properties_bg(soa_id: int, activity_id: int, concept_code: str): +def _populate_bc_properties_bg(soa_id: int, activity_id: int, concept_code: str): """ Background task: fetch DSS variables, parse them, insert BiomedicalConceptProperty rows. Restructured to only hold database lock during quick writes, not during slow API calls. @@ -2507,7 +2509,7 @@ async def _populate_bc_properties_bg(soa_id: int, activity_id: int, concept_code return dss_data = resp.json() except (requests.RequestException, ValueError) as e: - print(f"API error fetching DSS for {concept_code}: {e}") + logger.warning(f"API error fetching DSS for {concept_code}: {e}") return # Step 3: Parse response data (NO DATABASE CONNECTION) @@ -2550,6 +2552,14 @@ async def _populate_bc_properties_bg(soa_id: int, activity_id: int, concept_code try: cur.execute("BEGIN IMMEDIATE") + # Collect existing property alias_code UIDs BEFORE deleting (for cascade cleanup) + cur.execute( + "SELECT code FROM biomedical_concept_property" + " WHERE biomedical_concept_uid = ? AND soa_id = ?", + (concept_uid, soa_id), + ) + orphaned_alias_uids = [r[0] for r in cur.fetchall() if r[0]] + # Delete existing properties for this concept cur.execute( "DELETE FROM biomedical_concept_property" @@ -2557,6 +2567,43 @@ async def _populate_bc_properties_bg(soa_id: int, activity_id: int, concept_code (concept_uid, soa_id), ) + # Cascade-delete orphaned alias_code + code rows + for alias_uid in orphaned_alias_uids: + # Check if this alias_code is still referenced by other properties or biomedical_concept + cur.execute( + "SELECT 1 FROM biomedical_concept_property WHERE soa_id=? AND code=? LIMIT 1", + (soa_id, alias_uid), + ) + if cur.fetchone(): + continue # Still in use + cur.execute( + "SELECT 1 FROM biomedical_concept WHERE soa_id=? AND code=? LIMIT 1", + (soa_id, alias_uid), + ) + if cur.fetchone(): + continue # Still in use + + # This alias_code is orphaned; fetch its standard_code before deleting + cur.execute( + "SELECT standard_code FROM alias_code WHERE alias_code_uid=? AND soa_id=?", + (alias_uid, soa_id), + ) + code_row = cur.fetchone() + + # Delete the orphaned alias_code row + cur.execute( + "DELETE FROM alias_code WHERE alias_code_uid=? AND soa_id=?", + (alias_uid, soa_id), + ) + + # Cascade-delete the orphaned code row if it exists + if code_row: + code_uid = code_row[0] + cur.execute( + "DELETE FROM code WHERE code_uid=? AND soa_id=?", + (code_uid, soa_id), + ) + # Insert all new properties, creating code + alias_code rows per property for prop in properties_to_insert: # code row: stores the CDISC dataElementConceptId diff --git a/src/usdm/generate_biomedical_concepts.py b/src/usdm/generate_biomedical_concepts.py index fcd7dfa..c662d57 100644 --- a/src/usdm/generate_biomedical_concepts.py +++ b/src/usdm/generate_biomedical_concepts.py @@ -2,7 +2,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import List, Dict, Any -from soa_builder.web.utils import _connect +from soa_builder.web.db import _connect from .usdm_utils import ( _get_biomedical_concept_synonyms as _get_biomedical_concept_synonyms, _get_biomedical_concept_properties as _get_biomedical_concept_properties, diff --git a/src/usdm/generate_encounters.py b/src/usdm/generate_encounters.py index b645ca7..e2488b7 100644 --- a/src/usdm/generate_encounters.py +++ b/src/usdm/generate_encounters.py @@ -224,6 +224,8 @@ def build_usdm_encounters(soa_id: int) -> List[Dict[str, Any]]: "notes": [], "instanceType": "Encounter", } + if timing_uid: + encounter["scheduledAt"] = timing_uid out.append(encounter) return out diff --git a/src/usdm/usdm_utils.py b/src/usdm/usdm_utils.py index ada8c43..6ead48c 100644 --- a/src/usdm/usdm_utils.py +++ b/src/usdm/usdm_utils.py @@ -185,7 +185,7 @@ def _get_biomedical_concept_synonyms(concept_code: str) -> List[str]: @functools.lru_cache(maxsize=256) def _get_biomedical_concept_properties( soa_id: int, biomedical_concept_uid: str -) -> Optional[Dict[str, any]]: +) -> List[Dict[str, Any]]: """Fetch biomedical concept properties from the database using the BiomedicalConcept_{}.""" conn = _connect() @@ -383,36 +383,58 @@ def _get_transition_start_rule( } -def _get_timing_name(soa_id: int, timing_id: Optional[int]) -> str: +def _get_timing_name(soa_id: int, timing_id: Optional[int]) -> Optional[str]: + """ + Get timing UID for a given timing ID. + + Args: + soa_id: The SOA ID + timing_id: The timing table ID (can be None) + + Returns: + Timing UID string or None if not found or timing_id is None + """ + if timing_id is None: + return None + conn = _connect() cur = conn.cursor() cur.execute( "SELECT timing_uid FROM timing WHERE id=? AND soa_id=?", - ( - timing_id, - soa_id, - ), + (timing_id, soa_id), ) row = cur.fetchone() conn.close() - timing_uid = row[0] if (row and row[0] is not None) else None - return timing_uid + return row[0] if (row and row[0] is not None) else None -def _get_code_tuple(soa_id: int, code_uid: str) -> Tuple[str, str]: +def _get_code_tuple(soa_id: int, code_uid: str) -> Tuple[List[str], List[str]]: + """ + Get code and code system lists for a given code UID. + + Args: + soa_id: The SOA ID + code_uid: The Code UID (e.g., 'Code_1') + + Returns: + Tuple of ([codes], [code_systems]) - both are lists that may be empty + + Note: + Returns lists because a single code_uid can map to multiple codes + via code_association table (e.g., same concept in multiple codelists). + Callers should handle list unpacking appropriately. + """ conn = _connect() cur = conn.cursor() cur.execute( - "SELECT DISTINCT c.codelist_table,c.code " + "SELECT DISTINCT c.codelist_table, c.code " "FROM code_association c WHERE c.soa_id=? AND c.code_uid=?", - ( - soa_id, - code_uid, - ), + (soa_id, code_uid), ) rows = cur.fetchall() conn.close() + code_system = [r[0] for r in rows] code = [r[1] for r in rows] @@ -564,12 +586,28 @@ def _get_element_ids(soa_id: int, study_cell_uid: str) -> List[str]: # Helpers for Study Epochs @functools.lru_cache(maxsize=256) -def _get_epoch_code_values(soa_id: int, epoch_type: str, code: str) -> Tuple[ - str, - str, - str, -]: +def _get_epoch_code_values( + soa_id: int, epoch_type: str, code: str +) -> Tuple[str, str, str]: + """ + Fetch epoch code values from CDISC Library API. + + Args: + soa_id: SOA ID (for logging/context) + epoch_type: Epoch type identifier + code: CDISC concept ID to look up (e.g., 'C99079') + + Returns: + Tuple of (code_system, code_system_version, decode) + Returns empty strings if API fails or term not found + """ logger = logging.getLogger("usdm.generate_epochs") + + # Initialize safe defaults (prevents UnboundLocalError) + code_system = "" + code_system_version = "" + decode = "" + url = "https://library.cdisc.org/api/mdr/ct/packages/sdtmct-2025-09-26/codelists/C99079" headers: dict[str, str] = {"Accept": "application/json"} subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") @@ -577,25 +615,55 @@ def _get_epoch_code_values(soa_id: int, epoch_type: str, code: str) -> Tuple[ "CDISC_SUBSCRIPTION_KEY" ) unified_key = subscription_key or api_key + if unified_key: headers["Ocp-Apim-Subscription-Key"] = unified_key if api_key: headers["Authorization"] = f"Bearer {api_key}" headers["api-key"] = api_key - resp = requests.get(url, headers=headers, timeout=10) - if resp.status_code != 200: - logger.exception("No response from {} for code {}".format(url, epoch_type)) - else: + try: + resp = requests.get(url, headers=headers, timeout=10) + if resp.status_code != 200: + logger.warning( + "Failed to fetch epoch codes from %s (status %d) for code %s", + url, + resp.status_code, + code, + ) + return code_system, code_system_version, decode + content = resp.json() parsed_url = urlparse(url) code_system = parsed_url.scheme + "://" + parsed_url.netloc code_system_version = parsed_url.path.split("/", 7)[5] - top_terms = content.get("terms") + # Guard against missing 'terms' key + top_terms = content.get("terms") or [] for term in top_terms: if term.get("conceptId") == code: - decode = term.get("submissionValue") + decode = term.get("submissionValue") or "" + break # Found matching term, exit loop + + if not decode: + logger.debug( + "No matching term found for conceptId=%s in %s", + code, + url, + ) + + except requests.RequestException as e: + logger.warning( + "Request error fetching epoch codes from %s: %s", + url, + e, + ) + except (ValueError, KeyError) as e: + logger.warning( + "Error parsing epoch code response from %s: %s", + url, + e, + ) return code_system, code_system_version, decode From 0121b6e917fecd24429586819afdea17f3614346 Mon Sep 17 00:00:00 2001 From: Darren <3921919+pendingintent@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:18:07 -0500 Subject: [PATCH 5/7] Added CI components and new pre-commit-config --- .github/workflows/ci.yml | 75 ++++++++++++++++++++++++++++++++++++++++ .pre-commit-config.yaml | 25 +++++++++----- 2 files changed, 91 insertions(+), 9 deletions(-) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..d520791 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,75 @@ +name: CI (Python) + +on: + push: + branches: [ "main", "master" ] + pull_request: + branches: [ "main", "master" ] + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + name: Python ${{ matrix.python-version }} + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: [ "3.10", "3.11", "3.12", "3.13" ] + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: pip + cache-dependency-path: | + requirements.txt + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + # Core dev tooling + pip install pytest pytest-cov ruff flake8 + # Project deps + pip install -r requirements.txt + # If your package is installable (optional) + if [ -f pyproject.toml ] || [ -f setup.cfg ] || [ -f setup.py ]; then pip install -e .; fi + + - name: Lint (ruff) + run: | + # Lint package code and tests + ruff check src/soa_builder tests + + # Keep flake8 if you depend on its rules (syntax/errors must fail, style warnings are non-blocking) + - name: Lint (flake8) + run: | + flake8 src/soa_builder tests --count --select=E9,F63,F7,F82 --show-source --statistics + flake8 src/soa_builder tests --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics + + - name: Run tests (pytest + coverage) + run: | + mkdir -p reports + pytest -q tests \ + --maxfail=1 \ + --disable-warnings \ + --junitxml=reports/junit.xml \ + --cov=soa_builder \ + --cov-report=xml:reports/coverage.xml \ + --cov-report=term-missing + + - name: Upload test reports + if: always() + uses: actions/upload-artifact@v4 + with: + name: test-reports-py${{ matrix.python-version }} + path: reports/ + if-no-files-found: ignore \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 452238e..604a483 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,23 +1,30 @@ +# See https://pre-commit.com for more information +# See https://pre-commit.com/hooks.html for more hooks repos: - # Optional: standard formatting/lint hooks - repo: https://github.com/psf/black - rev: 24.8.0 + rev: 24.3.0 hooks: - id: black + language_version: python3 + args: ['--line-length=88'] + # Standardize to actual directory structure (underscore) + files: ^(src/soa_builder/|tests/) - # Local hooks for tests/lint - repo: local hooks: - id: pytest name: Run pytest entry: pytest language: system - types: [python] pass_filenames: false + always_run: true + # Standardize to actual directory structure (underscore) + files: ^(src/soa_builder/|tests/) - # optional: reuse same flake8 config as GitHub Actions + - repo: https://github.com/PyCQA/flake8 + rev: 7.0.0 + hooks: - id: flake8 - name: Run flake8 - entry: flake8 src/soa_builder tests/ - language: system - pass_filenames: false \ No newline at end of file + args: ['--max-line-length=220', '--extend-ignore=E203,W503'] + # Standardize to actual directory structure (underscore) + files: ^(src/soa_builder/|tests/) \ No newline at end of file From 0e11d5e83a7825dcf447b5de69d134e91f53b1ab Mon Sep 17 00:00:00 2001 From: Darren <3921919+pendingintent@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:27:21 -0500 Subject: [PATCH 6/7] Fixed ruff E402 Module level import not at top of file --- src/soa_builder/web/app.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/soa_builder/web/app.py b/src/soa_builder/web/app.py index 2c98063..38b3ea7 100644 --- a/src/soa_builder/web/app.py +++ b/src/soa_builder/web/app.py @@ -1,11 +1,10 @@ -from __future__ import annotations - +#!/usr/bin/env python3 """FastAPI web application for interactive Schedule of Activities creation. Data persisted in SQLite (file: soa_builder_web.db by default). """ - +from __future__ import annotations import csv import io import json @@ -79,7 +78,6 @@ from .routers import visits as visits_router from .routers import audits as audits_router from .routers import rules as rules_router - from .routers import timings as timings_router from .routers import schedule_timelines as schedule_timelines_router from .routers import cells as cells_router @@ -88,6 +86,7 @@ from .routers import tdd as tdd_router from .routers import decision_instances as decision_instances_router from .routers import condition_assignments as condition_assignments_router +from .audit import _record_element_audit # Avoid binding visit helpers directly to allow fresh reloads in tests @@ -117,9 +116,6 @@ get_scheduled_activity_instance, ) -# Audit functions -from .audit import _record_element_audit - def _configure_logging(): level = logging.INFO From b76bf242d8d82c56b58e012174aa7abdd14ff8f6 Mon Sep 17 00:00:00 2001 From: Darren <3921919+pendingintent@users.noreply.github.com> Date: Fri, 6 Mar 2026 15:51:13 -0500 Subject: [PATCH 7/7] Addressed ruff format issues --- .github/workflows/ci.yml | 9 ++------- .pre-commit-config.yaml | 22 ++++++---------------- normalize_soa.py | 1 + src/soa_builder/cli.py | 2 +- src/soa_builder/web/app.py | 15 ++++++++------- src/soa_builder/web/utils.py | 4 ++-- src/usdm/generate_usdm.py | 1 + tests/test_routers_elements.py | 2 +- tests/test_routers_instances.py | 4 +++- tests/test_study_cell_uid_reuse.py | 12 ++++++------ tests/test_study_cell_uid_reuse_later.py | 6 +++--- validate_soa.py | 1 + 12 files changed, 35 insertions(+), 44 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d520791..f6862c2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -20,7 +20,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [ "3.10", "3.11", "3.12", "3.13" ] + python-version: [ "3.11", "3.12", "3.13", "3.14" ] steps: - name: Checkout @@ -47,13 +47,8 @@ jobs: - name: Lint (ruff) run: | # Lint package code and tests - ruff check src/soa_builder tests + ruff check src/ tests - # Keep flake8 if you depend on its rules (syntax/errors must fail, style warnings are non-blocking) - - name: Lint (flake8) - run: | - flake8 src/soa_builder tests --count --select=E9,F63,F7,F82 --show-source --statistics - flake8 src/soa_builder tests --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - name: Run tests (pytest + coverage) run: | diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 604a483..99db2b9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,14 +1,12 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks repos: - - repo: https://github.com/psf/black - rev: 24.3.0 + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.15.5 hooks: - - id: black - language_version: python3 - args: ['--line-length=88'] - # Standardize to actual directory structure (underscore) - files: ^(src/soa_builder/|tests/) + - id: ruff-check + args: [ --fix ] + - id: ruff-format - repo: local hooks: @@ -19,12 +17,4 @@ repos: pass_filenames: false always_run: true # Standardize to actual directory structure (underscore) - files: ^(src/soa_builder/|tests/) - - - repo: https://github.com/PyCQA/flake8 - rev: 7.0.0 - hooks: - - id: flake8 - args: ['--max-line-length=220', '--extend-ignore=E203,W503'] - # Standardize to actual directory structure (underscore) - files: ^(src/soa_builder/|tests/) \ No newline at end of file + files: ^(src/|tests/) diff --git a/normalize_soa.py b/normalize_soa.py index 2df52e9..d5041eb 100644 --- a/normalize_soa.py +++ b/normalize_soa.py @@ -31,6 +31,7 @@ - Support rule recurrence expansion into concrete scheduled instances. - Add endpoints linkage and CRF page mapping. """ + from __future__ import annotations import argparse diff --git a/src/soa_builder/cli.py b/src/soa_builder/cli.py index cace32e..2e83aac 100644 --- a/src/soa_builder/cli.py +++ b/src/soa_builder/cli.py @@ -266,7 +266,7 @@ def cmd_expand( with open(json_out, "w", encoding="utf-8") as f: json.dump([inst.__dict__ for inst in instances], f, indent=2) click.echo( - f"Instances written: CSV={csv_out}{' JSON='+json_out if json_out else ''}" + f"Instances written: CSV={csv_out}{' JSON=' + json_out if json_out else ''}" ) diff --git a/src/soa_builder/web/app.py b/src/soa_builder/web/app.py index 38b3ea7..01b73ea 100644 --- a/src/soa_builder/web/app.py +++ b/src/soa_builder/web/app.py @@ -4,6 +4,7 @@ Data persisted in SQLite (file: soa_builder_web.db by default). """ + from __future__ import annotations import csv import io @@ -587,7 +588,9 @@ def _normalize_cell(cell: dict) -> dict: axis_type = ( "instance" if cell.get("instance_id") is not None - else "visit" if cell.get("visit_id") is not None else None + else "visit" + if cell.get("visit_id") is not None + else None ) axis_id = None if axis_type == "instance": @@ -3927,7 +3930,7 @@ def add(line: str): content_for_offsets.append(obj) final_body = "".join(content_for_offsets) xref_start = len(final_body.encode("utf-8")) - xref = ["xref\n", f"0 {len(objects)+1}\n", "0000000000 65535 f \n"] + xref = ["xref\n", f"0 {len(objects) + 1}\n", "0000000000 65535 f \n"] # True offsets: header length + cumulative lengths before each object cumulative = len(pdf_parts[0].encode("utf-8")) obj_offsets = [] @@ -3936,7 +3939,7 @@ def add(line: str): cumulative += len(obj.encode("utf-8")) for off in obj_offsets: xref.append(f"{off:010d} 00000 n \n") - trailer = f"trailer << /Size {len(objects)+1} /Root 1 0 R >>\nstartxref\n{xref_start}\n%%EOF" + trailer = f"trailer << /Size {len(objects) + 1} /Root 1 0 R >>\nstartxref\n{xref_start}\n%%EOF" pdf_bytes = (final_body + "".join(xref) + trailer).encode("utf-8") filename = f"soa_{soa_id}_summary.pdf" return Response( @@ -4385,9 +4388,9 @@ def ui_edit(request: Request, soa_id: int): if secs < 60: last_fetch_relative = f"{secs}s ago" elif secs < 3600: - last_fetch_relative = f"{secs//60}m ago" + last_fetch_relative = f"{secs // 60}m ago" else: - last_fetch_relative = f"{secs//3600}h ago" + last_fetch_relative = f"{secs // 3600}h ago" freeze_list = _list_freezes(soa_id) last_frozen_at = freeze_list[0]["created_at"] if freeze_list else None # Study metadata for edit form @@ -4509,7 +4512,6 @@ def ui_edit(request: Request, soa_id: int): code_map[eid] = code conn_em.close() try: - code_to_submission = load_epoch_type_map(force=False) or {} except Exception: code_to_submission = {} @@ -7155,7 +7157,6 @@ def main(): if __name__ == "__main__": - main() diff --git a/src/soa_builder/web/utils.py b/src/soa_builder/web/utils.py index a7875a7..9ca0913 100644 --- a/src/soa_builder/web/utils.py +++ b/src/soa_builder/web/utils.py @@ -1008,7 +1008,7 @@ def load_environmental_setting_options(force: bool = False) -> List[dict[str, st _env_setting_cache.update(options=[], fetched_at=now, last_error="missing slug") return [] - url = f"https://library.cdisc.org/api/mdr/ct/packages/" f"{slug}/codelists/C127262" + url = f"https://library.cdisc.org/api/mdr/ct/packages/{slug}/codelists/C127262" headers: dict[str, str] = {"Accept": "application/json"} subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") api_key = os.environ.get("CDISC_API_KEY") or subscription_key @@ -1100,7 +1100,7 @@ def load_contact_mode_options(force: bool = False) -> List[dict[str, str]]: ) return [] - url = f"https://library.cdisc.org/api/mdr/ct/packages/" f"{slug}/codelists/C171445" + url = f"https://library.cdisc.org/api/mdr/ct/packages/{slug}/codelists/C171445" headers: dict[str, str] = {"Accept": "application/json"} subscription_key = os.environ.get("CDISC_SUBSCRIPTION_KEY") api_key = os.environ.get("CDISC_API_KEY") or subscription_key diff --git a/src/usdm/generate_usdm.py b/src/usdm/generate_usdm.py index 175f1c2..7e9bb6d 100644 --- a/src/usdm/generate_usdm.py +++ b/src/usdm/generate_usdm.py @@ -5,6 +5,7 @@ Produces a Study-Output → StudyVersion-Output → InterventionalStudyDesign-Output hierarchy, populating sub-entities from the existing per-entity generators. """ + from typing import List, Dict, Any import logging from .usdm_utils import _get_soa_metadata diff --git a/tests/test_routers_elements.py b/tests/test_routers_elements.py index a3da30f..b60e63f 100644 --- a/tests/test_routers_elements.py +++ b/tests/test_routers_elements.py @@ -199,7 +199,7 @@ def test_bulk_create_elements(): # Create multiple elements for i in range(5): client.post( - f"/ui/soa/{soa_id}/elements/create", data={"name": f"Element {i+1}"} + f"/ui/soa/{soa_id}/elements/create", data={"name": f"Element {i + 1}"} ) # Verify all created diff --git a/tests/test_routers_instances.py b/tests/test_routers_instances.py index c313817..5c6f22e 100644 --- a/tests/test_routers_instances.py +++ b/tests/test_routers_instances.py @@ -208,7 +208,9 @@ def test_create_multiple_instances(): # Create 3 instances for i in range(3): - resp = client.post(f"/soa/{soa_id}/instances", json={"name": f"Instance {i+1}"}) + resp = client.post( + f"/soa/{soa_id}/instances", json={"name": f"Instance {i + 1}"} + ) assert resp.status_code == 201 # Verify all created diff --git a/tests/test_study_cell_uid_reuse.py b/tests/test_study_cell_uid_reuse.py index 817a0bc..1f20932 100644 --- a/tests/test_study_cell_uid_reuse.py +++ b/tests/test_study_cell_uid_reuse.py @@ -78,9 +78,9 @@ def test_study_cell_uid_unique_per_row(): conn.close() assert len(sc_rows) >= 2 uids = [r[0] for r in sc_rows] - assert len(uids) == len( - set(uids) - ), "Each study_cell row must have a unique study_cell_uid" + assert len(uids) == len(set(uids)), ( + "Each study_cell row must have a unique study_cell_uid" + ) # Idempotence check: submitting the same element again should not create a duplicate row form_dup = {"arm_uid": arm_uid, "epoch_uid": epoch_uid, "element_uids": [el_b]} @@ -94,6 +94,6 @@ def test_study_cell_uid_unique_per_row(): ) cnt = cur.fetchone()[0] conn.close() - assert ( - cnt == 1 - ), "Duplicate submission should not create a second row for the same element" + assert cnt == 1, ( + "Duplicate submission should not create a second row for the same element" + ) diff --git a/tests/test_study_cell_uid_reuse_later.py b/tests/test_study_cell_uid_reuse_later.py index 05ce394..dc85856 100644 --- a/tests/test_study_cell_uid_reuse_later.py +++ b/tests/test_study_cell_uid_reuse_later.py @@ -102,6 +102,6 @@ def test_study_cell_uid_unique_on_later_addition(): ) cnt = cur.fetchone()[0] conn.close() - assert ( - cnt == 1 - ), "Duplicate submission should not create a second row for the same element" + assert cnt == 1, ( + "Duplicate submission should not create a second row for the same element" + ) diff --git a/validate_soa.py b/validate_soa.py index d42b91e..75fe9eb 100644 --- a/validate_soa.py +++ b/validate_soa.py @@ -7,6 +7,7 @@ Exit code: 0 if all checks pass, 1 if any fail. """ + from __future__ import annotations import argparse