diff --git a/nmdc_server/crud.py b/nmdc_server/crud.py index 68304581..b929dc7a 100644 --- a/nmdc_server/crud.py +++ b/nmdc_server/crud.py @@ -326,16 +326,16 @@ def construct_zip_file_path(data_object: models.DataObject) -> str: # involves a complicated query... need a way to join that information # in the original query (possibly in the sqlalchemy relationship) omics_processing = data_object.omics_processing - biosample = cast(Optional[models.Biosample], omics_processing.biosample) + biosamples = cast(Optional[list[models.Biosample]], omics_processing.biosample_inputs) def safe_name(name: str) -> str: return name.replace("/", "_").replace("\\", "_").replace(":", "_") op_name = safe_name(omics_processing.id) - if biosample is not None: - biosample_name = safe_name(biosample.id) - study = biosample.study + if biosamples: + biosample_name = ",".join([safe_name(biosample.id) for biosample in biosamples]) + study = biosamples[0].study else: # Some emsl omics_processing are missing biosamples biosample_name = "unknown" diff --git a/nmdc_server/database.py b/nmdc_server/database.py index 4d07f891..7a49cad6 100644 --- a/nmdc_server/database.py +++ b/nmdc_server/database.py @@ -151,7 +151,9 @@ b'{MultiomicsValue.om.value:05b}' end )::integer as multiomics -from biosample b join omics_processing op on op.biosample_id = b.id +from biosample b + join biosample_input_association bia on bia.biosample_id = b.id + join omics_processing op on bia.omics_processing_id = op.id group by b.id) update biosample set multiomics = m.multiomics from m @@ -175,7 +177,8 @@ )::integer as multiomics from study s join biosample b on s.id = b.study_id - join omics_processing op on op.biosample_id = b.id + join biosample_input_association bia on b.id = bia.biosample_id + join omics_processing op on bia.omics_processing_id = op.id group by s.id) update study set multiomics = m.multiomics from m diff --git a/nmdc_server/fakes.py b/nmdc_server/fakes.py index caa85b58..22fede1c 100644 --- 
a/nmdc_server/fakes.py +++ b/nmdc_server/fakes.py @@ -1,6 +1,6 @@ from collections import OrderedDict from datetime import datetime -from typing import Dict, Optional +from typing import Dict, List, Optional from uuid import UUID, uuid4 from factory import Factory, Faker, SubFactory, lazy_attribute, post_generation @@ -205,11 +205,13 @@ class Meta: add_date = Faker("date_time") mod_date = Faker("date_time") - biosample = SubFactory(BiosampleFactory) + biosample_inputs: List[models.Biosample] = [] @lazy_attribute def study(self): - return self.biosample.study + if not self.biosample_inputs: + return None + return self.biosample_inputs[0].study class DataObjectFactory(SQLAlchemyModelFactory): diff --git a/nmdc_server/filters.py b/nmdc_server/filters.py index 11c857ba..a6199244 100644 --- a/nmdc_server/filters.py +++ b/nmdc_server/filters.py @@ -199,9 +199,7 @@ class BiosampleFilter(BaseFilter): def join_omics_processing(self, query: Query) -> Query: return self.join_self( - query.join( - models.Biosample, models.OmicsProcessing.biosample_id == models.Biosample.id - ), + query.join(models.OmicsProcessing, models.Biosample.omics_processing), Table.biosample, ) @@ -243,9 +241,7 @@ def join_omics_processing(self, query: Query) -> Query: def join_biosample(self, query: Query) -> Query: return self.join_self( - query.join( - models.OmicsProcessing, models.Biosample.id == models.OmicsProcessing.biosample_id - ), + query.join(models.Biosample, models.OmicsProcessing.biosample_inputs), Table.omics_processing, ) diff --git a/nmdc_server/ingest/all.py b/nmdc_server/ingest/all.py index a8441a61..39000afb 100644 --- a/nmdc_server/ingest/all.py +++ b/nmdc_server/ingest/all.py @@ -95,7 +95,11 @@ def load(db: Session, function_limit=None, skip_annotation=False): db.commit() logger.info("Loading omics processing...") - omics_processing.load(db, mongodb["omics_processing_set"].find()) + omics_processing.load( + db, + mongodb["omics_processing_set"].find(), + mongodb, + ) db.commit() 
logger.info("Loading metabolomics analysis...") diff --git a/nmdc_server/ingest/biosample.py b/nmdc_server/ingest/biosample.py index 4615ad5b..3a36b171 100644 --- a/nmdc_server/ingest/biosample.py +++ b/nmdc_server/ingest/biosample.py @@ -31,6 +31,10 @@ def extract_extras(cls, values): lat, lon = values.pop("lat_lon")["has_raw_value"].split(" ") values["latitude"] = float(lat) values["longitude"] = float(lon) + else: + # Workaround/testing code -- not to be included in PR + values["latitude"] = 0.0 + values["longitude"] = 0.0 return extract_extras(cls, values) diff --git a/nmdc_server/ingest/omics_processing.py b/nmdc_server/ingest/omics_processing.py index ed6b33f4..8e3ba930 100644 --- a/nmdc_server/ingest/omics_processing.py +++ b/nmdc_server/ingest/omics_processing.py @@ -1,10 +1,12 @@ import json import re from datetime import datetime -from typing import Any, Dict +from typing import Any, Dict, Optional from pydantic import root_validator, validator +from pymongo.collection import Collection from pymongo.cursor import Cursor +from pymongo.database import Database from sqlalchemy.orm import Session from nmdc_server import models @@ -18,6 +20,31 @@ date_fmt = re.compile(r"\d\d-[A-Z]+-\d\d \d\d\.\d\d\.\d\d\.\d+ [AP]M") +process_types = [ + "pooling", + "extraction", + "library_preparation", +] + + +collections = { + "biosample": "biosample_set", + "processed_sample": "processed_sample_set", + "extraction": "extraction_set", + "library_preparation": "library_preparation_set", + "pooling": "pooling_set", +} + + +omics_types = { + "metagenome": "Metagenome", + "metabolomics": "Metabolomics", + "proteomics": "Proteomics", + "metatranscriptome": "Metatranscriptome", + "organic matter characterization": "Organic Matter Characterization", +} + + class OmicsProcessing(OmicsProcessingCreate): _extract_value = validator("*", pre=True, allow_reuse=True)(extract_value) @@ -32,28 +59,88 @@ def coerce_date(cls, v): return v -omics_types = { - "metagenome": "Metagenome", - 
"metabolomics": "Metabolomics", - "proteomics": "Proteomics", - "metatranscriptome": "Metatranscriptome", - "organic matter characterization": "Organic Matter Characterization", -} - - -def load_omics_processing(db: Session, obj: Dict[str, Any]): +def is_biosample(object_id, biosample_collection): + return list(biosample_collection.find({"id": object_id})) + + +def find_parent_process(output_id: str, mongodb: Database) -> Optional[dict[str, Any]]: + """Given a ProcessedSample ID, find the process (e.g. Extraction) that created it.""" + output_found = False + collections_left = True + while not output_found and collections_left: + for name in process_types: + collection: Collection = mongodb[collections[name]] + query = collection.find({"has_output": output_id}, no_cursor_timeout=True) + result_list = list(query) + if len(result_list): + output_found = True + return result_list[0] + collections_left = False + return None + + +def get_biosample_input_ids(input_id: str, mongodb: Database, results: set) -> set[Any]: + """ + Given an input ID return all biosample objects that are included in the input resource. + + OmicsProcessing objects can take Biosamples or ProcessedSamples as inputs. Work needs to be done + to determine which biosamples make up a given ProcessedSample. This function recursively tries + to determine those Biosamples. + """ + # Base case, the input is already a biosample + biosample_collection: Collection = mongodb["biosample_set"] + processed_sample_collection: Collection = mongodb["processed_sample_set"] + if is_biosample(input_id, biosample_collection): + results.add(input_id) + return results + + # The given input is not a Biosample or Processed sample. Stop here. + # Maybe this should report an error? + query = list(processed_sample_collection.find({"id": input_id}, no_cursor_timeout=True)) + if not query: + return results + + processed_sample_id = query[0]["id"] + + # Recursive case. 
For processed samples find the process that created it, + # and check the inputs of that process. + parent_process = find_parent_process(processed_sample_id, mongodb) + if parent_process: + for parent_input_id in parent_process["has_input"]: + get_biosample_input_ids(parent_input_id, mongodb, results) + return results + + +def load_omics_processing(db: Session, obj: Dict[str, Any], mongodb: Database, logger): logger = get_logger(__name__) - obj["biosample_id"] = obj.pop("has_input", [None])[0] + input_ids: list[str] = obj.pop("has_input", [""]) + biosample_input_ids: set[str] = set() + for input_id in input_ids: + # get_biosample_input_ids accumulates into the set passed to it; + # set.union would return a new set and discard it, so call directly. + get_biosample_input_ids(input_id, mongodb, biosample_input_ids) + if len(biosample_input_ids) > 1: + logger.error("Processed sample input detected") + logger.error(obj["id"]) + logger.error(biosample_input_ids) + + obj["biosample_inputs"] = [] + biosample_input_objects = [] + for biosample_id in biosample_input_ids: + biosample_object = db.query(models.Biosample).get(biosample_id) + if not biosample_object: + logger.warning(f"Unknown biosample {biosample_id}") + missing_["biosample"].add(biosample_id) + else: + biosample_input_objects.append(biosample_object) + data_objects = obj.pop("has_output", []) obj["study_id"] = obj.pop("part_of", [None])[0] raw_omics_type: str = obj["omics_type"]["has_raw_value"] obj["omics_type"]["has_raw_value"] = omics_types[raw_omics_type.lower()] - if obj["biosample_id"] and db.query(models.Biosample).get(obj["biosample_id"]) is None: - logger.warn(f"Unknown biosample {obj['biosample_id']}") - missing_["biosample"].add(obj.pop("biosample_id")) - omics_processing = models.OmicsProcessing(**OmicsProcessing(**obj).dict()) + omics_processing = models.OmicsProcessing(**OmicsProcessing(**obj).dict()) + for biosample_object in biosample_input_objects: + # mypy thinks that omics_processing.biosample_inputs is of type Biosample + omics_processing.biosample_inputs.append(biosample_object) # type: ignore for data_object_id in data_objects: data_object =
db.query(models.DataObject).get(data_object_id) @@ -73,11 +160,11 @@ def load_omics_processing(db: Session, obj: Dict[str, Any]): db.add(omics_processing) -def load(db: Session, cursor: Cursor): +def load(db: Session, cursor: Cursor, mongodb: Database): logger = get_logger(__name__) for obj in cursor: try: - load_omics_processing(db, obj) + load_omics_processing(db, obj, mongodb, logger) except Exception as err: logger.error(err) logger.error("Error parsing omics_processing:") diff --git a/nmdc_server/migrations/versions/af8b2e3c91b2_biosample_omics_association.py b/nmdc_server/migrations/versions/af8b2e3c91b2_biosample_omics_association.py new file mode 100644 index 00000000..b12808c4 --- /dev/null +++ b/nmdc_server/migrations/versions/af8b2e3c91b2_biosample_omics_association.py @@ -0,0 +1,99 @@ +"""Add many-to-many relationship between biosample and omics_processing + +Revision ID: af8b2e3c91b2 +Revises: dcc0a41b60af +Create Date: 2023-07-10 18:18:46.785564 + +""" +from typing import Optional + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import Column, ForeignKey, String, Table +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import Session, relationship + +Base = declarative_base() + +# revision identifiers, used by Alembic. 
+revision: str = "af8b2e3c91b2" +down_revision: Optional[str] = "dcc0a41b60af" +branch_labels: Optional[str] = None +depends_on: Optional[str] = None + + +class Biosample(Base): + __tablename__ = "biosample" + id = Column(String, primary_key=True) + + +class OmicsProcessing(Base): + __tablename__ = "omics_processing" + id = Column(String, primary_key=True) + biosample_id = Column(String, ForeignKey("biosample.id"), nullable=True) + biosample = relationship("Biosample", backref="omics_processing") + + +biosample_input_association = Table( + "biosample_input_association", + Base.metadata, + Column("biosample_id", String), + Column("omics_processing_id", String), +) + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "biosample_input_association", + sa.Column("omics_processing_id", sa.String(), nullable=False), + sa.Column("biosample_id", sa.String(), nullable=False), + sa.ForeignKeyConstraint( + ["biosample_id"], + ["biosample.id"], + name=op.f("fk_biosample_input_association_biosample_id_biosample"), + ), + sa.ForeignKeyConstraint( + ["omics_processing_id"], + ["omics_processing.id"], + name=op.f("fk_biosample_input_association_omics_processing_id_omics_processing"), + ), + sa.PrimaryKeyConstraint( + "omics_processing_id", "biosample_id", name=op.f("pk_biosample_input_association") + ), + ) + + session = Session(bind=op.get_bind()) + mappings = [] + for omics_processing in session.query(OmicsProcessing): + if omics_processing.biosample_id: + mappings.append( + { + "biosample_id": omics_processing.biosample_id, + "omics_processing_id": omics_processing.id, + } + ) + op.bulk_insert(biosample_input_association, mappings) + + op.drop_constraint( + "fk_omics_processing_biosample_id_biosample", "omics_processing", type_="foreignkey" + ) + op.drop_column("omics_processing", "biosample_id") + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.add_column( + "omics_processing", + sa.Column("biosample_id", sa.VARCHAR(), autoincrement=False, nullable=True), + ) + op.create_foreign_key( + "fk_omics_processing_biosample_id_biosample", + "omics_processing", + "biosample", + ["biosample_id"], + ["id"], + ) + op.drop_table("biosample_input_association") + # ### end Alembic commands ### diff --git a/nmdc_server/models.py b/nmdc_server/models.py index 467e1e71..46a5aa8a 100644 --- a/nmdc_server/models.py +++ b/nmdc_server/models.py @@ -268,6 +268,14 @@ def doi_map(self) -> Dict[str, Any]: return doi_info +biosample_input_association = Table( + "biosample_input_association", + Base.metadata, + Column("omics_processing_id", ForeignKey("omics_processing.id"), primary_key=True), + Column("biosample_id", ForeignKey("biosample.id"), primary_key=True), +) + + class Biosample(Base, AnnotatedModel): __tablename__ = "biosample" @@ -286,6 +294,9 @@ class Biosample(Base, AnnotatedModel): study_id = Column(String, ForeignKey("study.id"), nullable=False) multiomics = Column(Integer, nullable=False, default=0) emsl_biosample_identifiers = Column(JSONB, nullable=True) + omics_processing = relationship( + "OmicsProcessing", secondary=biosample_input_association, back_populates="biosample_inputs" + ) # gold terms ecosystem = Column(String, nullable=True) @@ -332,8 +343,11 @@ class OmicsProcessing(Base, AnnotatedModel): add_date = Column(DateTime, nullable=True) mod_date = Column(DateTime, nullable=True) - biosample_id = Column(String, ForeignKey("biosample.id"), nullable=True) - biosample = relationship("Biosample", backref="omics_processing") + biosample_inputs = relationship( + "Biosample", secondary=biosample_input_association, back_populates="omics_processing" + ) + # biosample_id = Column(String, ForeignKey("biosample.id"), nullable=True) + # biosample = relationship("Biosample", backref="omics_processing") study_id = Column(String, ForeignKey("study.id"), nullable=True) study = relationship("Study", 
backref="omics_processing") diff --git a/nmdc_server/schemas.py b/nmdc_server/schemas.py index 219e67dc..e742a6cf 100644 --- a/nmdc_server/schemas.py +++ b/nmdc_server/schemas.py @@ -292,6 +292,9 @@ class BiosampleBase(AnnotatedBase): ecosystem_subtype: Optional[str] specific_ecosystem: Optional[str] + class Config: + orm_mode = True + class BiosampleCreate(BiosampleBase): emsl_biosample_identifiers: List[str] = [] @@ -317,7 +320,7 @@ class Config: # omics_processing class OmicsProcessingBase(AnnotatedBase): study_id: Optional[str] - biosample_id: Optional[str] + biosample_inputs: list[BiosampleBase] = [] add_date: Optional[DateType] mod_date: Optional[DateType] @@ -328,10 +331,21 @@ class OmicsProcessingCreate(OmicsProcessingBase): class OmicsProcessing(OmicsProcessingBase): open_in_gold: Optional[str] + biosample_ids: list[str] = [] omics_data: List["OmicsTypes"] outputs: List["DataObject"] + @validator("biosample_ids") + @classmethod + def set_biosample_ids(cls, biosample_ids: list[str], values: dict[str, Any]) -> list[str]: + # Only capture biosample IDs in responses + biosample_objects: list[BiosampleBase] = values.get("biosample_inputs", []) + biosample_ids = biosample_ids + [biosample.id for biosample in biosample_objects] + values.pop("biosample_inputs") + + return biosample_ids + class Config: orm_mode = True diff --git a/tests/test_app.py b/tests/test_app.py index 113b593d..1eee0903 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -147,6 +147,7 @@ def test_get_environmental_aggregation(db: Session, client: TestClient): ) def test_list_data_objects(db: Session, client: TestClient, endpoint: str): data_object = fakes.DataObjectFactory(id="do") + biosample_input = fakes.BiosampleFactory(id="b") omics_processing = fakes.OmicsProcessingFactory(id="1") reads_qc = fakes.ReadsQCFactory(id="1") assembly = fakes.MetagenomeAssemblyFactory(id="1") @@ -154,6 +155,7 @@ def test_list_data_objects(db: Session, client: TestClient, endpoint: str): analysis = 
fakes.MetaproteomicAnalysisFactory(id="1") omics_processing.outputs = [data_object] + omics_processing.biosample_inputs = [biosample_input] reads_qc.outputs = [data_object] assembly.outputs = [data_object] annotation.outputs = [data_object] diff --git a/tests/test_download.py b/tests/test_download.py index 687441ea..4212411e 100644 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -7,8 +7,8 @@ def test_bulk_download_query(db: Session): sample = fakes.BiosampleFactory() - op1 = fakes.OmicsProcessingFactory(biosample=sample) - fakes.OmicsProcessingFactory(biosample=sample) + op1 = fakes.OmicsProcessingFactory(biosample_inputs=[sample]) + fakes.OmicsProcessingFactory(biosample_inputs=[sample]) raw1 = fakes.DataObjectFactory( url="https://data.microbiomedata.org/data/raw", @@ -50,8 +50,8 @@ def test_bulk_download_query(db: Session): def test_generate_bulk_download(db: Session, client: TestClient, token): sample = fakes.BiosampleFactory() - op1 = fakes.OmicsProcessingFactory(biosample=sample) - fakes.OmicsProcessingFactory(biosample=sample) + op1 = fakes.OmicsProcessingFactory(biosample_inputs=[sample]) + fakes.OmicsProcessingFactory(biosample_inputs=[sample]) raw1 = fakes.DataObjectFactory( url="https://data.microbiomedata.org/data/raw", @@ -81,8 +81,8 @@ def test_generate_bulk_download(db: Session, client: TestClient, token): def test_generate_bulk_download_filtered(db: Session, client: TestClient, token): sample = fakes.BiosampleFactory() - op1 = fakes.OmicsProcessingFactory(biosample=sample) - fakes.OmicsProcessingFactory(biosample=sample) + op1 = fakes.OmicsProcessingFactory(biosample_inputs=[sample]) + fakes.OmicsProcessingFactory(biosample_inputs=[sample]) raw1 = fakes.DataObjectFactory( url="https://data.microbiomedata.org/data/raw", diff --git a/tests/test_query.py b/tests/test_query.py index 08159a45..3a2157f2 100644 --- a/tests/test_query.py +++ b/tests/test_query.py @@ -138,7 +138,9 @@ def test_grouped_query(db: Session): def 
test_indirect_join(db: Session): study = fakes.StudyFactory(id="study1") - fakes.OmicsProcessingFactory(id="omics_processing1", biosample__study=study) + biosample = fakes.BiosampleFactory(id="b", study=study) + # fakes.OmicsProcessingFactory(id="omics_processing1", biosample__study=study) + fakes.OmicsProcessingFactory(id="omics_processing1", biosample_inputs=[biosample]) db.commit() q = query.StudyQuerySchema( @@ -484,9 +486,11 @@ def test_query_pi(db: Session): ) def test_query_multiomics(db: Session, value: int, result: bool): biosample = fakes.BiosampleFactory() - fakes.OmicsProcessingFactory(annotations={"omics_type": "Metabolomics"}, biosample=biosample) fakes.OmicsProcessingFactory( - annotations={"omics_type": "Metatranscriptome"}, biosample=biosample + annotations={"omics_type": "Metabolomics"}, biosample_inputs=[biosample] + ) + fakes.OmicsProcessingFactory( + annotations={"omics_type": "Metatranscriptome"}, biosample_inputs=[biosample] ) db.commit()