From 67e2d8d797fa30aa111836e7cea218bb45d906f3 Mon Sep 17 00:00:00 2001 From: "Mark A. Miller" Date: Sat, 19 Apr 2025 11:55:37 -0400 Subject: [PATCH 1/5] dump mongodb to cachedir cache.db --- .gitignore | 4 +- make-gold-cache.Makefile | 25 ++++++- sample_annotator/gold_to_mongo.py | 19 +++++- sample_annotator/rebuild_gold_cache.py | 92 ++++++++++++++++++++++++++ 4 files changed, 133 insertions(+), 7 deletions(-) create mode 100644 sample_annotator/rebuild_gold_cache.py diff --git a/.gitignore b/.gitignore index 08a5a6e..8059d99 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,7 @@ googlemaps-api-key.txt bin -cachedir +cachedir.prev examples/outputs temp/ @@ -63,7 +63,7 @@ sphinx/_build/* # Cacheing tests/cachedir/ -./cachedir/ +cachedir.prev/ # https://raw.githubusercontent.com/github/gitignore/main/Python.gitignore diff --git a/make-gold-cache.Makefile b/make-gold-cache.Makefile index 7291a96..03fa0c2 100644 --- a/make-gold-cache.Makefile +++ b/make-gold-cache.Makefile @@ -4,7 +4,7 @@ RUN=poetry run -.PHONY: load-gold-biosamples-into-mongo +.PHONY: gold-to-mongo-all gold-to-mongo-clean load-gold-biosamples-into-mongo rebuild-gold-cache-from-mongodb gold-to-mongo-all: gold-to-mongo-clean load-gold-biosamples-into-mongo @@ -39,13 +39,34 @@ local/gold-study-ids-with-biosamples.txt: downloads/goldData.xlsx # # --purge-diskcache # # --env-file +#load-gold-biosamples-into-mongo: local/gold-study-ids-with-biosamples.txt +# $(RUN) gold-to-mongo \ +# --authentication-file config/gold-key.txt \ +# --log-failures-to-file local/gold-to-mongo-failures.json \ +# --mongo-uri "mongodb://localhost:27017/gold_metadata" \ +# --study-ids-file $< + + load-gold-biosamples-into-mongo: local/gold-study-ids-with-biosamples.txt $(RUN) gold-to-mongo \ --authentication-file config/gold-key.txt \ + --env-file local/.env.27778 \ --log-failures-to-file local/gold-to-mongo-failures.json \ - --mongo-uri "mongodb://localhost:27017/gold_metadata" \ + --mongo-uri "mongodb://localhost:27778/staging?directConnection=true&authMechanism=DEFAULT&authSource=admin" \ --study-ids-file $< +#rebuild-gold-cache-from-mongodb: +# poetry run python sample_annotator/rebuild_gold_cache.py \ +# --mongo-uri "mongodb://localhost:27778/staging?directConnection=true&authMechanism=DEFAULT&authSource=admin" \ +# --env-file local/.env.27778 \ +# --cache-dir cache_from_mongodb + + +rebuild-gold-cache-from-mongodb: + poetry run python sample_annotator/rebuild_gold_cache.py \ + --mongo-uri "mongodb://localhost:27017/gold_metadata" \ + --cache-dir cache_from_local_mongodb + local/gold-cache.json: local/gold-studies.tsv # ~ 3 seconds/uncached study # GOLD has ~ 63k studies diff --git a/sample_annotator/gold_to_mongo.py b/sample_annotator/gold_to_mongo.py index 5448209..b80c6ab 100644 --- a/sample_annotator/gold_to_mongo.py +++ b/sample_annotator/gold_to_mongo.py @@ -1,6 +1,8 @@ import json import logging import os +import shutil +import time from datetime import datetime from time import sleep from typing import List, Optional, Set @@ -61,14 +63,12 @@ def build_mongodb_connection_string( # Inject user/pass escaped_user = quote_plus(user) escaped_password = quote_plus(password) - # Find where to insert credentials protocol_split = mongo_uri.split("://", 1) if len(protocol_split) != 2: raise ValueError("Invalid MongoDB URI") scheme, rest = protocol_split return f"{scheme}://{escaped_user}:{escaped_password}@{rest}" - # If no URI provided, build a basic local one if not user or not password: return "mongodb://localhost:27017/" escaped_user = 
quote_plus(user) @@ -110,8 +110,17 @@ def get_processed_study_ids(db) -> Set[str]: return processed_ids +def backup_cache_dir(cache_dir: str, backup_dir: str = "local") -> None: + if os.path.exists(cache_dir): + ts = time.strftime("%Y%m%d_%H%M%S") + backup_path = os.path.join(backup_dir, f"{os.path.basename(cache_dir)}_backup_{ts}") + shutil.copytree(cache_dir, backup_path) + logging.info(f"Backed up cache to: {backup_path}") + + @click.command() @click.option('--authentication-file', '-a', default="config/gold-key.txt") +@click.option('--backup-cache', is_flag=True, default=True) @click.option('--env-file', '-e') @click.option('--log-failures-to-file', type=click.Path(writable=True), default=None) @click.option('--max-retries', '-m', type=int, default=3) @@ -122,7 +131,11 @@ def get_processed_study_ids(db) -> Set[str]: @click.option('--study-ids-file', '-i', type=click.Path(exists=True), required=True) def main(study_ids_file: str, authentication_file: str, mongo_uri: Optional[str], env_file: Optional[str], purge_mongodb: bool, purge_diskcache: bool, - resume: bool, max_retries: int, log_failures_to_file: Optional[str]): + resume: bool, max_retries: int, log_failures_to_file: Optional[str], + backup_cache: bool): + if backup_cache: + backup_cache_dir("cachedir", backup_dir="local") + mongo_creds = load_mongodb_credentials(env_file) conn_str = build_mongodb_connection_string( mongo_uri=mongo_uri, diff --git a/sample_annotator/rebuild_gold_cache.py b/sample_annotator/rebuild_gold_cache.py new file mode 100644 index 0000000..98f4058 --- /dev/null +++ b/sample_annotator/rebuild_gold_cache.py @@ -0,0 +1,92 @@ +import logging +import os + +import click +import dotenv +from pymongo import MongoClient +from diskcache import Cache + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +API_BASE = "https://gold.jgi.doe.gov/rest/nmdc" + + +def load_mongodb_credentials(env_file: str) -> dict: + if env_file: + dotenv.load_dotenv(env_file) + return { + 'user': os.environ.get('MONGODB_USER'), + 'password': os.environ.get('MONGODB_PASSWORD') + } + + +def normalize_id(id_str: str) -> str: + return id_str.replace("gold:", "") + + +@click.command() +@click.option('--mongo-uri', required=True, help='MongoDB URI including database name') +@click.option('--env-file', '-e', required=False, help='Path to .env file with MongoDB user/pass') +@click.option('--cache-dir', default="cachedir", help='Path to diskcache directory (default: cachedir)') +def rebuild_gold_cache(mongo_uri: str, env_file: str, cache_dir: str): + """ + Rebuild diskcache entries for GoldClient by inserting MongoDB-stored responses + directly into the cache without calling the API again. 
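+
+    Note on key layout (an assumption about how GoldClient reads its cache):
+    entries are keyed with the flat tuple
+    (endpoint_url, ("studyGoldId", <id>), user, password), matching the
+    cache.set() calls below. If GoldClient builds its keys any other way,
+    these entries become silent cache misses at read time.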
+ """ + mongo_creds = load_mongodb_credentials(env_file) if env_file else {'user': None, 'password': None} + user = mongo_creds.get('user') + password = mongo_creds.get('password') + + # If no credentials are present, continue unauthenticated + if not user or not password: + logging.warning("No GOLD credentials found in .env; proceeding without authentication") + user = "" + password = "" + + logging.info(f"Connecting to MongoDB: {mongo_uri}") + client = MongoClient(mongo_uri, username=mongo_creds.get('user'), password=mongo_creds.get('password')) + db = client.get_default_database() + + cache = Cache(cache_dir) + + studies = list(db.studies.find({}, {'_id': 0})) + biosamples = list(db.biosamples.find({}, {'_id': 0})) + projects = list(db.seq_projects.find({}, {'_id': 0})) + + # DEBUG: Confirm MongoDB contents + logging.info(f"Sample study: {studies[0] if studies else 'EMPTY'}") + logging.info(f"Sample biosample: {biosamples[0] if biosamples else 'EMPTY'}") + logging.info(f"Sample project: {projects[0] if projects else 'EMPTY'}") + + logging.info( + f"Inserting {len(studies)} studies, {len(biosamples)} biosamples, and {len(projects)} projects into cache") + + for i, study in enumerate(studies): + study_id = normalize_id(study["studyGoldId"]) + key = (f"{API_BASE}/studies", ("studyGoldId", study_id), user, password) + cache.set(key, [study]) + + biosamples_by_study = {} + for prj in projects: + sid = prj.get("studyGoldId") + bsid = prj.get("biosampleGoldId") + if sid and bsid: + sid_norm = normalize_id(sid) + biosamples_by_study.setdefault(sid_norm, []).append(bsid) + + biosample_lookup = {b["biosampleGoldId"]: b for b in biosamples if "biosampleGoldId" in b} + + for sid, bsids in biosamples_by_study.items(): + bs_payload = [biosample_lookup[bid] for bid in bsids if bid in biosample_lookup] + if not bs_payload: + continue + key = (f"{API_BASE}/biosamples", ("studyGoldId", sid), user, password) + cache.set(key, bs_payload) + + cache.close() + logging.info(f"Cache closed and flushed to disk at {cache_dir}") + + +if __name__ == '__main__': + rebuild_gold_cache() From 20628aacb4bdf3452897e383934b7147029900e3 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Miller" Date: Sat, 19 Apr 2025 16:10:01 -0400 Subject: [PATCH 2/5] single tool --- make-gold-cache.Makefile | 19 +- pyproject.toml | 3 +- sample_annotator/gold_to_mongo.py | 316 ++++++++---- sample_annotator/gold_tool.py | 685 +++++++++++++++++++++++++ sample_annotator/rebuild_gold_cache.py | 186 +++++-- 5 files changed, 1058 insertions(+), 151 deletions(-) create mode 100644 sample_annotator/gold_tool.py diff --git a/make-gold-cache.Makefile b/make-gold-cache.Makefile index 03fa0c2..c8415bf 100644 --- a/make-gold-cache.Makefile +++ b/make-gold-cache.Makefile @@ -42,31 +42,28 @@ local/gold-study-ids-with-biosamples.txt: downloads/goldData.xlsx #load-gold-biosamples-into-mongo: local/gold-study-ids-with-biosamples.txt # $(RUN) gold-to-mongo \ # --authentication-file config/gold-key.txt \ +# --cache-dir cache_from_local_mongodb \ +# --env-file local/.env.27778 \ # --log-failures-to-file local/gold-to-mongo-failures.json \ -# --mongo-uri "mongodb://localhost:27017/gold_metadata" \ +# --mongo-uri "mongodb://localhost:27778/staging?directConnection=true&authMechanism=DEFAULT&authSource=admin" \ # --study-ids-file $< load-gold-biosamples-into-mongo: local/gold-study-ids-with-biosamples.txt $(RUN) gold-to-mongo \ --authentication-file config/gold-key.txt \ - --env-file local/.env.27778 \ + --cache-dir cache_from_local_mongodb \ --log-failures-to-file local/gold-to-mongo-failures.json \ - --mongo-uri "mongodb://localhost:27778/staging?directConnection=true&authMechanism=DEFAULT&authSource=admin" \ + --mongo-uri "mongodb://localhost:27017/gold_metadata_2" \ --study-ids-file $< -#rebuild-gold-cache-from-mongodb: -# poetry run python sample_annotator/rebuild_gold_cache.py \ -# --mongo-uri "mongodb://localhost:27778/staging?directConnection=true&authMechanism=DEFAULT&authSource=admin" \ -# --env-file local/.env.27778 \ -# --cache-dir cache_from_mongodb - - rebuild-gold-cache-from-mongodb: - poetry run python sample_annotator/rebuild_gold_cache.py \ + poetry run rebuild-gold-cache \ --mongo-uri "mongodb://localhost:27017/gold_metadata" \ + --authentication-file config/gold-key.txt \ --cache-dir cache_from_local_mongodb + local/gold-cache.json: local/gold-studies.tsv # ~ 3 seconds/uncached study # GOLD has ~ 63k studies diff --git a/pyproject.toml b/pyproject.toml index 3308697..030f3f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -77,4 +77,5 @@ cache-gold = "sample_annotator.clients.gold_client:main" extract-study-ids-with-biosamples = "sample_annotator.file_utils.extract_study_ids_with_biosamples:main" gold-to-mongo = "sample_annotator.gold_to_mongo:main" rel_to_oxygen_example = "sample_annotator.rel_to_oxygen_example:cli" -xlsx-to-tsv = "sample_annotator.file_utils.xlsx_to_tsv:xlsx_to_tsv" \ No newline at end of file +xlsx-to-tsv = "sample_annotator.file_utils.xlsx_to_tsv:xlsx_to_tsv" +rebuild-gold-cache = "sample_annotator.rebuild_gold_cache:rebuild_gold_cache" diff --git a/sample_annotator/gold_to_mongo.py b/sample_annotator/gold_to_mongo.py index b80c6ab..ff154df 100644 --- a/sample_annotator/gold_to_mongo.py +++ b/sample_annotator/gold_to_mongo.py @@ -1,39 +1,38 @@ import json import logging +import math import os -import shutil -import time from datetime import datetime from time import sleep -from typing import List, Optional, Set +from typing import List, Optional, Set, Dict, Any from urllib.parse import quote_plus +from tqdm import tqdm import click import dotenv +from diskcache import Cache from pymongo import MongoClient, ASCENDING from pymongo.errors import DuplicateKeyError from 
pymongo.uri_parser import parse_uri +from requests.auth import HTTPBasicAuth +import requests + +import sample_annotator.clients.gold_client as gold_client # Fix import path for both direct script execution and CLI entry point try: # When running as a script - from clients.gold_client import GoldClient + from clients.gold_client import GoldClient, build_cache_key except ModuleNotFoundError: # When running as an installed package - from sample_annotator.clients.gold_client import GoldClient - -# todo might need better API error handling -# should be more consistent about bundling (projects in biosamples) vs getting biosamples separate from studies - -# todo document the fact that a biosamples key is added to studies -# biosamples have no foreign keys -# (sequencing) projects include native study and biosample foreign keys + from sample_annotator.clients.gold_client import GoldClient, build_cache_key # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def load_mongodb_credentials(env_file: Optional[str] = None) -> dict: + """Load MongoDB credentials from environment file.""" if env_file: dotenv.load_dotenv(env_file) else: @@ -77,31 +76,38 @@ def build_mongodb_connection_string( def create_unique_index(collection, field_name: str, index_name: str) -> None: + """Create a unique index on a MongoDB collection.""" try: collection.create_index([(field_name, ASCENDING)], name=index_name, unique=True) except Exception as e: logging.error(f"Failed to create index '{index_name}': {e}") -def insert_document(collection, document: dict, key_name: str) -> None: +def insert_document(collection, document: dict, key_name: str) -> bool: + """Insert a document into MongoDB, handling duplicate key errors.""" try: collection.insert_one(document) + return True except DuplicateKeyError: logging.warning(f"Duplicate key error for {key_name}") + return False def process_study_ids(file_path: str) -> List[str]: + """Extract study IDs from a file.""" ids = set() with open(file_path) as file: for line in file: + line = line.strip() if line.startswith("Gs"): - ids.add(line.strip()) + ids.add(line) if not ids: raise ValueError(f"No valid IDs found in {file_path}") return sorted(ids) def get_processed_study_ids(db) -> Set[str]: + """Get a set of study IDs that have already been processed in MongoDB.""" study_collection = db['studies'] processed_ids = set() for study in study_collection.find({}, {'studyGoldId': 1}): @@ -110,32 +116,97 @@ def get_processed_study_ids(db) -> Set[str]: return processed_ids -def backup_cache_dir(cache_dir: str, backup_dir: str = "local") -> None: - if os.path.exists(cache_dir): - ts = time.strftime("%Y%m%d_%H%M%S") - backup_path = os.path.join(backup_dir, f"{os.path.basename(cache_dir)}_backup_{ts}") - shutil.copytree(cache_dir, backup_path) - logging.info(f"Backed up cache to: {backup_path}") +class EnhancedGoldClient(GoldClient): + """Enhanced version of GoldClient with direct cache access.""" + + def _call(self, endpoint: str, params: Dict = {}) -> Any: + """ + Enhanced version of _call that directly checks the cache before + using the memoized function. 
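+
+        Two key layouts are probed: the tuple returned by build_cache_key()
+        and a legacy flat string "<url>-<studyGoldId>-<user>-<pass>". On a
+        miss, the fresh API response is written back under both layouts so
+        either reader finds it afterwards.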
+ """ + (user, passwd) = self.gold_key + endpoint_url = f"{self.url}/{endpoint}" + + # Check both cache key formats + key_data = build_cache_key(endpoint_url, params, user, passwd) + simple_key = f"{endpoint_url}-{params.get('studyGoldId', '')}-{user}-{passwd}" + + # Try to get directly from cache first + if gold_client.cache and key_data in gold_client.cache: + logging.info(f"[cache] Direct cache hit for {endpoint} {params}") + return gold_client.cache.get(key_data) + elif gold_client.cache and simple_key in gold_client.cache: + logging.info(f"[cache] Simple key cache hit for {endpoint} {params}") + return gold_client.cache.get(simple_key) + + # Fall back to original method if not in cache + logging.warning(f"[cache] Cache miss for {endpoint} {params}, making API call") + self.num_calls += 1 + + # Make API call directly rather than using memoize + attempt = 0 + while attempt < 4: + try: + results = requests.get( + endpoint_url, params=params, auth=HTTPBasicAuth(user, passwd) + ) + logging.info(f"STATUS={results.status_code}") + if results.status_code == 200: + data = results.json() + # Store in cache for future use + if gold_client.cache: + gold_client.cache.set(key_data, data) + gold_client.cache.set(simple_key, data) + return data + else: + logging.error( + f"API call to {endpoint_url} failed, code={results.status_code}; attempt={attempt} [pausing]" + ) + sleep(5 ** attempt) + except Exception as e: + logging.error(f"Exception during API call: {e}") + sleep(5 ** attempt) + + attempt += 1 + + raise Exception(f"API call to {endpoint_url} failed after {attempt} attempts") @click.command() -@click.option('--authentication-file', '-a', default="config/gold-key.txt") -@click.option('--backup-cache', is_flag=True, default=True) -@click.option('--env-file', '-e') -@click.option('--log-failures-to-file', type=click.Path(writable=True), default=None) -@click.option('--max-retries', '-m', type=int, default=3) -@click.option('--mongo-uri', '-u', required=True) -@click.option('--purge-diskcache', '-P', is_flag=True, default=False) -@click.option('--purge-mongodb', '-p', is_flag=True, default=False) -@click.option('--resume', '-r', is_flag=True, default=True) -@click.option('--study-ids-file', '-i', type=click.Path(exists=True), required=True) +@click.option('--authentication-file', '-a', default="config/gold-key.txt", + help="Path to GOLD API authentication file (user:pass)") +@click.option('--env-file', '-e', help="Path to environment file with MongoDB credentials") +@click.option('--log-failures-to-file', type=click.Path(writable=True), default=None, + help="Path to write failure logs as JSON") +@click.option('--max-retries', '-m', type=int, default=3, + help="Maximum number of retries for failed API calls") +@click.option('--mongo-uri', '-u', required=True, + help="MongoDB URI including database name") +@click.option('--purge-diskcache', '-P', is_flag=True, default=False, + help="Purge the disk cache before processing") +@click.option('--purge-mongodb', '-p', is_flag=True, default=False, + help="Purge the MongoDB collections before processing") +@click.option('--resume', '-r', is_flag=True, default=True, + help="Resume from last run, skipping already processed studies") +@click.option('--study-ids-file', '-i', type=click.Path(exists=True), required=True, + help="File containing study IDs to process") +@click.option('--cache-dir', default="cachedir", + help="Path to diskcache directory") +@click.option('--batch-size', type=int, default=50, + help="Number of studies to process in each batch") 
+@click.option('--progress-bar/--no-progress-bar', default=True, + help="Show progress bars for processing") def main(study_ids_file: str, authentication_file: str, mongo_uri: Optional[str], env_file: Optional[str], purge_mongodb: bool, purge_diskcache: bool, resume: bool, max_retries: int, log_failures_to_file: Optional[str], - backup_cache: bool): - if backup_cache: - backup_cache_dir("cachedir", backup_dir="local") + cache_dir: str, batch_size: int, progress_bar: bool) -> None: + """ + Load GOLD metadata into MongoDB using a local cache to minimize API calls. + This script processes study IDs from a file, fetches their metadata from GOLD API + or a local cache, and stores the data in MongoDB collections. + """ + # Connect to MongoDB mongo_creds = load_mongodb_credentials(env_file) conn_str = build_mongodb_connection_string( mongo_uri=mongo_uri, @@ -160,6 +231,7 @@ def main(study_ids_file: str, authentication_file: str, mongo_uri: Optional[str] logging.error(f"MongoDB connection failed: {e}") return + # Set up MongoDB collections if purge_mongodb: logging.info("Purging MongoDB collections...") db.drop_collection('biosamples') @@ -174,103 +246,145 @@ def main(study_ids_file: str, authentication_file: str, mongo_uri: Optional[str] project_collection = db['seq_projects'] failure_collection = db['study_import_failures'] + # Create indices for faster lookups and uniqueness constraints create_unique_index(biosample_collection, "biosampleGoldId", "biosampleGoldId_index") create_unique_index(study_collection, "studyGoldId", "studyGoldId_index") create_unique_index(project_collection, "projectGoldId", "projectGoldId_index") create_unique_index(failure_collection, "studyGoldId", "failedStudy_index") - gc = GoldClient() + # Initialize the cache for GoldClient + gold_client.set_cache_directory(cache_dir) + logging.info(f"Using disk cache directory: {cache_dir}") + + # Create enhanced GoldClient that checks cache directly + gc = EnhancedGoldClient() + if purge_diskcache: logging.info("Purging disk cache...") gc.clear_cache() + gc.load_key(authentication_file) + # Process study IDs from file study_ids = process_study_ids(study_ids_file) total_studies = len(study_ids) + logging.info(f"Found {total_studies} studies to process") + # Identify studies that have already been processed processed_study_ids = set() if resume: processed_study_ids = get_processed_study_ids(db) if processed_study_ids: - logging.info(f"Found {len(processed_study_ids)} studies already in MongoDB that will be skipped") + already_processed = len(processed_study_ids) + logging.info(f"Found {already_processed} studies already in MongoDB that will be skipped") + logging.info(f"Remaining studies to process: {total_studies - already_processed}") + # Process studies in batches completed = 0 failed = 0 skipped = 0 failed_study_logs = [] - for study_id in study_ids: - if resume and study_id in processed_study_ids: - logging.info(f"Skipping study {study_id} (already in MongoDB)") - skipped += 1 - continue - - logging.info(f"Processing study {study_id} ({completed + skipped + 1}/{total_studies})...") - - retry_count = 0 - success = False - - while retry_count <= max_retries and not success: - try: - study = gc.fetch_study(study_id) - if not study: - logging.warning(f"No data returned for study {study_id}, skipping") - failed += 1 - failure_doc = { - 'studyGoldId': study_id, - 'error': "No data returned (null response)", - 'timestamp': datetime.utcnow(), - 'failed': True - } - insert_document(failure_collection, failure_doc, study_id) - 
failed_study_logs.append(failure_doc) - break - - biosamples = gc.fetch_biosamples_by_study(study_id) - logging.info(f"Retrieved {len(biosamples)} biosamples for study {study_id}") - - biosample_ids = [] - for biosample in biosamples: - biosample_id = biosample.get('biosampleGoldId') - if biosample_id: - biosample_ids.append(biosample_id) - - for project in biosample.pop('projects', []): - insert_document(project_collection, project, project.get('projectGoldId', 'Unknown')) - insert_document(biosample_collection, biosample, biosample_id) - - study['biosamples'] = biosample_ids - insert_document(study_collection, study, study_id) - - completed += 1 - success = True - - except Exception as e: - retry_count += 1 - if retry_count <= max_retries: - wait_time = 5 * retry_count - logging.warning( - f"Error processing study {study_id}: {e}. Retrying in {wait_time} seconds (attempt {retry_count}/{max_retries})") - sleep(wait_time) - else: - logging.error(f"Failed to process study {study_id} after {max_retries} attempts: {e}") - failed += 1 - failure_doc = { - 'studyGoldId': study_id, - 'error': str(e), - 'timestamp': datetime.utcnow(), - 'failed': True - } - insert_document(failure_collection, failure_doc, study_id) - failed_study_logs.append(failure_doc) - + # Determine number of batches + num_batches = math.ceil(total_studies / batch_size) + + # Process each batch + for batch_num in range(num_batches): + batch_start = batch_num * batch_size + batch_end = min(batch_start + batch_size, total_studies) + batch = study_ids[batch_start:batch_end] + + logging.info(f"Processing batch {batch_num + 1}/{num_batches} ({len(batch)} studies)") + + # Use tqdm for progress bar if enabled + iterator = tqdm(batch, desc=f"Batch {batch_num + 1}") if progress_bar else batch + + # Process each study in the batch + for study_id in iterator: + if resume and study_id in processed_study_ids: + logging.info(f"Skipping study {study_id} (already in MongoDB)") + skipped += 1 + continue + + logging.info(f"Processing study {study_id} ({completed + skipped + 1}/{total_studies})...") + + retry_count = 0 + success = False + + while retry_count <= max_retries and not success: + try: + # Fetch study data + study = gc.fetch_study(study_id) + if not study: + logging.warning(f"No data returned for study {study_id}, skipping") + failed += 1 + failure_doc = { + 'studyGoldId': study_id, + 'error': "No data returned (null response)", + 'timestamp': datetime.utcnow(), + 'failed': True + } + insert_document(failure_collection, failure_doc, study_id) + failed_study_logs.append(failure_doc) + break + + # Fetch and process biosamples + biosamples = gc.fetch_biosamples_by_study(study_id) + logging.info(f"Retrieved {len(biosamples)} biosamples for study {study_id}") + + # Store biosamples and their projects + biosample_ids = [] + for biosample in biosamples: + biosample_id = biosample.get('biosampleGoldId') + if biosample_id: + biosample_ids.append(biosample_id) + + # Extract projects before inserting biosample + projects = biosample.pop('projects', []) + + # Insert biosample + insert_document(biosample_collection, biosample, biosample_id) + + # Insert projects + for project in projects: + insert_document(project_collection, project, project.get('projectGoldId', 'Unknown')) + + # Add biosample IDs to study and insert into MongoDB + study['biosamples'] = biosample_ids + insert_document(study_collection, study, study_id) + + completed += 1 + success = True + + except Exception as e: + retry_count += 1 + if retry_count <= max_retries: + wait_time = 
5 * retry_count + logging.warning( + f"Error processing study {study_id}: {e}. Retrying in {wait_time} seconds (attempt {retry_count}/{max_retries})") + sleep(wait_time) + else: + logging.error(f"Failed to process study {study_id} after {max_retries} attempts: {e}") + failed += 1 + failure_doc = { + 'studyGoldId': study_id, + 'error': str(e), + 'timestamp': datetime.utcnow(), + 'failed': True + } + insert_document(failure_collection, failure_doc, study_id) + failed_study_logs.append(failure_doc) + + # Report results logging.info(f"Import completed: {completed} studies processed, {skipped} skipped, {failed} failed") + # Write failure logs if requested if log_failures_to_file and failed_study_logs: with open(log_failures_to_file, 'w') as f: json.dump(failed_study_logs, f, indent=2, default=str) - logging.info(f"Wrote failed study logs to {log_failures_to_file}") + logging.info(f"Wrote {len(failed_study_logs)} failed study logs to {log_failures_to_file}") + # Close connections client.close() logging.info("MongoDB connection closed") diff --git a/sample_annotator/gold_tool.py b/sample_annotator/gold_tool.py new file mode 100644 index 0000000..368d562 --- /dev/null +++ b/sample_annotator/gold_tool.py @@ -0,0 +1,685 @@ +import hashlib +import json +import logging +import os +import pickle +import sys +from datetime import datetime +from time import sleep +from typing import Dict, List, Optional, Set, Any, Union, TextIO, Tuple +from urllib.parse import quote_plus +from tqdm import tqdm + +import click +import dotenv +import requests +import yaml +from diskcache import Cache +from pymongo import MongoClient, ASCENDING +from pymongo.errors import DuplicateKeyError +from pymongo.uri_parser import parse_uri +from requests.auth import HTTPBasicAuth + +# Type definitions +USERPASS = Tuple[str, str] +URL = str +JSON = Any +SampleDict = JSON +StudyDict = JSON +ProjectDict = JSON +ApDict = JSON +FILENAME = Union[str, bytes, os.PathLike] + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Global cache +cache = None +API_BASE = "https://gold.jgi.doe.gov/rest/nmdc" + +# Exclusion list for problematic study IDs +EXCLUSION_LIST = [] + + +def ensure_cache_initialized(cache_dir="cachedir"): + """Ensure the cache is initialized.""" + global cache + if cache is None: + cache = Cache(cache_dir) + logging.info(f"Cache initialized at {cache_dir} with {len(cache)} records") + + +def build_cache_key(endpoint_url: str, params: dict, user: str, passwd: str) -> tuple: + """Build a cache key in a consistent format.""" + normalized_params = tuple(sorted(params.items())) + key_data = (endpoint_url, normalized_params, user, passwd) + return key_data + + +def normalize_id(id_str: str) -> str: + """Normalize GOLD IDs by removing 'gold:' prefix.""" + return id_str.replace("gold:", "") + + +def load_mongodb_credentials(env_file: Optional[str] = None) -> dict: + """Load MongoDB credentials from environment file.""" + if env_file: + dotenv.load_dotenv(env_file) + else: + default_env_path = os.path.join('local', '.env') + if os.path.exists(default_env_path): + dotenv.load_dotenv(default_env_path) + return { + 'user': os.environ.get('MONGODB_USER'), + 'password': os.environ.get('MONGODB_PASSWORD') + } + + +def build_mongodb_connection_string( + mongo_uri: Optional[str] = None, + user: Optional[str] = None, + password: Optional[str] = None) -> str: + """Build MongoDB connection string with credentials.""" + if mongo_uri: + parsed = parse_uri(mongo_uri) + if 
parsed.get('username') or parsed.get('password'): + return mongo_uri # already has credentials + if not user or not password: + return mongo_uri # let it connect anonymously + # Inject user/pass + escaped_user = quote_plus(user) + escaped_password = quote_plus(password) + protocol_split = mongo_uri.split("://", 1) + if len(protocol_split) != 2: + raise ValueError("Invalid MongoDB URI") + scheme, rest = protocol_split + return f"{scheme}://{escaped_user}:{escaped_password}@{rest}" + + if not user or not password: + return "mongodb://localhost:27017/" + escaped_user = quote_plus(user) + escaped_password = quote_plus(password) + return f"mongodb://{escaped_user}:{escaped_password}@localhost:27017/" + + +def create_unique_index(collection, field_name: str, index_name: str) -> None: + """Create a unique index on a MongoDB collection.""" + try: + collection.create_index([(field_name, ASCENDING)], name=index_name, unique=True) + except Exception as e: + logging.error(f"Failed to create index '{index_name}': {e}") + + +def insert_document(collection, document: dict, key_name: str) -> bool: + """Insert a document into MongoDB, handling duplicate key errors.""" + try: + collection.insert_one(document) + return True + except DuplicateKeyError: + logging.warning(f"Duplicate key error for {key_name}") + return False + + +def process_study_ids(file_path: str) -> List[str]: + """Extract study IDs from a file.""" + ids = set() + with open(file_path) as file: + for line in file: + line = line.strip() + if line.startswith("Gs"): + ids.add(line) + if not ids: + raise ValueError(f"No valid IDs found in {file_path}") + return sorted(ids) + + +def get_processed_study_ids(db) -> Set[str]: + """Get a set of study IDs that have already been processed in MongoDB.""" + study_collection = db['studies'] + processed_ids = set() + for study in study_collection.find({}, {'studyGoldId': 1}): + if 'studyGoldId' in study: + processed_ids.add(study['studyGoldId']) + return processed_ids + + +def fetch_url(endpoint_url: str, params: Dict, user: str, passwd: str, max_retries: int = 3) -> JSON: + """Fetch data from GOLD API with retries.""" + logging.info(f"API call to {endpoint_url} with params {params}") + attempt = 0 + while attempt <= max_retries: + try: + results = requests.get( + endpoint_url, params=params, auth=HTTPBasicAuth(user, passwd) + ) + logging.info(f"STATUS={results.status_code}") + if results.status_code == 200: + return results.json() + else: + logging.error( + f"API call failed, code={results.status_code}; attempt={attempt} [pausing]" + ) + except Exception as e: + logging.error(f"API call exception: {e}; attempt={attempt} [pausing]") + + # Retry with backoff + attempt += 1 + if attempt <= max_retries: + wait_time = 5 * attempt + logging.info(f"Retrying in {wait_time} seconds...") + sleep(wait_time) + + raise Exception(f"API call to {endpoint_url} failed after {max_retries + 1} attempts") + + +class GoldClient: + """ + A client for the GOLD API with caching support. + Can fetch studies, biosamples, projects, and other data. 
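+
+    Minimal usage sketch (the study ID is an illustrative placeholder, not a
+    real record):
+
+        gc = GoldClient(cache_dir="cachedir")
+        gc.load_key("config/gold-key.txt")
+        study = gc.fetch_study("Gs0000000", include_biosamples=True)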
+ """ + + def __init__(self, cache_dir: str = "cachedir"): + self.gold_key = None + self.url = API_BASE + self.num_calls = 0 + ensure_cache_initialized(cache_dir) + + def load_key(self, path: str) -> None: + """Load API key from file.""" + with open(path) as stream: + lines = stream.readlines() + [user, passwd] = lines[0].strip().split(":") + self.gold_key = user, passwd + + def clear_cache(self) -> None: + """Clear the cache.""" + global cache + if cache: + cache.clear() + logging.info("Cache cleared") + + def _normalize_id(self, id: str) -> str: + """Normalize GOLD IDs.""" + return normalize_id(id) + + def _call(self, endpoint: str, params: Dict = {}) -> JSON: + """ + Call GOLD API with caching. + First checks cache, then falls back to API call. + """ + (user, passwd) = self.gold_key + endpoint_url = f"{self.url}/{endpoint}" + + # Try cache first + key = build_cache_key(endpoint_url, params, user, passwd) + if key in cache: + logging.info(f"Cache hit for {endpoint} {params}") + return cache.get(key) + + # Fall back to API call + logging.info(f"Cache miss for {endpoint} {params}") + data = fetch_url(endpoint_url, params, user, passwd) + + # Cache the result + cache.set(key, data) + self.num_calls += 1 + + return data + + def fetch_projects_by_study(self, id: str) -> List[SampleDict]: + """Fetch projects for a study.""" + id = self._normalize_id(id) + results = self._call("projects", {"studyGoldId": id}) + return results + + def fetch_biosamples_by_study(self, id: str, include_project=True) -> List[SampleDict]: + """ + Fetch biosamples for a study. + Optionally includes project data. + """ + id = self._normalize_id(id) + if id in EXCLUSION_LIST: + biosamples = [] + else: + biosamples = self._call("biosamples", {"studyGoldId": id}) + if include_project: + projects = self.fetch_projects_by_study(id) + + # Map biosamples by ID for easy lookup + samples_by_id = { + sample["biosampleGoldId"]: sample for sample in biosamples + } + + # Add projects to their biosamples + for project in projects: + sample_id = project.get("biosampleGoldId") + if not sample_id or sample_id not in samples_by_id: + continue + + sample = samples_by_id[sample_id] + if "projects" not in sample: + sample["projects"] = [] + sample["projects"].append(project) + + return biosamples + + def fetch_study(self, id: str, include_biosamples=False) -> StudyDict: + """ + Fetch a study by ID. + Optionally includes biosamples. + """ + id = self._normalize_id(id) + logging.info(f"Fetching study: {id}") + results = self._call("studies", {"studyGoldId": id}) + + if not results: + logging.warning(f"No study found for ID: {id}") + return {} + + study = results[0] + if include_biosamples: + study["biosamples"] = self.fetch_biosamples_by_study(id) + + return study + + def fetch_studies(self, ids: List[str], **kwargs) -> List[StudyDict]: + """Fetch multiple studies.""" + logging.info(f"Fetching {len(ids)} studies") + studies = [] + for id in tqdm(ids, desc="Fetching studies"): + studies.append(self.fetch_study(id, **kwargs)) + return studies + + def rebuild_cache_from_mongodb( + self, + mongo_uri: str, + mongo_user: Optional[str] = None, + mongo_password: Optional[str] = None + ) -> None: + """ + Rebuild the cache from MongoDB data. + Caches studies, biosamples, and projects. 
+ """ + logging.info(f"Rebuilding cache from MongoDB: {mongo_uri}") + + # Connect to MongoDB + conn_str = build_mongodb_connection_string(mongo_uri, mongo_user, mongo_password) + client = MongoClient(conn_str) + db = client.get_default_database() + + # Load collections + studies = list(db.studies.find({}, {'_id': 0})) + biosamples = list(db.biosamples.find({}, {'_id': 0})) + projects = list(db.seq_projects.find({}, {'_id': 0})) + + logging.info(f"Found {len(studies)} studies, {len(biosamples)} biosamples, " + f"and {len(projects)} projects in MongoDB") + + # Clear existing cache + self.clear_cache() + + # Cache studies + for study in tqdm(studies, desc="Caching studies"): + study_id = normalize_id(study["studyGoldId"]) + key = build_cache_key(f"{self.url}/studies", {"studyGoldId": study_id}, + self.gold_key[0], self.gold_key[1]) + cache.set(key, [study]) # API returns as list + + # Group biosamples by study + biosamples_by_study = {} + projects_by_study = {} + + # Group projects by study and build biosample->study mapping + biosample_to_study = {} + for project in tqdm(projects, desc="Processing projects"): + study_id = project.get("studyGoldId") + biosample_id = project.get("biosampleGoldId") + + if study_id: + study_id = normalize_id(study_id) + if study_id not in projects_by_study: + projects_by_study[study_id] = [] + projects_by_study[study_id].append(project) + + if biosample_id and study_id: + biosample_to_study[biosample_id] = normalize_id(study_id) + + # Group biosamples by study + for biosample in tqdm(biosamples, desc="Processing biosamples"): + biosample_id = biosample.get("biosampleGoldId") + if biosample_id in biosample_to_study: + study_id = biosample_to_study[biosample_id] + if study_id not in biosamples_by_study: + biosamples_by_study[study_id] = [] + biosamples_by_study[study_id].append(biosample) + + # Cache projects + for study_id, study_projects in tqdm(projects_by_study.items(), desc="Caching projects"): + key = build_cache_key(f"{self.url}/projects", {"studyGoldId": study_id}, + self.gold_key[0], self.gold_key[1]) + cache.set(key, study_projects) + + # Cache biosamples + for study_id, study_biosamples in tqdm(biosamples_by_study.items(), desc="Caching biosamples"): + key = build_cache_key(f"{self.url}/biosamples", {"studyGoldId": study_id}, + self.gold_key[0], self.gold_key[1]) + cache.set(key, study_biosamples) + + logging.info(f"Cache rebuild complete. Cache now contains {len(cache)} entries.") + + def load_to_mongodb( + self, + study_ids: List[str], + mongo_uri: str, + mongo_user: Optional[str] = None, + mongo_password: Optional[str] = None, + resume: bool = True, + max_retries: int = 3, + batch_size: int = 50 + ) -> Dict: + """ + Load GOLD data for specified studies into MongoDB. + Uses cache when available, falls back to API calls. + Returns statistics about the operation. 
+ """ + logging.info(f"Loading {len(study_ids)} studies to MongoDB: {mongo_uri}") + + # Connect to MongoDB + conn_str = build_mongodb_connection_string(mongo_uri, mongo_user, mongo_password) + client = MongoClient(conn_str) + db = client.get_default_database() + + # Set up collections + biosample_collection = db['biosamples'] + study_collection = db['studies'] + project_collection = db['seq_projects'] + failure_collection = db['study_import_failures'] + + # Create indices + create_unique_index(biosample_collection, "biosampleGoldId", "biosampleGoldId_index") + create_unique_index(study_collection, "studyGoldId", "studyGoldId_index") + create_unique_index(project_collection, "projectGoldId", "projectGoldId_index") + create_unique_index(failure_collection, "studyGoldId", "failedStudy_index") + + # Find already processed studies + processed_ids = set() + if resume: + processed_ids = get_processed_study_ids(db) + if processed_ids: + logging.info(f"Found {len(processed_ids)} studies already in MongoDB that will be skipped") + + # Process studies in batches + stats = {"completed": 0, "skipped": 0, "failed": 0} + failed_studies = [] + + # Calculate number of batches + num_batches = (len(study_ids) + batch_size - 1) // batch_size + + for batch_idx in range(num_batches): + start_idx = batch_idx * batch_size + end_idx = min(start_idx + batch_size, len(study_ids)) + batch = study_ids[start_idx:end_idx] + + logging.info(f"Processing batch {batch_idx + 1}/{num_batches} ({len(batch)} studies)") + + for study_id in tqdm(batch, desc=f"Batch {batch_idx + 1}"): + if resume and study_id in processed_ids: + logging.info(f"Skipping study {study_id} (already in MongoDB)") + stats["skipped"] += 1 + continue + + logging.info(f"Processing study {study_id} " + f"({stats['completed'] + stats['skipped'] + 1}/{len(study_ids)})...") + + retry_count = 0 + success = False + + while retry_count <= max_retries and not success: + try: + # Fetch study and biosamples + study = self.fetch_study(study_id) + if not study: + logging.warning(f"No data returned for study {study_id}, skipping") + stats["failed"] += 1 + failed_studies.append({ + 'studyGoldId': study_id, + 'error': "No data returned (null response)", + 'timestamp': datetime.utcnow().isoformat(), + 'failed': True + }) + break + + biosamples = self.fetch_biosamples_by_study(study_id) + logging.info(f"Retrieved {len(biosamples)} biosamples for study {study_id}") + + # Process biosamples and projects + biosample_ids = [] + for biosample in biosamples: + biosample_id = biosample.get('biosampleGoldId') + if biosample_id: + biosample_ids.append(biosample_id) + + # Extract and insert projects + for project in biosample.pop('projects', []): + insert_document(project_collection, project, + project.get('projectGoldId', 'Unknown')) + + # Insert biosample + insert_document(biosample_collection, biosample, biosample_id) + + # Insert study with biosample references + study['biosamples'] = biosample_ids + insert_document(study_collection, study, study_id) + + stats["completed"] += 1 + success = True + + except Exception as e: + retry_count += 1 + if retry_count <= max_retries: + wait_time = 5 * retry_count + logging.warning( + f"Error processing study {study_id}: {e}. 
" + f"Retrying in {wait_time} seconds " + f"(attempt {retry_count}/{max_retries})") + sleep(wait_time) + else: + logging.error(f"Failed to process study {study_id} after " + f"{max_retries} attempts: {e}") + stats["failed"] += 1 + failed_studies.append({ + 'studyGoldId': study_id, + 'error': str(e), + 'timestamp': datetime.utcnow().isoformat(), + 'failed': True + }) + insert_document(failure_collection, { + 'studyGoldId': study_id, + 'error': str(e), + 'timestamp': datetime.utcnow(), + 'failed': True + }, study_id) + + # Close connection + client.close() + logging.info(f"Import completed: {stats['completed']} studies processed, " + f"{stats['skipped']} skipped, {stats['failed']} failed") + + return { + "stats": stats, + "failed_studies": failed_studies + } + + +@click.group() +@click.option('-v', '--verbose', count=True, help='Increase verbosity') +@click.option('-q', '--quiet', is_flag=True, help='Decrease verbosity') +@click.option('--cache-dir', default="cachedir", help='Path to cache directory') +@click.pass_context +def cli(ctx, verbose, quiet, cache_dir): + """GOLD API client with caching and MongoDB integration.""" + # Set up logging + if verbose >= 2: + logging.basicConfig(level=logging.DEBUG) + elif verbose == 1: + logging.basicConfig(level=logging.INFO) + else: + logging.basicConfig(level=logging.WARNING) + if quiet: + logging.basicConfig(level=logging.ERROR) + + # Initialize context + ctx.ensure_object(dict) + ctx.obj['cache_dir'] = cache_dir + ensure_cache_initialized(cache_dir) + + +@cli.command() +@click.argument('idfile') +@click.option('-o', '--output', type=click.File('w'), default=sys.stdout, help='Output file') +@click.option('-O', '--output-format', type=click.Choice(['json', 'yaml']), default='json', + help='Output format') +@click.option('--include-biosamples/--no-include-biosamples', default=False, + help='Include biosamples in study data') +@click.option('--clear-cache/--no-clear-cache', default=False, help='Clear cache before fetching') +@click.option('-a', '--authentication-file', default="config/gold-key.txt", + help='Path to authentication file') +@click.pass_context +def fetch_studies(ctx, idfile, output, output_format, include_biosamples, clear_cache, + authentication_file): + """ + Fetch studies from GOLD API and save as JSON or YAML. + + IDFILE is a file containing GOLD study IDs (Gs...) one per line. + """ + # Initialize client + gc = GoldClient(ctx.obj['cache_dir']) + gc.load_key(authentication_file) + + if clear_cache: + gc.clear_cache() + + # Read study IDs + ids = [] + with open(idfile) as file: + for line in file: + if line.startswith("Gs"): + ids.append(line.strip()) + + if not ids: + raise click.BadParameter(f"No study IDs found in {idfile}") + + # Fetch studies + studies = gc.fetch_studies(ids, include_biosamples=include_biosamples) + + # Write output + if output_format == 'yaml': + yaml.dump(studies, output, default_flow_style=False, sort_keys=False) + else: + json.dump(studies, output, indent=2) + + logging.info(f"Fetched {len(studies)} studies. 
API calls: {gc.num_calls}") + + +@cli.command() +@click.option('-i', '--study-ids-file', required=True, type=click.Path(exists=True), + help='File containing study IDs') +@click.option('-u', '--mongo-uri', required=True, help='MongoDB URI') +@click.option('-e', '--env-file', help='Environment file with MongoDB credentials') +@click.option('-a', '--authentication-file', default="config/gold-key.txt", + help='GOLD API authentication file') +@click.option('--log-failures-to-file', type=click.Path(), help='Write failures to JSON file') +@click.option('-r', '--resume/--no-resume', default=True, help='Skip already processed studies') +@click.option('-b', '--batch-size', default=50, help='Number of studies per batch') +@click.option('-m', '--max-retries', default=3, help='Maximum retry attempts') +@click.pass_context +def load_to_mongodb(ctx, study_ids_file, mongo_uri, env_file, authentication_file, + log_failures_to_file, resume, batch_size, max_retries): + """ + Load GOLD data into MongoDB. + + Fetches studies, biosamples, and projects from GOLD API + and stores them in MongoDB collections. + """ + # Load MongoDB credentials if provided + mongo_creds = load_mongodb_credentials(env_file) if env_file else {} + + # Initialize client + gc = GoldClient(ctx.obj['cache_dir']) + gc.load_key(authentication_file) + + # Read study IDs + study_ids = process_study_ids(study_ids_file) + + # Load to MongoDB + result = gc.load_to_mongodb( + study_ids=study_ids, + mongo_uri=mongo_uri, + mongo_user=mongo_creds.get('user'), + mongo_password=mongo_creds.get('password'), + resume=resume, + max_retries=max_retries, + batch_size=batch_size + ) + + # Write failures to file if requested + if log_failures_to_file and result['failed_studies']: + with open(log_failures_to_file, 'w') as f: + json.dump(result['failed_studies'], f, indent=2) + logging.info(f"Wrote {len(result['failed_studies'])} failed study logs to {log_failures_to_file}") + + +@cli.command() +@click.option('--mongo-uri', required=True, help='MongoDB URI including database name') +@click.option('-e', '--env-file', help='Path to .env file with MongoDB credentials') +@click.option('-a', '--authentication-file', default="config/gold-key.txt", + help='GOLD API authentication file') +@click.pass_context +def rebuild_cache(ctx, mongo_uri, env_file, authentication_file): + """ + Rebuild cache from MongoDB data. + + Uses existing MongoDB collections to populate cache + without making API calls. + """ + # Load MongoDB credentials if provided + mongo_creds = load_mongodb_credentials(env_file) if env_file else {} + + # Initialize client + gc = GoldClient(ctx.obj['cache_dir']) + gc.load_key(authentication_file) + + # Rebuild cache + gc.rebuild_cache_from_mongodb( + mongo_uri=mongo_uri, + mongo_user=mongo_creds.get('user'), + mongo_password=mongo_creds.get('password') + ) + + +@cli.command() +@click.pass_context +def inspect_cache(ctx): + """ + Print information about cache contents. 
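+
+    Only the first 10 keys are printed; list values are summarized by length
+    and, when present, the studyGoldId of their first element.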
+ """ + c = Cache(ctx.obj['cache_dir']) + click.echo(f"Cache directory: {c.directory}") + click.echo(f"Cache contains {len(c)} records") + + # Print some example keys + for i, key in enumerate(c): + if i >= 10: # limit to 10 entries + break + value = c.get(key) + click.echo(f"Key {i}: {key}") + if isinstance(value, list): + click.echo(f" Value type: list with {len(value)} items") + if value and 'studyGoldId' in value[0]: + click.echo(f" Study ID: {value[0]['studyGoldId']}") + else: + click.echo(f" Value type: {type(value)}") + + +if __name__ == "__main__": + cli(obj={}) diff --git a/sample_annotator/rebuild_gold_cache.py b/sample_annotator/rebuild_gold_cache.py index 98f4058..165bad2 100644 --- a/sample_annotator/rebuild_gold_cache.py +++ b/sample_annotator/rebuild_gold_cache.py @@ -1,11 +1,16 @@ +import hashlib import logging import os +import pickle +from tqdm import tqdm import click import dotenv from pymongo import MongoClient from diskcache import Cache +from sample_annotator.clients.gold_client import build_cache_key + # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -21,68 +26,173 @@ def load_mongodb_credentials(env_file: str) -> dict: } +def load_gold_key(path: str) -> tuple[str, str]: + with open(path) as f: + return tuple(f.read().strip().split(":")) + + def normalize_id(id_str: str) -> str: return id_str.replace("gold:", "") +def calculate_key_hash(key_data): + """Calculate the hash of a key for logging purposes.""" + key_bytes = pickle.dumps(key_data) + hashed_key = hashlib.sha256(key_bytes).hexdigest() + return hashed_key + + @click.command() @click.option('--mongo-uri', required=True, help='MongoDB URI including database name') @click.option('--env-file', '-e', required=False, help='Path to .env file with MongoDB user/pass') +@click.option('--authentication-file', required=True, help='GOLD API key file (user:pass)') @click.option('--cache-dir', default="cachedir", help='Path to diskcache directory (default: cachedir)') -def rebuild_gold_cache(mongo_uri: str, env_file: str, cache_dir: str): +@click.option('--debug', is_flag=True, help='Print extra debug information') +@click.option('--batch-size', default=1000, help='Batch size for processing biosamples') +def rebuild_gold_cache(mongo_uri: str, env_file: str, authentication_file: str, cache_dir: str, + debug: bool, batch_size: int): """ Rebuild diskcache entries for GoldClient by inserting MongoDB-stored responses directly into the cache without calling the API again. 
""" - mongo_creds = load_mongodb_credentials(env_file) if env_file else {'user': None, 'password': None} - user = mongo_creds.get('user') - password = mongo_creds.get('password') - - # If no credentials are present, continue unauthenticated - if not user or not password: - logging.warning("No GOLD credentials found in .env; proceeding without authentication") - user = "" - password = "" + mongo_user = mongo_password = None + if env_file: + mongo_creds = load_mongodb_credentials(env_file) + mongo_user = mongo_creds.get('user') + mongo_password = mongo_creds.get('password') + # Connect to MongoDB logging.info(f"Connecting to MongoDB: {mongo_uri}") - client = MongoClient(mongo_uri, username=mongo_creds.get('user'), password=mongo_creds.get('password')) + client = MongoClient(mongo_uri, username=mongo_user, password=mongo_password) db = client.get_default_database() + # Load GOLD API credentials + user, password = load_gold_key(authentication_file) + + # Initialize diskcache cache = Cache(cache_dir) - studies = list(db.studies.find({}, {'_id': 0})) - biosamples = list(db.biosamples.find({}, {'_id': 0})) - projects = list(db.seq_projects.find({}, {'_id': 0})) + # Clear the cache first to avoid any issues with existing entries + logging.info(f"Clearing existing cache at {cache_dir}") + cache.clear() - # DEBUG: Confirm MongoDB contents - logging.info(f"Sample study: {studies[0] if studies else 'EMPTY'}") - logging.info(f"Sample biosample: {biosamples[0] if biosamples else 'EMPTY'}") - logging.info(f"Sample project: {projects[0] if projects else 'EMPTY'}") + # Get counts for progress tracking + studies_count = db.studies.count_documents({}) + projects_count = db.seq_projects.count_documents({}) + biosamples_count = db.biosamples.count_documents({}) logging.info( - f"Inserting {len(studies)} studies, {len(biosamples)} biosamples, and {len(projects)} projects into cache") + f"Found {studies_count} studies, {projects_count} projects, and {biosamples_count} biosamples in MongoDB") + + # -------------------------- + # Phase 1: Cache studies and projects + # -------------------------- + logging.info("Phase 1: Caching studies and projects") + + # Create lookup of biosample IDs to study IDs + logging.info("Building biosample to study mapping from projects...") + biosample_to_study = {} + for project in tqdm(db.seq_projects.find({}, {'studyGoldId': 1, 'biosampleGoldId': 1}), + total=projects_count, desc="Building biosample mapping"): + bs_id = project.get('biosampleGoldId') + study_id = project.get('studyGoldId') + if bs_id and study_id: + biosample_to_study[bs_id] = study_id + + logging.info(f"Created mapping for {len(biosample_to_study)} biosamples to their studies") + + # Find all study IDs + study_ids = [doc['studyGoldId'] for doc in db.studies.find({}, {'studyGoldId': 1, '_id': 0})] + + # Process each study + study_count = 0 + project_count = 0 + + for study_id in tqdm(study_ids, desc="Caching studies and projects"): + norm_study_id = normalize_id(study_id) + + # Get the full study document + study = db.studies.find_one({'studyGoldId': study_id}, {'_id': 0}) + if not study: + continue + + # Cache study data using both formats + study_key = build_cache_key(f"{API_BASE}/studies", {"studyGoldId": norm_study_id}, user, password) + cache.set(study_key, [study]) + + # Also cache with a simpler key format for the memoize decorator + simple_study_key = f"{API_BASE}/studies-{norm_study_id}-{user}-{password}" + cache.set(simple_study_key, [study]) + + study_count += 1 + + # Get and cache projects for this study 
+ projects = list(db.seq_projects.find({'studyGoldId': study_id}, {'_id': 0})) + if projects: + projects_key = build_cache_key(f"{API_BASE}/projects", {"studyGoldId": norm_study_id}, user, password) + cache.set(projects_key, projects) + + # Also cache with simpler key format + simple_projects_key = f"{API_BASE}/projects-{norm_study_id}-{user}-{password}" + cache.set(simple_projects_key, projects) + + project_count += len(projects) - for i, study in enumerate(studies): - study_id = normalize_id(study["studyGoldId"]) - key = (f"{API_BASE}/studies", ("studyGoldId", study_id), user, password) - cache.set(key, [study]) + if debug: + logging.info(f"Cached {len(projects)} projects for study {study_id}") + logging.info(f"Cached {study_count} studies and {project_count} projects") + + # -------------------------- + # Phase 2: Cache biosamples by study + # -------------------------- + logging.info("Phase 2: Caching biosamples by study") + + # Create a dictionary to group biosamples by study biosamples_by_study = {} - for prj in projects: - sid = prj.get("studyGoldId") - bsid = prj.get("biosampleGoldId") - if sid and bsid: - sid_norm = normalize_id(sid) - biosamples_by_study.setdefault(sid_norm, []).append(bsid) - - biosample_lookup = {b["biosampleGoldId"]: b for b in biosamples if "biosampleGoldId" in b} - - for sid, bsids in biosamples_by_study.items(): - bs_payload = [biosample_lookup[bid] for bid in bsids if bid in biosample_lookup] - if not bs_payload: - continue - key = (f"{API_BASE}/biosamples", ("studyGoldId", sid), user, password) - cache.set(key, bs_payload) + + # Process biosamples in batches to avoid memory issues + biosample_count = 0 + total_batches = (biosamples_count + batch_size - 1) // batch_size + + for batch_num in tqdm(range(total_batches), desc="Processing biosample batches"): + skip = batch_num * batch_size + + batch = list(db.biosamples.find({}, {'_id': 0}).skip(skip).limit(batch_size)) + + for biosample in batch: + bs_id = biosample.get('biosampleGoldId') + if not bs_id or bs_id not in biosample_to_study: + continue + + study_id = biosample_to_study[bs_id] + norm_study_id = normalize_id(study_id) + + if norm_study_id not in biosamples_by_study: + biosamples_by_study[norm_study_id] = [] + + biosamples_by_study[norm_study_id].append(biosample) + biosample_count += 1 + + # Cache the biosamples by study + logging.info(f"Caching {len(biosamples_by_study)} study-biosample groups") + + for study_id, biosamples in tqdm(biosamples_by_study.items(), desc="Caching biosamples by study"): + # Cache using both key formats + bs_key = build_cache_key(f"{API_BASE}/biosamples", {"studyGoldId": study_id}, user, password) + cache.set(bs_key, biosamples) + + simple_bs_key = f"{API_BASE}/biosamples-{study_id}-{user}-{password}" + cache.set(simple_bs_key, biosamples) + + if debug: + logging.info(f"Cached {len(biosamples)} biosamples for study {study_id}") + + # Report on cache size and contents + logging.info(f"Cache now contains {len(cache)} records") + logging.info(f" - {study_count} studies") + logging.info(f" - {project_count} projects") + logging.info(f" - {biosample_count} biosamples") cache.close() logging.info(f"Cache closed and flushed to disk at {cache_dir}") From f9e20529f26e9976052853a9a03d88ac8fcea2b1 Mon Sep 17 00:00:00 2001 From: "Mark A. 
Miller" Date: Sat, 19 Apr 2025 16:12:32 -0400 Subject: [PATCH 3/5] ignore common cache dirs --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 8059d99..57e5d09 100644 --- a/.gitignore +++ b/.gitignore @@ -12,7 +12,8 @@ googlemaps-api-key.txt bin -cachedir.prev +cachedir +cache_from_local_mongodb examples/outputs temp/ From 75aa800dbb0f19905133dc9790d596821a99cd24 Mon Sep 17 00:00:00 2001 From: "Mark A. Miller" Date: Sat, 19 Apr 2025 16:13:11 -0400 Subject: [PATCH 4/5] more thorough cache ignoring --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 57e5d09..aae7e55 100644 --- a/.gitignore +++ b/.gitignore @@ -64,7 +64,7 @@ sphinx/_build/* # Cacheing tests/cachedir/ -cachedir.prev/ +cachedir # https://raw.githubusercontent.com/github/gitignore/main/Python.gitignore From 997dd065e8ea55bcf7b345533bf26b18c69e049c Mon Sep 17 00:00:00 2001 From: "Mark A. Miller" Date: Sat, 19 Apr 2025 16:14:29 -0400 Subject: [PATCH 5/5] remove legacy code --- sample_annotator/gold_to_mongo.py | 393 ------------------------- sample_annotator/rebuild_gold_cache.py | 202 ------------- 2 files changed, 595 deletions(-) delete mode 100644 sample_annotator/gold_to_mongo.py delete mode 100644 sample_annotator/rebuild_gold_cache.py diff --git a/sample_annotator/gold_to_mongo.py b/sample_annotator/gold_to_mongo.py deleted file mode 100644 index ff154df..0000000 --- a/sample_annotator/gold_to_mongo.py +++ /dev/null @@ -1,393 +0,0 @@ -import json -import logging -import math -import os -from datetime import datetime -from time import sleep -from typing import List, Optional, Set, Dict, Any -from urllib.parse import quote_plus -from tqdm import tqdm - -import click -import dotenv -from diskcache import Cache -from pymongo import MongoClient, ASCENDING -from pymongo.errors import DuplicateKeyError -from pymongo.uri_parser import parse_uri -from requests.auth import HTTPBasicAuth -import requests - -import sample_annotator.clients.gold_client as gold_client - -# Fix import path for both direct script execution and CLI entry point -try: - # When running as a script - from clients.gold_client import GoldClient, build_cache_key -except ModuleNotFoundError: - # When running as an installed package - from sample_annotator.clients.gold_client import GoldClient, build_cache_key - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - - -def load_mongodb_credentials(env_file: Optional[str] = None) -> dict: - """Load MongoDB credentials from environment file.""" - if env_file: - dotenv.load_dotenv(env_file) - else: - default_env_path = os.path.join('local', '.env') - if os.path.exists(default_env_path): - dotenv.load_dotenv(default_env_path) - return { - 'user': os.environ.get('MONGODB_USER'), - 'password': os.environ.get('MONGODB_PASSWORD') - } - - -def build_mongodb_connection_string( - mongo_uri: Optional[str] = None, - user: Optional[str] = None, - password: Optional[str] = None) -> str: - """ - Injects user and password into an existing MongoDB URI if missing. - If no URI is provided, falls back to localhost unauthenticated. 
- """ - if mongo_uri: - parsed = parse_uri(mongo_uri) - if parsed.get('username') or parsed.get('password'): - return mongo_uri # already has credentials - if not user or not password: - return mongo_uri # let it connect anonymously - # Inject user/pass - escaped_user = quote_plus(user) - escaped_password = quote_plus(password) - protocol_split = mongo_uri.split("://", 1) - if len(protocol_split) != 2: - raise ValueError("Invalid MongoDB URI") - scheme, rest = protocol_split - return f"{scheme}://{escaped_user}:{escaped_password}@{rest}" - - if not user or not password: - return "mongodb://localhost:27017/" - escaped_user = quote_plus(user) - escaped_password = quote_plus(password) - return f"mongodb://{escaped_user}:{escaped_password}@localhost:27017/" - - -def create_unique_index(collection, field_name: str, index_name: str) -> None: - """Create a unique index on a MongoDB collection.""" - try: - collection.create_index([(field_name, ASCENDING)], name=index_name, unique=True) - except Exception as e: - logging.error(f"Failed to create index '{index_name}': {e}") - - -def insert_document(collection, document: dict, key_name: str) -> bool: - """Insert a document into MongoDB, handling duplicate key errors.""" - try: - collection.insert_one(document) - return True - except DuplicateKeyError: - logging.warning(f"Duplicate key error for {key_name}") - return False - - -def process_study_ids(file_path: str) -> List[str]: - """Extract study IDs from a file.""" - ids = set() - with open(file_path) as file: - for line in file: - line = line.strip() - if line.startswith("Gs"): - ids.add(line) - if not ids: - raise ValueError(f"No valid IDs found in {file_path}") - return sorted(ids) - - -def get_processed_study_ids(db) -> Set[str]: - """Get a set of study IDs that have already been processed in MongoDB.""" - study_collection = db['studies'] - processed_ids = set() - for study in study_collection.find({}, {'studyGoldId': 1}): - if 'studyGoldId' in study: - processed_ids.add(study['studyGoldId']) - return processed_ids - - -class EnhancedGoldClient(GoldClient): - """Enhanced version of GoldClient with direct cache access.""" - - def _call(self, endpoint: str, params: Dict = {}) -> Any: - """ - Enhanced version of _call that directly checks the cache before - using the memoized function. 
- """ - (user, passwd) = self.gold_key - endpoint_url = f"{self.url}/{endpoint}" - - # Check both cache key formats - key_data = build_cache_key(endpoint_url, params, user, passwd) - simple_key = f"{endpoint_url}-{params.get('studyGoldId', '')}-{user}-{passwd}" - - # Try to get directly from cache first - if gold_client.cache and key_data in gold_client.cache: - logging.info(f"[cache] Direct cache hit for {endpoint} {params}") - return gold_client.cache.get(key_data) - elif gold_client.cache and simple_key in gold_client.cache: - logging.info(f"[cache] Simple key cache hit for {endpoint} {params}") - return gold_client.cache.get(simple_key) - - # Fall back to original method if not in cache - logging.warning(f"[cache] Cache miss for {endpoint} {params}, making API call") - self.num_calls += 1 - - # Make API call directly rather than using memoize - attempt = 0 - while attempt < 4: - try: - results = requests.get( - endpoint_url, params=params, auth=HTTPBasicAuth(user, passwd) - ) - logging.info(f"STATUS={results.status_code}") - if results.status_code == 200: - data = results.json() - # Store in cache for future use - if gold_client.cache: - gold_client.cache.set(key_data, data) - gold_client.cache.set(simple_key, data) - return data - else: - logging.error( - f"API call to {endpoint_url} failed, code={results.status_code}; attempt={attempt} [pausing]" - ) - sleep(5 ** attempt) - except Exception as e: - logging.error(f"Exception during API call: {e}") - sleep(5 ** attempt) - - attempt += 1 - - raise Exception(f"API call to {endpoint_url} failed after {attempt} attempts") - - -@click.command() -@click.option('--authentication-file', '-a', default="config/gold-key.txt", - help="Path to GOLD API authentication file (user:pass)") -@click.option('--env-file', '-e', help="Path to environment file with MongoDB credentials") -@click.option('--log-failures-to-file', type=click.Path(writable=True), default=None, - help="Path to write failure logs as JSON") -@click.option('--max-retries', '-m', type=int, default=3, - help="Maximum number of retries for failed API calls") -@click.option('--mongo-uri', '-u', required=True, - help="MongoDB URI including database name") -@click.option('--purge-diskcache', '-P', is_flag=True, default=False, - help="Purge the disk cache before processing") -@click.option('--purge-mongodb', '-p', is_flag=True, default=False, - help="Purge the MongoDB collections before processing") -@click.option('--resume', '-r', is_flag=True, default=True, - help="Resume from last run, skipping already processed studies") -@click.option('--study-ids-file', '-i', type=click.Path(exists=True), required=True, - help="File containing study IDs to process") -@click.option('--cache-dir', default="cachedir", - help="Path to diskcache directory") -@click.option('--batch-size', type=int, default=50, - help="Number of studies to process in each batch") -@click.option('--progress-bar/--no-progress-bar', default=True, - help="Show progress bars for processing") -def main(study_ids_file: str, authentication_file: str, mongo_uri: Optional[str], - env_file: Optional[str], purge_mongodb: bool, purge_diskcache: bool, - resume: bool, max_retries: int, log_failures_to_file: Optional[str], - cache_dir: str, batch_size: int, progress_bar: bool) -> None: - """ - Load GOLD metadata into MongoDB using a local cache to minimize API calls. - - This script processes study IDs from a file, fetches their metadata from GOLD API - or a local cache, and stores the data in MongoDB collections. 
- """ - # Connect to MongoDB - mongo_creds = load_mongodb_credentials(env_file) - conn_str = build_mongodb_connection_string( - mongo_uri=mongo_uri, - user=mongo_creds.get('user'), - password=mongo_creds.get('password'), - ) - - logging.info("Connecting to MongoDB") - client = MongoClient(conn_str) - - try: - db_name = MongoClient(conn_str).get_default_database().name - db = client[db_name] - except Exception as e: - logging.error(f"Could not determine database name from URI: {e}") - return - - try: - client.admin.command('ping') - logging.info("MongoDB connection successful") - except Exception as e: - logging.error(f"MongoDB connection failed: {e}") - return - - # Set up MongoDB collections - if purge_mongodb: - logging.info("Purging MongoDB collections...") - db.drop_collection('biosamples') - db.drop_collection('studies') - db.drop_collection('projects') - db.drop_collection('seq_projects') - db.drop_collection('study_import_failures') - resume = False - - biosample_collection = db['biosamples'] - study_collection = db['studies'] - project_collection = db['seq_projects'] - failure_collection = db['study_import_failures'] - - # Create indices for faster lookups and uniqueness constraints - create_unique_index(biosample_collection, "biosampleGoldId", "biosampleGoldId_index") - create_unique_index(study_collection, "studyGoldId", "studyGoldId_index") - create_unique_index(project_collection, "projectGoldId", "projectGoldId_index") - create_unique_index(failure_collection, "studyGoldId", "failedStudy_index") - - # Initialize the cache for GoldClient - gold_client.set_cache_directory(cache_dir) - logging.info(f"Using disk cache directory: {cache_dir}") - - # Create enhanced GoldClient that checks cache directly - gc = EnhancedGoldClient() - - if purge_diskcache: - logging.info("Purging disk cache...") - gc.clear_cache() - - gc.load_key(authentication_file) - - # Process study IDs from file - study_ids = process_study_ids(study_ids_file) - total_studies = len(study_ids) - logging.info(f"Found {total_studies} studies to process") - - # Identify studies that have already been processed - processed_study_ids = set() - if resume: - processed_study_ids = get_processed_study_ids(db) - if processed_study_ids: - already_processed = len(processed_study_ids) - logging.info(f"Found {already_processed} studies already in MongoDB that will be skipped") - logging.info(f"Remaining studies to process: {total_studies - already_processed}") - - # Process studies in batches - completed = 0 - failed = 0 - skipped = 0 - failed_study_logs = [] - - # Determine number of batches - num_batches = math.ceil(total_studies / batch_size) - - # Process each batch - for batch_num in range(num_batches): - batch_start = batch_num * batch_size - batch_end = min(batch_start + batch_size, total_studies) - batch = study_ids[batch_start:batch_end] - - logging.info(f"Processing batch {batch_num + 1}/{num_batches} ({len(batch)} studies)") - - # Use tqdm for progress bar if enabled - iterator = tqdm(batch, desc=f"Batch {batch_num + 1}") if progress_bar else batch - - # Process each study in the batch - for study_id in iterator: - if resume and study_id in processed_study_ids: - logging.info(f"Skipping study {study_id} (already in MongoDB)") - skipped += 1 - continue - - logging.info(f"Processing study {study_id} ({completed + skipped + 1}/{total_studies})...") - - retry_count = 0 - success = False - - while retry_count <= max_retries and not success: - try: - # Fetch study data - study = gc.fetch_study(study_id) - if not study: - 
logging.warning(f"No data returned for study {study_id}, skipping") - failed += 1 - failure_doc = { - 'studyGoldId': study_id, - 'error': "No data returned (null response)", - 'timestamp': datetime.utcnow(), - 'failed': True - } - insert_document(failure_collection, failure_doc, study_id) - failed_study_logs.append(failure_doc) - break - - # Fetch and process biosamples - biosamples = gc.fetch_biosamples_by_study(study_id) - logging.info(f"Retrieved {len(biosamples)} biosamples for study {study_id}") - - # Store biosamples and their projects - biosample_ids = [] - for biosample in biosamples: - biosample_id = biosample.get('biosampleGoldId') - if biosample_id: - biosample_ids.append(biosample_id) - - # Extract projects before inserting biosample - projects = biosample.pop('projects', []) - - # Insert biosample - insert_document(biosample_collection, biosample, biosample_id) - - # Insert projects - for project in projects: - insert_document(project_collection, project, project.get('projectGoldId', 'Unknown')) - - # Add biosample IDs to study and insert into MongoDB - study['biosamples'] = biosample_ids - insert_document(study_collection, study, study_id) - - completed += 1 - success = True - - except Exception as e: - retry_count += 1 - if retry_count <= max_retries: - wait_time = 5 * retry_count - logging.warning( - f"Error processing study {study_id}: {e}. Retrying in {wait_time} seconds (attempt {retry_count}/{max_retries})") - sleep(wait_time) - else: - logging.error(f"Failed to process study {study_id} after {max_retries} attempts: {e}") - failed += 1 - failure_doc = { - 'studyGoldId': study_id, - 'error': str(e), - 'timestamp': datetime.utcnow(), - 'failed': True - } - insert_document(failure_collection, failure_doc, study_id) - failed_study_logs.append(failure_doc) - - # Report results - logging.info(f"Import completed: {completed} studies processed, {skipped} skipped, {failed} failed") - - # Write failure logs if requested - if log_failures_to_file and failed_study_logs: - with open(log_failures_to_file, 'w') as f: - json.dump(failed_study_logs, f, indent=2, default=str) - logging.info(f"Wrote {len(failed_study_logs)} failed study logs to {log_failures_to_file}") - - # Close connections - client.close() - logging.info("MongoDB connection closed") - - -if __name__ == "__main__": - main() diff --git a/sample_annotator/rebuild_gold_cache.py b/sample_annotator/rebuild_gold_cache.py deleted file mode 100644 index 165bad2..0000000 --- a/sample_annotator/rebuild_gold_cache.py +++ /dev/null @@ -1,202 +0,0 @@ -import hashlib -import logging -import os -import pickle -from tqdm import tqdm - -import click -import dotenv -from pymongo import MongoClient -from diskcache import Cache - -from sample_annotator.clients.gold_client import build_cache_key - -# Configure logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - -API_BASE = "https://gold.jgi.doe.gov/rest/nmdc" - - -def load_mongodb_credentials(env_file: str) -> dict: - if env_file: - dotenv.load_dotenv(env_file) - return { - 'user': os.environ.get('MONGODB_USER'), - 'password': os.environ.get('MONGODB_PASSWORD') - } - - -def load_gold_key(path: str) -> tuple[str, str]: - with open(path) as f: - return tuple(f.read().strip().split(":")) - - -def normalize_id(id_str: str) -> str: - return id_str.replace("gold:", "") - - -def calculate_key_hash(key_data): - """Calculate the hash of a key for logging purposes.""" - key_bytes = pickle.dumps(key_data) - hashed_key = 
-    return hashed_key
-
-
-@click.command()
-@click.option('--mongo-uri', required=True, help='MongoDB URI including database name')
-@click.option('--env-file', '-e', required=False, help='Path to .env file with MongoDB user/pass')
-@click.option('--authentication-file', required=True, help='GOLD API key file (user:pass)')
-@click.option('--cache-dir', default="cachedir", help='Path to diskcache directory (default: cachedir)')
-@click.option('--debug', is_flag=True, help='Print extra debug information')
-@click.option('--batch-size', default=1000, help='Batch size for processing biosamples')
-def rebuild_gold_cache(mongo_uri: str, env_file: str, authentication_file: str, cache_dir: str,
-                       debug: bool, batch_size: int):
-    """
-    Rebuild diskcache entries for GoldClient by inserting MongoDB-stored responses
-    directly into the cache without calling the API again.
-    """
-    mongo_user = mongo_password = None
-    if env_file:
-        mongo_creds = load_mongodb_credentials(env_file)
-        mongo_user = mongo_creds.get('user')
-        mongo_password = mongo_creds.get('password')
-
-    # Connect to MongoDB
-    logging.info(f"Connecting to MongoDB: {mongo_uri}")
-    client = MongoClient(mongo_uri, username=mongo_user, password=mongo_password)
-    db = client.get_default_database()
-
-    # Load GOLD API credentials
-    user, password = load_gold_key(authentication_file)
-
-    # Initialize diskcache
-    cache = Cache(cache_dir)
-
-    # Clear the cache first to avoid any issues with existing entries
-    logging.info(f"Clearing existing cache at {cache_dir}")
-    cache.clear()
-
-    # Get counts for progress tracking
-    studies_count = db.studies.count_documents({})
-    projects_count = db.seq_projects.count_documents({})
-    biosamples_count = db.biosamples.count_documents({})
-
-    logging.info(
-        f"Found {studies_count} studies, {projects_count} projects, and {biosamples_count} biosamples in MongoDB")
-
-    # --------------------------
-    # Phase 1: Cache studies and projects
-    # --------------------------
-    logging.info("Phase 1: Caching studies and projects")
-
-    # Create lookup of biosample IDs to study IDs
-    logging.info("Building biosample to study mapping from projects...")
-    biosample_to_study = {}
-    for project in tqdm(db.seq_projects.find({}, {'studyGoldId': 1, 'biosampleGoldId': 1}),
-                        total=projects_count, desc="Building biosample mapping"):
-        bs_id = project.get('biosampleGoldId')
-        study_id = project.get('studyGoldId')
-        if bs_id and study_id:
-            biosample_to_study[bs_id] = study_id
-
-    logging.info(f"Created mapping for {len(biosample_to_study)} biosamples to their studies")
-
-    # Find all study IDs
-    study_ids = [doc['studyGoldId'] for doc in db.studies.find({}, {'studyGoldId': 1, '_id': 0})]
-
-    # Process each study
-    study_count = 0
-    project_count = 0
-
-    for study_id in tqdm(study_ids, desc="Caching studies and projects"):
-        norm_study_id = normalize_id(study_id)
-
-        # Get the full study document
-        study = db.studies.find_one({'studyGoldId': study_id}, {'_id': 0})
-        if not study:
-            continue
-
-        # Cache study data using both formats
-        study_key = build_cache_key(f"{API_BASE}/studies", {"studyGoldId": norm_study_id}, user, password)
-        cache.set(study_key, [study])
-
-        # Also cache with a simpler key format for the memoize decorator
-        simple_study_key = f"{API_BASE}/studies-{norm_study_id}-{user}-{password}"
-        cache.set(simple_study_key, [study])
-
-        study_count += 1
-
-        # Get and cache projects for this study
-        projects = list(db.seq_projects.find({'studyGoldId': study_id}, {'_id': 0}))
-        if projects:
-            projects_key = build_cache_key(f"{API_BASE}/projects", {"studyGoldId": norm_study_id}, user, password)
-            cache.set(projects_key, projects)
-
-            # Also cache with simpler key format
-            simple_projects_key = f"{API_BASE}/projects-{norm_study_id}-{user}-{password}"
-            cache.set(simple_projects_key, projects)
-
-            project_count += len(projects)
-
-            if debug:
-                logging.info(f"Cached {len(projects)} projects for study {study_id}")
-
-    logging.info(f"Cached {study_count} studies and {project_count} projects")
-
-    # --------------------------
-    # Phase 2: Cache biosamples by study
-    # --------------------------
-    logging.info("Phase 2: Caching biosamples by study")
-
-    # Create a dictionary to group biosamples by study
-    biosamples_by_study = {}
-
-    # Process biosamples in batches to avoid memory issues
-    biosample_count = 0
-    total_batches = (biosamples_count + batch_size - 1) // batch_size
-
-    for batch_num in tqdm(range(total_batches), desc="Processing biosample batches"):
-        skip = batch_num * batch_size
-
-        batch = list(db.biosamples.find({}, {'_id': 0}).skip(skip).limit(batch_size))
-
-        for biosample in batch:
-            bs_id = biosample.get('biosampleGoldId')
-            if not bs_id or bs_id not in biosample_to_study:
-                continue
-
-            study_id = biosample_to_study[bs_id]
-            norm_study_id = normalize_id(study_id)
-
-            if norm_study_id not in biosamples_by_study:
-                biosamples_by_study[norm_study_id] = []
-
-            biosamples_by_study[norm_study_id].append(biosample)
-            biosample_count += 1
-
-    # Cache the biosamples by study
-    logging.info(f"Caching {len(biosamples_by_study)} study-biosample groups")
-
-    for study_id, biosamples in tqdm(biosamples_by_study.items(), desc="Caching biosamples by study"):
-        # Cache using both key formats
-        bs_key = build_cache_key(f"{API_BASE}/biosamples", {"studyGoldId": study_id}, user, password)
-        cache.set(bs_key, biosamples)
-
-        simple_bs_key = f"{API_BASE}/biosamples-{study_id}-{user}-{password}"
-        cache.set(simple_bs_key, biosamples)
-
-        if debug:
-            logging.info(f"Cached {len(biosamples)} biosamples for study {study_id}")
-
-    # Report on cache size and contents
-    logging.info(f"Cache now contains {len(cache)} records")
-    logging.info(f" - {study_count} studies")
-    logging.info(f" - {project_count} projects")
-    logging.info(f" - {biosample_count} biosamples")
-
-    cache.close()
-    logging.info(f"Cache closed and flushed to disk at {cache_dir}")
-
-
-if __name__ == '__main__':
-    rebuild_gold_cache()
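
With the legacy scripts removed by PATCH 5/5, a rebuilt cache directory can still be smoke-tested by pointing GoldClient at it and confirming that fetches complete without live API calls. A minimal sketch, assuming the GoldClient surface exercised in the deleted code (set_cache_directory, load_key, fetch_study, fetch_biosamples_by_study, num_calls), that num_calls counts only live requests as in EnhancedGoldClient._call, and that the rebuilt keys line up with what the client computes; the study ID is illustrative:

    # Smoke test: a warm cache should serve both fetches with zero live calls.
    import sample_annotator.clients.gold_client as gold_client
    from sample_annotator.clients.gold_client import GoldClient

    # Point the client at the cache produced by rebuild-gold-cache-from-mongodb.
    gold_client.set_cache_directory("cache_from_local_mongodb")

    gc = GoldClient()
    gc.load_key("config/gold-key.txt")

    study = gc.fetch_study("Gs0110115")  # illustrative study ID
    biosamples = gc.fetch_biosamples_by_study("Gs0110115")

    print(f"{len(biosamples)} biosamples retrieved; live API calls: {gc.num_calls}")

If num_calls is nonzero here, the rebuilt keys did not match what GoldClient computed, and the client fell back to the GOLD API.
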