3 changes: 2 additions & 1 deletion .gitignore
@@ -166,5 +166,6 @@ cython_debug/
#.idea/


#Ignore vscode AI rules
# Codacy configuration and instructions
.github/instructions/codacy.instructions.md
.codacy
Empty file.
159 changes: 159 additions & 0 deletions src/cdm_data_loader_utils/parsers/uniprot/idmapping.py
@@ -0,0 +1,159 @@
"""Parser for UniProt ID Mapping file.

UniProt provides comprehensive mappings between its protein IDs and identifiers in many other databases. Mappings are extracted from the UniProt records where possible.

Source file: https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/idmapping/idmapping.dat.gz

Legacy mappings (pre-UniProt proteome redundancy reduction drive): https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/idmapping/idmapping.dat.2015_03.gz

Docs: https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/idmapping/README

Metalink file: https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/idmapping/RELEASE.metalink

Retrieve the list of databases referenced from the UniProt API: https://rest.uniprot.org/database/stream?format=json&query=%28*%29

1) idmapping.dat
This file has three columns, delimited by tab:
1. UniProtKB-AC
2. ID_type
3. ID
where ID_type is the database name as appearing in UniProtKB cross-references,
and as supported by the ID mapping tool on the UniProt web site,
http://www.uniprot.org/mapping and where ID is the identifier in
that cross-referenced database.
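
Example lines (tab-delimited; the accession and cross-references are illustrative,
not taken from a real release):

    P12345	GeneID	12345
    P12345	RefSeq	NP_000001.1
    P12345	Gene_Name	EXAMPLE1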
"""

import datetime
from uuid import uuid4

import click
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import StringType, StructField

from cdm_data_loader_utils.core.constants import INVALID_DATA_FIELD_NAME
from cdm_data_loader_utils.core.pipeline_run import PipelineRun
from cdm_data_loader_utils.readers.dsv import read
from cdm_data_loader_utils.utils.cdm_logger import get_cdm_logger
from cdm_data_loader_utils.utils.minio import list_remote_dir_contents
from cdm_data_loader_utils.utils.spark_delta import APPEND, set_up_workspace, write_delta
from cdm_data_loader_utils.validation.dataframe_validator import DataFrameValidator, Validator
from cdm_data_loader_utils.validation.df_nullable_fields import validate as check_nullable_fields

APP_NAME = "uniprot_idmapping"
NOW = datetime.datetime.now(tz=datetime.UTC)
DB = "db"
XREF = "xref"
ID = "id"
COLUMNS = [ID, DB, XREF]


logger = get_cdm_logger()

ID_MAPPING_SCHEMA = [StructField(n, StringType(), nullable=False) for n in COLUMNS]


def ingest(spark: SparkSession, run: PipelineRun, id_mapping_tsv: str) -> DataFrame:
"""Parse the ID mapping file and convert it to a dataframe.

    :param spark: Spark session
    :type spark: SparkSession
    :param run: pipeline run metadata, used when validating the parsed dataframe
    :type run: PipelineRun
    :param id_mapping_tsv: path to the ID mapping TSV file
    :type id_mapping_tsv: str
    :return: dataframe containing the ID mappings
    :rtype: DataFrame
"""
options = {
"delimiter": "\t",
"header": False,
"ignoreLeadingWhiteSpace": True,
"ignoreTrailingWhiteSpace": True,
"enforceSchema": True,
"inferSchema": False,
}

df = read(spark, id_mapping_tsv, ID_MAPPING_SCHEMA, options)
id_map_parse_result = DataFrameValidator(spark).validate_dataframe(
data_to_validate=df,
schema=ID_MAPPING_SCHEMA,
run=run,
validator=Validator(check_nullable_fields, {"invalid_col": INVALID_DATA_FIELD_NAME}),
invalid_col=INVALID_DATA_FIELD_NAME,
)
id_map_df = id_map_parse_result.valid_df

# destination format:
# "entity_id", "identifier", "description", "source", "relationship"
return id_map_df.select(
# prefix with UniProt
sf.concat(sf.lit("UniProt:"), sf.col("id")).alias("uniprot_id"),
sf.col(DB),
sf.col(XREF),
sf.lit(None).cast("string").alias("description"),
sf.lit("UniProt ID mapping").alias("source"),
sf.lit(None).cast("string").alias("relationship"),
)


def read_and_write(spark: SparkSession, pipeline_run: PipelineRun, id_mapping_tsv: str) -> None:
"""Read in the UniProt ID mapping and write it out as a uniprot_identifier table.

    :param spark: Spark session
    :type spark: SparkSession
    :param pipeline_run: pipeline run metadata, including the namespace to write to
    :type pipeline_run: PipelineRun
    :param id_mapping_tsv: path to the ID mapping file
    :type id_mapping_tsv: str
"""
    # ingest the ID mapping file and append it to the uniprot_identifier Delta table
write_delta(
spark, ingest(spark, pipeline_run, id_mapping_tsv), pipeline_run.namespace, "uniprot_identifier", APPEND
)


@click.command()
@click.option(
"--source",
required=True,
help="Full path to the source directory containing ID mapping file(s). Does not need to specify the Bucket (i.e. cdm-lake) but should specify everything else.",
)
@click.option(
"--output-dir",
required=True,
help="Output directory for Delta tables; should be relative to the user/tenant warehouse. It will contain the output data in a directory named <namespace>.db.",
)
@click.option(
"--namespace",
default="uniprot",
show_default=True,
help="Delta Lake database name",
)
@click.option(
"--tenant-name",
default=None,
help="Tenant warehouse to save processed data to; defaults to saving data to the user warehouse if a tenant is not specified",
)
def main(source: str, output_dir: str, namespace: str, tenant_name: str | None) -> None:
"""Run the UniProt ID Mapping importer.

:param source: full path to the source directory containing ID mapping file(s)
:type source: str
:param output_dir: Output directory for Delta tables; should be relative to the user/tenant warehouse. It will contain the output data in a directory named <namespace>.db.
:type output_dir: str
:param namespace: Delta Lake database name
:type namespace: str
:param tenant_name: Tenant warehouse to save processed data to; defaults to saving data to the user warehouse if a tenant is not specified
:type tenant_name: str | None
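
    Example invocation (paths and option values are illustrative placeholders):

        python -m cdm_data_loader_utils.parsers.uniprot.idmapping --source <path/to/id_mapping/dir> --output-dir <warehouse-relative-dir> --tenant-name <tenant>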
"""
(spark, delta_ns) = set_up_workspace(APP_NAME, f"{output_dir.removesuffix('/')}/{namespace}", tenant_name)
for file in list_remote_dir_contents(source):
# file names are in the 'Key' value
# 'tenant-general-warehouse/kbase/datasets/uniprot/id_mapping/id_mapping_part_001.tsv.gz'
pipeline_run = PipelineRun(str(uuid4()), APP_NAME, file["Key"], delta_ns)
read_and_write(spark, pipeline_run, file["Key"])


if __name__ == "__main__":
main()
97 changes: 97 additions & 0 deletions src/cdm_data_loader_utils/parsers/uniprot/metalink.py
@@ -0,0 +1,97 @@
"""Parser for UniProt metalink XML files.

These metadata files provide information and links for UniProt and related downloads.
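
An abridged, illustrative example of the structure this parser expects (element
names are inferred from the XPath queries below, not copied from a real metalink
file):

    <metalink xmlns="http://www.metalinker.org/">
      <version>2025_03</version>
      <publisher><name>UniProt Consortium</name></publisher>
      <license><name>Creative Commons Attribution 4.0</name></license>
      <files>
        <file name="idmapping.dat.gz">
          <size>12345678</size>
          <verification><hash type="md5">0123456789abcdef</hash></verification>
          <resources>
            <url location="us">https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/idmapping/idmapping.dat.gz</url>
          </resources>
        </file>
      </files>
    </metalink>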


"""

import datetime
from pathlib import Path
from typing import Any
from xml.etree.ElementTree import Element

from defusedxml.ElementTree import parse

from cdm_data_loader_utils.utils.cdm_logger import get_cdm_logger, log_and_die

NS = {"": "http://www.metalinker.org/"}
NOW = datetime.datetime.now(tz=datetime.UTC)
COLUMNS = ["id", "db", "xref"]

logger = get_cdm_logger()


def parse_metalink(metalink_xml_path: Path | str) -> Element | None:
"""Parse the metalink file and return the root node."""
document = parse(str(metalink_xml_path))
root = document.getroot()
if root is not None:
return root

return log_and_die("Could not find root for metalink file", RuntimeError)


def generate_data_source_table(metalink_xml_path: Path | str) -> dict[str, Any]:
"""Generate the data source information for the ID Mapping data."""
root = parse_metalink(metalink_xml_path)
if root is None:
return {}

data_source = {
"license": root.findtext("./license/name", namespaces=NS),
"publisher": root.findtext("./publisher/name", namespaces=NS),
"resource_type": "dataset",
"version": root.findtext("./version", namespaces=NS),
}
missing = [k for k in data_source if not data_source[k]]
if missing:
log_and_die(
f"Missing required elements from metalink file: {', '.join(missing)}",
RuntimeError,
)

return data_source


def get_files(metalink_xml_path: Path | str, files_to_find: list[str] | None = None) -> dict[str, Any]:
"""Generate the data source information for the ID Mapping data."""
root = parse_metalink(metalink_xml_path)
if root is None:
return {}

if files_to_find is not None and files_to_find == []:
logger.warning("Empty file list supplied to get_files: aborting.")
return {}

files = {}
for f in root.findall("./files/file", NS):
# get the name, size, any verification info
name = f.get("name")
# skip now if the file is not of interest
if files_to_find and name not in files_to_find:
continue

size = f.findtext("./size", namespaces=NS)
checksum = f.find("./verification/hash", NS)
if checksum is not None:
checksum_fn = checksum.get("type")
checksum_value = checksum.text
else:
checksum_fn = checksum_value = None
dl_url = f.findtext("./resources/url[@location='us']", namespaces=NS)
files[name] = {
"name": name,
"size": size,
"checksum": checksum_value,
"checksum_fn": checksum_fn,
"url": dl_url,
}

# report on unfound files
if files_to_find:
not_found = {f for f in files_to_find if f not in files}
if not_found:
msg = "The following files were not found: " + ", ".join(sorted(not_found))
logger.warning(msg)

return files
105 changes: 105 additions & 0 deletions src/cdm_data_loader_utils/parsers/uniprot/relnotes.py
@@ -0,0 +1,105 @@
"""Parser for UniProt release notes."""

# The UniProt consortium European Bioinformatics Institute (EBI), SIB Swiss
# Institute of Bioinformatics and Protein Information Resource (PIR),
# is pleased to announce UniProt Knowledgebase (UniProtKB) Release
# 2025_03 (18-Jun-2025). UniProt (Universal Protein Resource) is a
# comprehensive catalog of information on proteins.

# UniProtKB Release 2025_03 consists of 253,635,358 entries (UniProtKB/Swiss-Prot:
# 573,661 entries and UniProtKB/TrEMBL: 253,061,697 entries)
# UniRef100 Release 2025_03 consists of 465,330,530 entries
# UniRef90 Release 2025_03 consists of 208,005,650 entries
# UniRef50 Release 2025_03 consists of 70,198,728 entries
# UniParc Release 2025_03 consists of 982,121,738 entries, where 915,805,719 are active and 66,316,019 inactive
# UniProt databases can be accessed from the web at http://www.uniprot.org and
# downloaded from http://www.uniprot.org/downloads. Detailed release
# statistics for TrEMBL and Swiss-Prot sections of the UniProt Knowledgebase
# can be viewed at http://www.ebi.ac.uk/uniprot/TrEMBLstats/ and
# http://web.expasy.org/docs/relnotes/relstat.html respectively.

import datetime as dt
import re
from pathlib import Path
from typing import Any

from cdm_data_loader_utils.utils.cdm_logger import log_and_die

RELEASE_VERSION_DATE: re.Pattern[str] = re.compile(
r"is pleased to announce UniProt Knowledgebase \(UniProtKB\) Release\s+(\w+) \((\d{1,2}-[a-zA-Z]+-\d{4})\)\."
)

UNIPROT_TREMBL_STATS: re.Pattern[str] = re.compile(
r"UniProtKB Release \w+ consists of ([\d,]+) entries \(UniProtKB/Swiss-Prot:\n([\d,]+) entries and UniProtKB/TrEMBL: ([\d,]+) entries\)"
)

RELEASE_STATS: re.Pattern[str] = re.compile(r"(\w+) Release .*? consists of ([\d,]+) entries")

DATE_FORMAT = "%d-%b-%Y"


def parse_relnotes(relnotes_path: Path) -> dict[str, Any]:
"""Open and read the release notes file, returning it as a text string.

:param relnotes_path: path to the release notes file
:type relnotes_path: Path
:return: string
:rtype: str
"""
rel_text = relnotes_path.read_text()
return parse(rel_text)


def parse(relnotes: str) -> dict[str, Any]:
"""Parse the release notes for a UniProt release.

:param relnotes: contents of the release notes file as a string
:type relnotes: str
:return: key-value pairs with vital release stats
    :rtype: dict[str, Any]
    """
errors = []
stats = {}

relnotes_parts = relnotes.strip().split("\n\n", 1)
if len(relnotes_parts) != 2:
log_and_die("Could not find double line break. Relnotes file format may have changed", RuntimeError)

(intro_str, stats_str) = relnotes_parts

# remove line breaks for ease of parsing
intro_str = intro_str.replace("\n", " ")

rv = re.search(RELEASE_VERSION_DATE, intro_str)
if not rv:
errors.append("Could not find text matching the release version date regex.")
else:
stats["version"] = rv.groups()[0]
stats["date_published"] = dt.datetime.strptime(rv.groups()[1], DATE_FORMAT) # noqa: DTZ007

# parse the stats section
uniprot_trembl = re.search(UNIPROT_TREMBL_STATS, stats_str)
if not uniprot_trembl:
errors.append("Could not find text matching the UniProt/TrEMBL stats regex.")
else:
numbers = uniprot_trembl.groups()
stats["UniProtKB"] = numbers[0]
stats["UniProtKB/Swiss-Prot"] = numbers[1]
stats["UniProtKB/TrEMBL"] = numbers[2]

all_releases = re.findall(RELEASE_STATS, stats_str)
if not all_releases:
errors.append("Could not find text matching the release stats regex.")
else:
for release in all_releases:
(db, number) = release
if db not in stats:
stats[db] = number

# make sure that we have all the UniRef stats
errors.extend([f"No stats for UniRef{n} found." for n in ["50", "90", "100"] if f"UniRef{n}" not in stats])

if errors:
log_and_die("\n".join(errors), RuntimeError)

return stats
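

# An illustrative result of parsing the release notes excerpted in the comment at the
# top of this module (values are derived from that excerpt; real output depends on the
# release notes supplied):
#
#   parse(release_notes_text) == {
#       "version": "2025_03",
#       "date_published": dt.datetime(2025, 6, 18, 0, 0),
#       "UniProtKB": "253,635,358",
#       "UniProtKB/Swiss-Prot": "573,661",
#       "UniProtKB/TrEMBL": "253,061,697",
#       "UniRef100": "465,330,530",
#       "UniRef90": "208,005,650",
#       "UniRef50": "70,198,728",
#       "UniParc": "982,121,738",
#   }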