Skip to content

Commit

Permalink
Merge branch 'main' into berkeley-schema-migration
Browse files Browse the repository at this point in the history
  • Loading branch information
naglepuff committed Sep 26, 2024
2 parents ada2d2e + ce04d05 commit 9af4a50
Show file tree
Hide file tree
Showing 15 changed files with 493 additions and 60 deletions.
6 changes: 6 additions & 0 deletions nmdc_server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,12 @@ async def update_submission(
submission.metadata_submission = (
submission.metadata_submission | body_dict["metadata_submission"]
)
# TODO: remove the child properties "studyName" and "templates" in favor of the top-
# level property. Requires some coordination between this API and its clients.
if "studyForm" in body_dict["metadata_submission"]:
submission.study_name = body_dict["metadata_submission"]["studyForm"]["studyName"]
if "templates" in body_dict["metadata_submission"]:
submission.templates = body_dict["metadata_submission"]["templates"]
# Update permissions and status iff the user is an "owner"
if current_user_role and current_user_role.role == models.SubmissionEditorRole.owner:
new_permissions = body_dict.get("permissions", None)
Expand Down
155 changes: 145 additions & 10 deletions nmdc_server/ingest/kegg.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,40 @@
import csv
from pathlib import Path
from typing import Dict
from typing import Dict, List, Union

import requests
from sqlalchemy.orm import Session

from nmdc_server.ingest.errors import errors
from nmdc_server.models import KoTermText, KoTermToModule, KoTermToPathway
from nmdc_server.models import (
CogTermToFunction,
CogTermToPathway,
KoTermText,
KoTermToModule,
KoTermToPathway,
PfamEntryToClan,
)

# KEGG htext download endpoints for the KO orthology tree and the module tree.
ORTHOLOGY_URL = "https://www.genome.jp/kegg-bin/download_htext?htext=ko00001&format=json"
MODULE_URL = "https://www.genome.jp/kegg-bin/download_htext?htext=ko00002&format=json"

# Expect that this file is mounted from CORI /global/cfs/cdirs/m3408/kegg_pathway.tab.txt
PATHWAY_FILE = "/data/ingest/kegg/kegg_pathway.tab.txt"

# Ingest COG terms, pathways, and functions with these files
COG_FUNCTION_DEFS = "/data/ingest/cog/fun-20.tab"

# Note that we're using the same file for both COG terms and pathways.
# NOTE(review): because COG_PATHWAY_DEFS and COG_TERM_DEFS are the SAME string,
# they cannot both be keys of the same dict — see the NOTE on `delimeted_files`
# below, where the pathway entry is silently overwritten. Confirm intent.
COG_PATHWAY_DEFS = COG_TERM_DEFS = "/data/ingest/cog/cog-20.def.tab"

# Same aliasing caveat applies: PFAM_TERM_DEFS and PFAM_CLAN_DEFS are one string.
PFAM_TERM_DEFS = PFAM_CLAN_DEFS = "/data/ingest/pfam/Pfam-A.clans.tsv"


def load(db: Session) -> None:
    """Run every search-term ingest step (KEGG, COG, and Pfam) using the given session."""
    ingest_steps = (
        ingest_ko_search,
        ingest_ko_module_map,
        ingest_ko_pathway_map,
        ingest_pfam_clan_map,
    )
    for step in ingest_steps:
        step(db)


def ingest_ko_search(db: Session) -> None:
Expand All @@ -28,6 +44,86 @@ def ingest_ko_search(db: Session) -> None:
db.commit()


def get_search_records_from_delimeted_file(
    file,
    term_key,
    text_key,
    records,
    delimeter="\t",
    fallback_text_key=None,
    fieldnames=None,
):
    """
    Read a delimited file of identifier/text pairs (terms, pathways, modules, etc.)
    and add each identifier -> display-text mapping to ``records``, which is later
    turned into database objects for the ko_term_text table.

    Rows with an empty identifier are skipped.  When ``fallback_text_key`` is given
    and the primary text field is empty, the fallback field's value is used instead.
    ``fieldnames`` is passed through to ``csv.DictReader`` for headerless files.
    """
    try:
        fd = open(file)
    except FileNotFoundError:
        # Record the missing file for the ingest error report instead of failing.
        errors["kegg_search"].add(f"Missing {file}")
        return

    with fd:
        reader = csv.DictReader(fd, fieldnames=fieldnames, delimiter=delimeter)
        for entry in reader:
            identifier = entry[term_key]
            if not identifier:
                continue
            text = entry[text_key]
            if fallback_text_key and not text:
                text = entry[fallback_text_key]
            records[identifier] = text


# Column names for cog-20.def.tab (no header row in the file).
cog_def_headers = [
    "cog_id",
    "cog_functional_category",
    "cog_name",
    "gene",
    "pathway",
    "pubmed_id",
    "pdb_id",
]
# Column names for Pfam-A.clans.tsv (no header row in the file).
pfam_headers = [
    "pfam_accession",
    "clan_accession",
    "clan_name",
    "pfam_short_name",
    "pfam_name",
]

# Column names for fun-20.tab (no header row in the file).
cog_function_headers = ["function_code", "sequence", "definition"]

# Per-file configuration for get_search_records_from_delimeted_file: which column
# holds the identifier (term_key) and which holds the display text (text_key).
#
# NOTE(review): COG_PATHWAY_DEFS and COG_TERM_DEFS are the same string, as are
# PFAM_TERM_DEFS and PFAM_CLAN_DEFS, so each later entry silently REPLACES the
# earlier one in this dict literal — the COG pathway and Pfam term configurations
# below are never used. Confirm whether this should be a list of (file, config)
# pairs instead so every configuration is processed.
delimeted_files: Dict[str, Dict[str, Union[str, List[str]]]] = {
    PATHWAY_FILE: {
        "term_key": "image_id",
        "text_key": "title",
        "fallback_text_key": "pathway_name",
    },
    COG_FUNCTION_DEFS: {
        "fieldnames": cog_function_headers,
        "term_key": cog_function_headers[0],
        "text_key": cog_function_headers[2],
    },
    COG_PATHWAY_DEFS: {
        "fieldnames": cog_def_headers,
        "term_key": cog_def_headers[4],
        "text_key": cog_def_headers[4],
    },
    COG_TERM_DEFS: {
        "fieldnames": cog_def_headers,
        "term_key": cog_def_headers[0],
        "text_key": cog_def_headers[2],
    },
    PFAM_TERM_DEFS: {
        "fieldnames": pfam_headers,
        "term_key": "pfam_accession",
        "text_key": "pfam_name",
    },
    PFAM_CLAN_DEFS: {
        "fieldnames": pfam_headers,
        "term_key": "clan_accession",
        "text_key": "clan_name",
    },
}


def get_search_records():
records: Dict[str, str] = {}

Expand All @@ -44,18 +140,20 @@ def ingest_tree(node: dict) -> None:
req.raise_for_status()
ingest_tree(req.json())

for file in [PATHWAY_FILE]:
try:
with open(file) as fd:
for row in csv.DictReader(fd, delimiter="\t"):
records[row["image_id"]] = row["title"] or row["pathway_name"]
except FileNotFoundError:
errors["kegg_search"].add(f"Missing {file}")

for file, keys in delimeted_files.items():
get_search_records_from_delimeted_file(
file,
keys["term_key"],
keys["text_key"],
records,
fallback_text_key=keys.get("fallback_text_key", None),
fieldnames=keys.get("fieldnames", None),
)
return records


def ingest_ko_module_map(db: Session) -> None:
"""Ingest a mapping of KEGG modules to terms and COG functions to terms."""
db.execute(f"truncate table {KoTermToModule.__tablename__}")

datafile = Path(__file__).parent / "data" / "kegg_module_ko_terms.tab.txt"
Expand All @@ -66,8 +164,25 @@ def ingest_ko_module_map(db: Session) -> None:
)
db.commit()

with open(COG_TERM_DEFS) as fd:
reader = csv.DictReader(fd, fieldnames=cog_def_headers, delimiter="\t")
records = []
for row in reader:
function_count = len(row.get("cog_functional_category", ""))
if function_count > 1:
for char in list(row["cog_functional_category"]):
records.append((row["cog_id"], char))
elif function_count == 1:
records.append((row["cog_id"], row["cog_functional_category"]))

db.bulk_save_objects(
[CogTermToFunction(term=record[0], function=record[1]) for record in records]
)
db.commit()


def ingest_ko_pathway_map(db: Session) -> None:
"""Ingest a mapping of KEGG pathways to terms and COG pathways to COG terms."""
db.execute(f"truncate table {KoTermToPathway.__tablename__}")

datafile = Path(__file__).parent / "data" / "ko_term_pathways.tab.txt"
Expand All @@ -77,3 +192,23 @@ def ingest_ko_pathway_map(db: Session) -> None:
[KoTermToPathway(term=row["KO_id"], pathway=row["image_id"]) for row in reader]
)
db.commit()

with open(COG_TERM_DEFS) as fd:
reader = csv.DictReader(fd, fieldnames=cog_def_headers, delimiter="\t")
mappings = set([(row["cog_id"], row["pathway"]) for row in reader])
db.bulk_save_objects(
[CogTermToPathway(term=mapping[0], pathway=mapping[1]) for mapping in mappings]
)
db.commit()


def ingest_pfam_clan_map(db: Session) -> None:
    """Ingest a mapping of Pfam entries to clans.

    Truncates the pfam_entry_to_clan table, then reads the Pfam-A.clans TSV
    (headerless; columns given by ``pfam_headers``) and bulk-inserts one row per
    distinct (pfam_accession, clan_accession) pair.
    """
    db.execute(f"truncate table {PfamEntryToClan.__tablename__}")
    with open(PFAM_CLAN_DEFS) as fd:
        reader = csv.DictReader(fd, fieldnames=pfam_headers, delimiter="\t")
        # Set comprehension (not set([...])) to deduplicate entry/clan pairs;
        # the file can repeat a pair, and the table has a composite primary key.
        mappings = {(row[pfam_headers[0]], row[pfam_headers[1]]) for row in reader}
    db.bulk_save_objects(
        [PfamEntryToClan(entry=entry, clan=clan) for entry, clan in mappings]
    )
    db.commit()
8 changes: 4 additions & 4 deletions nmdc_server/ingest/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

DataObjectList = List[str]
LoadObjectReturn = models.PipelineStep
ko_regex = re.compile(r"^KEGG\.ORTHOLOGY")
gene_regex = re.compile(r"^(KEGG\.ORTHOLOGY|COG|PFAM)")


class LoadObject(Protocol):
Expand All @@ -33,7 +33,7 @@ def load_mg_annotation(db: Session, obj: Dict[str, Any], **kwargs) -> LoadObject
{
"metagenome_annotation_id": pipeline.id,
"gene_function_id": {
"$regex": ko_regex,
"$regex": gene_regex,
},
},
no_cursor_timeout=True,
Expand Down Expand Up @@ -90,7 +90,7 @@ def load_mp_analysis(db: Session, obj: Dict[str, Any], **kwargs) -> LoadObjectRe
{
"metaproteomic_analysis_id": pipeline.id,
"gene_function_id": {
"$regex": ko_regex,
"$regex": gene_regex,
},
"best_protein": True,
},
Expand Down Expand Up @@ -141,7 +141,7 @@ def load_mt_annotation(db: Session, obj: Dict[str, Any], **kwargs) -> LoadObject
{
"metagenome_annotation_id": pipeline.id,
"gene_function_id": {
"$regex": ko_regex,
"$regex": gene_regex,
},
},
no_cursor_timeout=True,
Expand Down
10 changes: 6 additions & 4 deletions nmdc_server/ingest/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ def get_or_create_pi(db: Session, name: str, url: Optional[str], orcid: Optional

image_data = None
if url:
r = requests.get(url)
if r.ok:
image_data = r.content
try:
r = requests.get(url)
r.raise_for_status()
except Exception as e:
logger.error(f"Failed to download image for {name} from {url} : {e}")
else:
logger.error(f"Failed to download image for {name} from {url} : {r.status_code}")
image_data = r.content

pi = PrincipalInvestigator(name=name, image=image_data, orcid=orcid)

Expand Down
50 changes: 50 additions & 0 deletions nmdc_server/migrations/versions/317274ad8137_add_cog_mappings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Add COG mappings
Revision ID: 317274ad8137
Revises: 5403c7fe0b33
Create Date: 2024-08-30 20:13:12.480955
"""

from typing import Optional

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision: str = "317274ad8137"
down_revision: Optional[str] = "5403c7fe0b33"
branch_labels: Optional[str] = None
depends_on: Optional[str] = None


def upgrade():
    """Create the COG mapping tables and their lookup indexes.

    Both tables have the same shape: a composite ("term", <other>) primary key
    plus a non-unique index on the second column, so they are built in one loop.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    for table_name, second_column in (
        ("cog_term_to_function", "function"),
        ("cog_term_to_pathway", "pathway"),
    ):
        op.create_table(
            table_name,
            sa.Column("term", sa.String(), nullable=False),
            sa.Column(second_column, sa.String(), nullable=False),
            sa.PrimaryKeyConstraint("term", second_column, name=op.f(f"pk_{table_name}")),
        )
        op.create_index(
            op.f(f"ix_{table_name}_{second_column}"),
            table_name,
            [second_column],
            unique=False,
        )
    # ### end Alembic commands ###


def downgrade():
    """Drop the COG mapping tables and their indexes (reverse of upgrade)."""
    # ### commands auto generated by Alembic - please adjust! ###
    for table_name, indexed_column in (
        ("cog_term_to_pathway", "pathway"),
        ("cog_term_to_function", "function"),
    ):
        op.drop_index(op.f(f"ix_{table_name}_{indexed_column}"), table_name=table_name)
        op.drop_table(table_name)
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Migrate Study names
Revision ID: 5403c7fe0b33
Revises: 0ff690fb929d
Create Date: 2024-09-09 18:50:11.771035
"""

from typing import Optional

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.sql import column, table

# revision identifiers, used by Alembic.
revision: str = "5403c7fe0b33"
down_revision: Optional[str] = "0ff690fb929d"
branch_labels: Optional[str] = None
depends_on: Optional[str] = None


def upgrade():
    """Backfill the study_name column from each submission's JSON metadata.

    Copies metadata_submission["studyForm"]["studyName"] into the new top-level
    study_name column for every existing submission that has one.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # Lightweight table construct so the migration doesn't depend on the
    # application's (possibly newer) ORM models.
    submission_metadata = table(
        "submission_metadata",
        column("id", sa.String),
        column("metadata_submission", JSONB),
        column("study_name", sa.String),
    )

    connection = op.get_bind()
    submissions = connection.execute(
        sa.select([submission_metadata.c.id, submission_metadata.c.metadata_submission])
    )

    for submission in submissions:
        # Guard with .get(): submissions whose metadata lacks a "studyForm"
        # section previously raised KeyError and aborted the migration.
        study_name = submission.metadata_submission.get("studyForm", {}).get("studyName")
        if study_name:
            connection.execute(
                submission_metadata.update()
                .where(submission_metadata.c.id == submission.id)
                .values(study_name=study_name)
            )
    # ### end Alembic commands ###


def downgrade():
    # Intentionally a no-op: this migration only copies values out of the
    # metadata_submission JSON into study_name, so the source data is untouched
    # and there is nothing to reverse.
    # ### commands auto generated by Alembic - please adjust! ###
    pass
    # ### end Alembic commands ###
Loading

0 comments on commit 9af4a50

Please sign in to comment.