Skip to content

Commit

Permalink
Merge pull request #991 from microbiomedata/966-ingest-pooling
Browse files · Browse the repository at this point in the history
  • Loading branch information
naglepuff authored Jul 18, 2023
2 parents 6edb5ea + 627c158 commit 3ce000c
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 46 deletions.
8 changes: 4 additions & 4 deletions nmdc_server/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,16 +326,16 @@ def construct_zip_file_path(data_object: models.DataObject) -> str:
# involves a complicated query... need a way to join that information
# in the original query (possibly in the sqlalchemy relationship)
omics_processing = data_object.omics_processing
biosample = cast(Optional[models.Biosample], omics_processing.biosample)
biosamples = cast(Optional[list[models.Biosample]], omics_processing.biosample_inputs)

def safe_name(name: str) -> str:
return name.replace("/", "_").replace("\\", "_").replace(":", "_")

op_name = safe_name(omics_processing.id)

if biosample is not None:
biosample_name = safe_name(biosample.id)
study = biosample.study
if biosamples:
biosample_name = ",".join([safe_name(biosample.id) for biosample in biosamples])
study = biosamples[0].study
else:
# Some emsl omics_processing are missing biosamples
biosample_name = "unknown"
Expand Down
7 changes: 5 additions & 2 deletions nmdc_server/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@
b'{MultiomicsValue.om.value:05b}'
end
)::integer as multiomics
from biosample b join omics_processing op on op.biosample_id = b.id
from biosample b
join biosample_input_association bia on bia.biosample_id = b.id
join omics_processing op on bia.omics_processing_id = op.id
group by b.id)
update biosample set multiomics = m.multiomics
from m
Expand All @@ -175,7 +177,8 @@
)::integer as multiomics
from study s
join biosample b on s.id = b.study_id
join omics_processing op on op.biosample_id = b.id
join biosample_input_association bia on b.id = bia.biosample_id
join omics_processing op on bia.omics_processing_id = op.id
group by s.id)
update study set multiomics = m.multiomics
from m
Expand Down
8 changes: 5 additions & 3 deletions nmdc_server/fakes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import OrderedDict
from datetime import datetime
from typing import Dict, Optional
from typing import Dict, List, Optional
from uuid import UUID, uuid4

from factory import Factory, Faker, SubFactory, lazy_attribute, post_generation
Expand Down Expand Up @@ -205,11 +205,13 @@ class Meta:

add_date = Faker("date_time")
mod_date = Faker("date_time")
biosample = SubFactory(BiosampleFactory)
biosample_inputs: List[models.Biosample] = []

@lazy_attribute
def study(self):
return self.biosample.study
if not self.biosample_inputs:
return None
return self.biosample_inputs[0].study


class DataObjectFactory(SQLAlchemyModelFactory):
Expand Down
8 changes: 2 additions & 6 deletions nmdc_server/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,7 @@ class BiosampleFilter(BaseFilter):

def join_omics_processing(self, query: Query) -> Query:
return self.join_self(
query.join(
models.Biosample, models.OmicsProcessing.biosample_id == models.Biosample.id
),
query.join(models.OmicsProcessing, models.Biosample.omics_processing),
Table.biosample,
)

Expand Down Expand Up @@ -243,9 +241,7 @@ def join_omics_processing(self, query: Query) -> Query:

def join_biosample(self, query: Query) -> Query:
return self.join_self(
query.join(
models.OmicsProcessing, models.Biosample.id == models.OmicsProcessing.biosample_id
),
query.join(models.Biosample, models.OmicsProcessing.biosample_inputs),
Table.omics_processing,
)

Expand Down
6 changes: 5 additions & 1 deletion nmdc_server/ingest/all.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ def load(db: Session, function_limit=None, skip_annotation=False):
db.commit()

logger.info("Loading omics processing...")
omics_processing.load(db, mongodb["omics_processing_set"].find())
omics_processing.load(
db,
mongodb["omics_processing_set"].find(),
mongodb,
)
db.commit()

logger.info("Loading metabolomics analysis...")
Expand Down
4 changes: 4 additions & 0 deletions nmdc_server/ingest/biosample.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def extract_extras(cls, values):
lat, lon = values.pop("lat_lon")["has_raw_value"].split(" ")
values["latitude"] = float(lat)
values["longitude"] = float(lon)
else:
# Workaround/testing code -- not to be included in PR
values["latitude"] = 0.0
values["longitude"] = 0.0

return extract_extras(cls, values)

Expand Down
123 changes: 105 additions & 18 deletions nmdc_server/ingest/omics_processing.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import re
from datetime import datetime
from typing import Any, Dict
from typing import Any, Dict, Optional

from pydantic import root_validator, validator
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from pymongo.database import Database
from sqlalchemy.orm import Session

from nmdc_server import models
Expand All @@ -18,6 +20,31 @@
date_fmt = re.compile(r"\d\d-[A-Z]+-\d\d \d\d\.\d\d\.\d\d\.\d+ [AP]M")


# Mongo collections describing sample-producing processes, searched in this
# order when tracing a ProcessedSample back to the process that created it
# (see find_parent_process()).
process_types = [
    "pooling",
    "extraction",
    "library_preparation",
]


# Logical object type -> MongoDB collection name.
# NOTE: this name shadows the stdlib `collections` module within this module.
collections = {
    "biosample": "biosample_set",
    "processed_sample": "processed_sample_set",
    "extraction": "extraction_set",
    "library_preparation": "library_preparation_set",
    "pooling": "pooling_set",
}


# Maps lower-cased raw `omics_type` values to their canonical display form.
omics_types = {
    "metagenome": "Metagenome",
    "metabolomics": "Metabolomics",
    "proteomics": "Proteomics",
    "metatranscriptome": "Metatranscriptome",
    "organic matter characterization": "Organic Matter Characterization",
}


class OmicsProcessing(OmicsProcessingCreate):
_extract_value = validator("*", pre=True, allow_reuse=True)(extract_value)

Expand All @@ -32,28 +59,88 @@ def coerce_date(cls, v):
return v


omics_types = {
"metagenome": "Metagenome",
"metabolomics": "Metabolomics",
"proteomics": "Proteomics",
"metatranscriptome": "Metatranscriptome",
"organic matter characterization": "Organic Matter Characterization",
}


def load_omics_processing(db: Session, obj: Dict[str, Any]):
def is_biosample(object_id, biosample_collection):
    """Return the (possibly empty) list of biosample documents with *object_id*.

    Callers use the result only for truthiness, so ``.limit(1)`` stops the
    cursor after the first match instead of materializing every matching
    document as the previous implementation did.
    """
    return list(biosample_collection.find({"id": object_id}).limit(1))


def find_parent_process(output_id: str, mongodb: Database) -> Optional[dict[str, Any]]:
    """Given a ProcessedSample ID, find the process (e.g. Extraction) that created it.

    Each process collection (pooling, extraction, library_preparation) is
    searched in turn for a document listing *output_id* in its ``has_output``.

    Returns the first matching process document, or None when no process
    collection contains one.

    The previous ``output_found``/``collections_left`` while-loop could never
    iterate more than once (the inner for-loop either returned or cleared the
    flag), so it has been replaced by a plain search loop; ``find_one`` also
    avoids materializing a full result list just to read its first element.
    """
    for name in process_types:
        collection: Collection = mongodb[collections[name]]
        # At most the first matching process is needed.
        parent = collection.find_one({"has_output": output_id})
        if parent is not None:
            return parent
    return None


def get_biosample_input_ids(input_id: str, mongodb: Database, results: set) -> set[Any]:
    """Accumulate into *results* the IDs of all Biosamples behind *input_id*.

    OmicsProcessing objects can take Biosamples or ProcessedSamples as inputs.
    ProcessedSamples are traced back recursively through the process (e.g.
    Pooling, Extraction) that produced them until the underlying Biosamples
    are found.

    Mutates *results* in place and also returns it so callers may chain.
    """
    biosample_collection: Collection = mongodb["biosample_set"]
    processed_sample_collection: Collection = mongodb["processed_sample_set"]

    # Base case: the input is itself a biosample.
    if is_biosample(input_id, biosample_collection):
        results.add(input_id)
        return results

    # The input is neither a Biosample nor a ProcessedSample; nothing to trace.
    # Maybe this should report an error?
    if processed_sample_collection.find_one({"id": input_id}) is None:
        return results

    # Recursive case: find the process that created this ProcessedSample and
    # trace each of that process's inputs. (The previous code re-read the id
    # out of the query result, but it is by construction equal to input_id.)
    parent_process = find_parent_process(input_id, mongodb)
    if parent_process:
        for parent_input_id in parent_process["has_input"]:
            get_biosample_input_ids(parent_input_id, mongodb, results)
    return results


def load_omics_processing(db: Session, obj: Dict[str, Any], mongodb: Database, logger):
logger = get_logger(__name__)
obj["biosample_id"] = obj.pop("has_input", [None])[0]
input_ids: list[str] = obj.pop("has_input", [""])
biosample_input_ids: set[str] = set()
for input_id in input_ids:
biosample_input_ids.union(get_biosample_input_ids(input_id, mongodb, biosample_input_ids))
if len(biosample_input_ids) > 1:
logger.error("Processed sample input detected")
logger.error(obj["id"])
logger.error(biosample_input_ids)

obj["biosample_inputs"] = []
biosample_input_objects = []
for biosample_id in biosample_input_ids:
biosample_object = db.query(models.Biosample).get(biosample_id)
if not biosample_object:
logger.warn(f"Unknown biosample {biosample_id}")
missing_["biosample"].add(biosample_id)
else:
biosample_input_objects.append(biosample_object)

data_objects = obj.pop("has_output", [])
obj["study_id"] = obj.pop("part_of", [None])[0]
raw_omics_type: str = obj["omics_type"]["has_raw_value"]
obj["omics_type"]["has_raw_value"] = omics_types[raw_omics_type.lower()]

if obj["biosample_id"] and db.query(models.Biosample).get(obj["biosample_id"]) is None:
logger.warn(f"Unknown biosample {obj['biosample_id']}")
missing_["biosample"].add(obj.pop("biosample_id"))

omics_processing = models.OmicsProcessing(**OmicsProcessing(**obj).dict())
for biosample_object in biosample_input_objects:
# mypy thinks that omics_processing.biosample_inputs is of type Biosample
omics_processing.biosample_inputs.append(biosample_object) # type: ignore

for data_object_id in data_objects:
data_object = db.query(models.DataObject).get(data_object_id)
Expand All @@ -73,11 +160,11 @@ def load_omics_processing(db: Session, obj: Dict[str, Any]):
db.add(omics_processing)


def load(db: Session, cursor: Cursor):
def load(db: Session, cursor: Cursor, mongodb: Database):
logger = get_logger(__name__)
for obj in cursor:
try:
load_omics_processing(db, obj)
load_omics_processing(db, obj, mongodb, logger)
except Exception as err:
logger.error(err)
logger.error("Error parsing omics_processing:")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Add many-to-many relationship between biosample and omics_processing
Revision ID: af8b2e3c91b2
Revises: dcc0a41b60af
Create Date: 2023-07-10 18:18:46.785564
"""
from typing import Optional

import sqlalchemy as sa
from alembic import op
from sqlalchemy import Column, ForeignKey, String, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, relationship

Base = declarative_base()  # local declarative base for this migration's snapshot models

# revision identifiers, used by Alembic.
revision: str = "af8b2e3c91b2"
down_revision: Optional[str] = "dcc0a41b60af"
branch_labels: Optional[str] = None
depends_on: Optional[str] = None


class Biosample(Base):
    """Minimal snapshot of the biosample table -- only what this migration reads."""

    __tablename__ = "biosample"
    id = Column(String, primary_key=True)


class OmicsProcessing(Base):
    """Snapshot of omics_processing as it exists *before* this migration.

    ``biosample_id`` is the legacy one-to-one column being replaced by the
    association table; upgrade() reads it to backfill the new table before
    dropping the column.
    """

    __tablename__ = "omics_processing"
    id = Column(String, primary_key=True)
    biosample_id = Column(String, ForeignKey("biosample.id"), nullable=True)
    biosample = relationship("Biosample", backref="omics_processing")


# Lightweight handle on the new association table so upgrade() can
# op.bulk_insert() into it; the authoritative definition (foreign keys,
# primary key) is created via op.create_table() inside upgrade().
biosample_input_association = Table(
    "biosample_input_association",
    Base.metadata,
    Column("biosample_id", String),
    Column("omics_processing_id", String),
)


def upgrade():
    """Introduce the many-to-many biosample/omics_processing association.

    Creates ``biosample_input_association``, backfills one row per existing
    ``omics_processing.biosample_id`` link, then drops that legacy column and
    its foreign key.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "biosample_input_association",
        sa.Column("omics_processing_id", sa.String(), nullable=False),
        sa.Column("biosample_id", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["biosample_id"],
            ["biosample.id"],
            name=op.f("fk_biosample_input_association_biosample_id_biosample"),
        ),
        sa.ForeignKeyConstraint(
            ["omics_processing_id"],
            ["omics_processing.id"],
            name=op.f("fk_biosample_input_association_omics_processing_id_omics_processing"),
        ),
        sa.PrimaryKeyConstraint(
            "omics_processing_id", "biosample_id", name=op.f("pk_biosample_input_association")
        ),
    )

    # Backfill: every existing biosample link becomes one association row.
    # Rows with a NULL biosample_id have nothing to preserve and are skipped.
    session = Session(bind=op.get_bind())
    mappings = [
        {
            "biosample_id": omics_processing.biosample_id,
            "omics_processing_id": omics_processing.id,
        }
        for omics_processing in session.query(OmicsProcessing)
        if omics_processing.biosample_id
    ]
    op.bulk_insert(biosample_input_association, mappings)

    # The legacy column is now fully represented by the association table.
    op.drop_constraint(
        "fk_omics_processing_biosample_id_biosample", "omics_processing", type_="foreignkey"
    )
    op.drop_column("omics_processing", "biosample_id")
    # ### end Alembic commands ###


def downgrade():
    """Restore ``omics_processing.biosample_id`` and drop the association table.

    NOTE(review): association rows are not copied back into the restored
    column, so it comes back NULL for every row; omics_processing records
    with multiple biosample inputs could not be represented in the old
    schema anyway.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "omics_processing",
        sa.Column("biosample_id", sa.VARCHAR(), autoincrement=False, nullable=True),
    )
    op.create_foreign_key(
        "fk_omics_processing_biosample_id_biosample",
        "omics_processing",
        "biosample",
        ["biosample_id"],
        ["id"],
    )
    op.drop_table("biosample_input_association")
    # ### end Alembic commands ###
Loading

0 comments on commit 3ce000c

Please sign in to comment.