Merge pull request #1321 from microbiomedata/berkeley-schema-migration
Update the Data and Submission Portals to work with NMDC Schema version `v11.0.1` (the Berkeley schema)
pkalita-lbl authored Oct 8, 2024
2 parents f54032d + 88b4d75 commit 219f034
Showing 17 changed files with 118 additions and 117 deletions.
39 changes: 22 additions & 17 deletions nmdc_server/api.py
@@ -412,13 +412,18 @@ async def get_study_image(study_id: str, db: Session = Depends(get_db)):
return StreamingResponse(BytesIO(image), media_type="image/jpeg")


# omics_processing
# data_generation
# Note the intermingling of the terms "data generation" and "omics processing."
# The Berkeley schema (NMDC schema v11) did away with the phrase "omics processing."
# As a result, public-facing uses of "omics processing" should be replaced with
# "data generation."
# Future work should go into a more thorough conversion of omics processing to data generation.
@router.post(
"/omics_processing/search",
"/data_generation/search",
response_model=query.OmicsProcessingSearchResponse,
tags=["omics_processing"],
name="Search for omics processings",
description="Faceted search of omics_processing data.",
tags=["data_generation"],
name="Search for data generations",
description="Faceted search of data_generation data.",
)
async def search_omics_processing(
query: query.SearchQuery = query.SearchQuery(),
@@ -429,19 +434,19 @@ async def search_omics_processing(


@router.post(
"/omics_processing/facet",
"/data_generation/facet",
response_model=query.FacetResponse,
tags=["omics_processing"],
tags=["data_generation"],
name="Get all values of an attribute",
)
async def facet_omics_processing(query: query.FacetQuery, db: Session = Depends(get_db)):
return crud.facet_omics_processing(db, query.attribute, query.conditions)


@router.post(
"/omics_processing/binned_facet",
"/data_generation/binned_facet",
response_model=query.BinnedFacetResponse,
tags=["omics_processing"],
tags=["data_generation"],
name="Get all values of a non-string attribute with binning",
)
async def binned_facet_omics_processing(
@@ -451,26 +456,26 @@


@router.get(
"/omics_processing/{omics_processing_id}",
"/data_generation/{data_generation_id}",
response_model=schemas.OmicsProcessing,
tags=["omics_processing"],
tags=["data_generation"],
)
async def get_omics_processing(omics_processing_id: str, db: Session = Depends(get_db)):
db_omics_processing = crud.get_omics_processing(db, omics_processing_id)
async def get_omics_processing(data_generation_id: str, db: Session = Depends(get_db)):
db_omics_processing = crud.get_omics_processing(db, data_generation_id)
if db_omics_processing is None:
raise HTTPException(status_code=404, detail="OmicsProcessing not found")
return db_omics_processing


@router.get(
"/omics_processing/{omics_processing_id}/outputs",
"/data_generation/{data_generation_id}/outputs",
response_model=List[schemas.DataObject],
tags=["omics_processing"],
tags=["data_generation"],
)
async def list_omics_processing_data_objects(
omics_processing_id: str, db: Session = Depends(get_db)
data_generation_id: str, db: Session = Depends(get_db)
):
return crud.list_omics_processing_data_objects(db, omics_processing_id).all()
return crud.list_omics_processing_data_objects(db, data_generation_id).all()


# data object
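Taken together, the api.py changes rename the public routes from /omics_processing/* to /data_generation/*. A minimal client sketch of the renamed search endpoint follows; the base URL, the condition payload, and the response field are assumptions for illustration, not part of this PR.

import requests

API_BASE = "https://data.microbiomedata.org/api"  # assumed base URL for the portal API

# POST to the renamed faceted-search route (formerly /omics_processing/search).
# The condition shown is only illustrative of a SearchQuery body.
payload = {
    "conditions": [
        {"table": "omics_processing", "field": "omics_type", "op": "==", "value": "Metagenome"}
    ]
}
response = requests.post(f"{API_BASE}/data_generation/search", json=payload)
response.raise_for_status()
print(response.json().get("count"), "matching data generation records")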
18 changes: 9 additions & 9 deletions nmdc_server/data_object_filters.py
@@ -28,18 +28,18 @@ def get_local_data_url(url: Optional[str]) -> Optional[str]:


class WorkflowActivityTypeEnum(Enum):
reads_qc = "nmdc:ReadQCAnalysisActivity"
mags_analysis = "nmdc:MagsAnalysis"
metabolomics_analysis = "nmdc:MetabolomicsAnalysis"
metagenome_assembly = "nmdc:MetagenomeAssembly"
metagenome_annotation = "nmdc:MetagenomeAnnotation" # TODO name out of date, fix
metagenome_annotation = "nmdc:MetagenomeAnnotation"
metaproteomic_analysis = "nmdc:MetaproteomicAnalysis"
metatranscriptome = "nmdc:MetatranscriptomeAnalysis"
metatranscriptome_assembly = "nmdc:MetatranscriptomeAssembly"
metatranscriptome_annotation = "nmdc:MetatranscriptomeAnnotation" # TODO name out of date, fix
metaproteomic_analysis = "nmdc:MetaProteomicAnalysis"
mags_analysis = "nmdc:MAGsAnalysisActivity"
read_based_analysis = "nmdc:ReadbasedAnalysis" # TODO name out of date, fix
nom_analysis = "nmdc:NomAnalysisActivity"
metabolomics_analysis = "nmdc:MetabolomicsAnalysisActivity"
metatranscriptome_annotation = "nmdc:MetatranscriptomeAnnotation"
nom_analysis = "nmdc:NomAnalysis"
raw_data = "nmdc:RawData"
metatranscriptome = "nmdc:metaT"
read_based_analysis = "nmdc:ReadBasedTaxonomyAnalysis"
reads_qc = "nmdc:ReadQcAnalysis"

@property
def model(self):
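Because the enum values above now carry the v11 type names, a Mongo record's type field can be resolved to a workflow category by value lookup. The helper below is an illustrative sketch, not code from this PR.

from typing import Optional

from nmdc_server.data_object_filters import WorkflowActivityTypeEnum


def workflow_type_for(record: dict) -> Optional[WorkflowActivityTypeEnum]:
    # Enum lookup by value; returns None when the record has no type or an unknown one.
    try:
        return WorkflowActivityTypeEnum(record["type"])
    except (KeyError, ValueError):
        return None


# e.g. workflow_type_for({"type": "nmdc:MagsAnalysis"}) is WorkflowActivityTypeEnum.mags_analysis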
41 changes: 23 additions & 18 deletions nmdc_server/ingest/all.py
@@ -90,22 +90,23 @@ def load(db: Session, function_limit=None, skip_annotation=False):
biosample.load(
db,
cursor,
omics_processing=mongodb["omics_processing_set"],
)
db.commit()

logger.info("Loading omics processing...")
omics_processing.load(
db,
mongodb["omics_processing_set"].find(),
mongodb["data_generation_set"].find(),
mongodb,
)
db.commit()

workflow_set = "workflow_execution_set"

logger.info("Loading metabolomics analysis...")
pipeline.load(
db,
mongodb["metabolomics_analysis_activity_set"].find(),
mongodb[workflow_set].find({"type": WorkflowActivityTypeEnum.metabolomics_analysis.value}),
pipeline.load_metabolomics_analysis,
WorkflowActivityTypeEnum.metabolomics_analysis.value,
)
@@ -114,7 +115,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading read based analysis...")
pipeline.load(
db,
mongodb["read_based_taxonomy_analysis_activity_set"].find(),
mongodb[workflow_set].find({"type": WorkflowActivityTypeEnum.read_based_analysis.value}),
pipeline.load_read_based_analysis,
WorkflowActivityTypeEnum.read_based_analysis.value,
)
@@ -123,23 +124,25 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading metatranscriptome expression analyses...")
pipeline.load(
db,
mongodb["metatranscriptome_expression_analysis_set"].find(),
mongodb[workflow_set].find({"type": WorkflowActivityTypeEnum.metatranscriptome.value}),
pipeline.load_metatranscriptome,
WorkflowActivityTypeEnum.metatranscriptome.value,
)

logger.info("Loading metatranscriptome assemblies...")
pipeline.load(
db,
mongodb["metatranscriptome_assembly_set"].find(),
mongodb[workflow_set].find(
{"type": WorkflowActivityTypeEnum.metatranscriptome_assembly.value}
),
pipeline.load_mt_assembly,
WorkflowActivityTypeEnum.metatranscriptome_assembly.value,
)

logger.info("Loading NOM analysis...")
pipeline.load(
db,
mongodb["nom_analysis_activity_set"].find(),
mongodb[workflow_set].find({"type": WorkflowActivityTypeEnum.nom_analysis.value}),
pipeline.load_nom_analysis,
WorkflowActivityTypeEnum.nom_analysis.value,
)
@@ -148,7 +151,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading MAGs...")
pipeline.load(
db,
mongodb["mags_activity_set"].find(),
mongodb[workflow_set].find({"type": WorkflowActivityTypeEnum.mags_analysis.value}),
pipeline.load_mags,
WorkflowActivityTypeEnum.mags_analysis.value,
)
@@ -163,13 +166,12 @@ def load(db: Session, function_limit=None, skip_annotation=False):

# This has historically been fast, but it is only for the progress bar.
# It can be removed if it becomes slow.
count = mongodb["metagenome_annotation_activity_set"].estimated_document_count()
iterator = paginate_cursor(
mongodb["metagenome_annotation_activity_set"],
page_size=1, # prevent cursor from timing out
no_cursor_timeout=True,
annotation_activities = list(
mongodb[workflow_set].find({"type": "nmdc:MetagenomeAnnotation"}, batch_size=100)
)
with click.progressbar(iterator, length=count) as bar:
# TODO test this and make sure it works as expected
# this undoes the pagination that existed before
with click.progressbar(annotation_activities, length=len(annotation_activities)) as bar:
pipeline.load(
db,
bar,
@@ -191,7 +193,9 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading metatranscriptome annotation...")
pipeline.load(
db,
mongodb["metatranscriptome_annotation_set"].find(),
mongodb[workflow_set].find(
{"type": WorkflowActivityTypeEnum.metatranscriptome_annotation.value}
),
pipeline.load_mt_annotation,
WorkflowActivityTypeEnum.metatranscriptome_annotation.value,
annotations=mongodb["functional_annotation_agg"],
@@ -205,7 +209,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading read qc...")
pipeline.load(
db,
mongodb["read_qc_analysis_activity_set"].find(),
mongodb[workflow_set].find({"type": WorkflowActivityTypeEnum.reads_qc.value}),
pipeline.load_reads_qc,
WorkflowActivityTypeEnum.reads_qc.value,
)
@@ -215,7 +219,8 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading metaproteomic analysis...")
pipeline.load(
db,
mongodb["metaproteomics_analysis_activity_set"].find(
mongodb[workflow_set].find(
{"type": "nmdc:MetaproteomicsAnalysis"},
no_cursor_timeout=True,
),
pipeline.load_mp_analysis,
@@ -232,7 +237,7 @@ def load(db: Session, function_limit=None, skip_annotation=False):
logger.info("Loading metagenome assembly...")
pipeline.load(
db,
mongodb["metagenome_assembly_set"].find(),
mongodb[workflow_set].find({"type": WorkflowActivityTypeEnum.metagenome_assembly.value}),
pipeline.load_mg_assembly,
WorkflowActivityTypeEnum.metagenome_assembly.value,
)
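The ingest now reads every workflow execution from the single workflow_execution_set collection and filters on the type field, instead of using one collection per activity type. Below is a minimal pymongo sketch of that pattern; the connection string and database name are assumptions.

from pymongo import MongoClient

from nmdc_server.data_object_filters import WorkflowActivityTypeEnum

client = MongoClient("mongodb://localhost:27017")  # assumed connection string
mongodb = client["nmdc"]                           # assumed database name

workflow_set = "workflow_execution_set"

# Same query shape as each pipeline.load(...) call above.
cursor = mongodb[workflow_set].find(
    {"type": WorkflowActivityTypeEnum.mags_analysis.value}  # "nmdc:MagsAnalysis"
)
for doc in cursor:
    print(doc.get("id"))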
20 changes: 8 additions & 12 deletions nmdc_server/ingest/biosample.py
@@ -4,7 +4,6 @@
from typing import Any, Dict

from pydantic import root_validator, validator
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from sqlalchemy.orm import Session

@@ -92,7 +91,7 @@ def coerce_collection_date(cls, value):
return raw_value


def load_biosample(db: Session, obj: Dict[str, Any], omics_processing: Collection):
def load_biosample(db: Session, obj: Dict[str, Any]):
logger = get_logger(__name__)
env_broad_scale_id = obj.pop("env_broad_scale", {}).get("term", {}).get("id", "")
env_broad_scale = db.query(models.EnvoTerm).get(env_broad_scale_id.replace("_", ":"))
@@ -108,15 +107,12 @@ def load_biosample(db: Session, obj: Dict[str, Any], omics_processing: Collectio
if env_medium:
obj["env_medium_id"] = env_medium.id

omics_processing_record = omics_processing.find_one({"has_input": obj["id"]})
part_of = obj.pop("part_of", None)
if part_of is None:
if omics_processing_record is None:
logger.error(f"Could not determine study for biosample {obj['id']}")
return
part_of = omics_processing_record["part_of"]
associated_studies = obj.pop("associated_studies", None)
if associated_studies is None:
logger.error(f"Could not determine study for biosample {obj['id']}")
return

obj["study_id"] = part_of[0]
obj["study_id"] = associated_studies[0]
depth_obj = obj.get("depth", {})
obj["depth"] = extract_quantity(depth_obj, "biosample", "depth")

@@ -143,11 +139,11 @@ def load_biosample(db: Session, obj: Dict[str, Any], omics_processing: Collectio
db.add(models.Biosample(**biosample.dict()))


def load(db: Session, cursor: Cursor, omics_processing: Collection):
def load(db: Session, cursor: Cursor):
logger = get_logger(__name__)
for obj in cursor:
try:
load_biosample(db, obj, omics_processing)
load_biosample(db, obj)
except Exception as err:
logger.error(f"Error parsing biosample: {err}")
logger.error(json.dumps(obj, indent=2, default=str))
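Under the v11 schema the study link lives on the biosample record itself, which is why the omics_processing lookup could be dropped. A small illustration with a hypothetical record:

biosample_record = {
    "id": "nmdc:bsm-00-example",                    # hypothetical identifier
    "associated_studies": ["nmdc:sty-00-example"],  # replaces the old part_of lookup
}

associated_studies = biosample_record.pop("associated_studies", None)
if associated_studies is None:
    raise ValueError("Could not determine study for biosample")
study_id = associated_studies[0]  # -> "nmdc:sty-00-example"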
52 changes: 18 additions & 34 deletions nmdc_server/ingest/omics_processing.py
@@ -20,28 +20,15 @@
date_fmt = re.compile(r"\d\d-[A-Z]+-\d\d \d\d\.\d\d\.\d\d\.\d+ [AP]M")


process_types = [
"pooling",
"extraction",
"library_preparation",
]


collections = {
"biosample": "biosample_set",
"processed_sample": "processed_sample_set",
"extraction": "extraction_set",
"library_preparation": "library_preparation_set",
"pooling": "pooling_set",
}


omics_types = {
"metagenome": "Metagenome",
"metabolome": "Metabolomics",
"metabolomics": "Metabolomics",
"metaproteome": "Proteomics",
"proteomics": "Proteomics",
"metatranscriptome": "Metatranscriptome",
"organic matter characterization": "Organic Matter Characterization",
"nom": "Organic Matter Characterization",
}


@@ -65,17 +52,11 @@ def is_biosample(object_id, biosample_collection):

def find_parent_process(output_id: str, mongodb: Database) -> Optional[dict[str, Any]]:
"""Given a ProcessedSample ID, find the process (e.g. Extraction) that created it."""
output_found = False
collections_left = True
while not output_found and collections_left:
for name in process_types:
collection: Collection = mongodb[collections[name]]
query = collection.find({"has_output": output_id}, no_cursor_timeout=True)
result_list = list(query)
if len(result_list):
output_found = True
return result_list[0]
collections_left = False
material_processing_collection: Collection = mongodb["material_processing_set"]
query = material_processing_collection.find({"has_output": output_id}, no_cursor_timeout=True)
result_list = list(query)
if len(result_list):
return result_list[0]
return None


@@ -117,10 +98,6 @@ def load_omics_processing(db: Session, obj: Dict[str, Any], mongodb: Database, l
biosample_input_ids: set[str] = set()
for input_id in input_ids:
biosample_input_ids.union(get_biosample_input_ids(input_id, mongodb, biosample_input_ids))
if len(biosample_input_ids) > 1:
logger.error("Processed sample input detected")
logger.error(obj["id"])
logger.error(biosample_input_ids)

obj["biosample_inputs"] = []
biosample_input_objects = []
Expand All @@ -133,9 +110,16 @@ def load_omics_processing(db: Session, obj: Dict[str, Any], mongodb: Database, l
biosample_input_objects.append(biosample_object)

data_objects = obj.pop("has_output", [])
obj["study_id"] = obj.pop("part_of", [None])[0]
raw_omics_type: str = obj["omics_type"]["has_raw_value"]
obj["omics_type"]["has_raw_value"] = omics_types[raw_omics_type.lower()]
obj["study_id"] = obj.pop("associated_studies", [None])[0]
obj["analyte_category"] = omics_types[obj["analyte_category"].lower()]
obj["omics_type"] = omics_types[obj["analyte_category"].lower()]

# Get instrument name
instrument_id = obj.pop("instrument_used", [])
if instrument_id:
instrument = mongodb["instrument_set"].find_one({"id": instrument_id[0]})
if instrument:
obj["instrument_name"] = instrument["name"]

omics_processing = models.OmicsProcessing(**OmicsProcessing(**obj).dict())
for biosample_object in biosample_input_objects:
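With process records consolidated into material_processing_set, walking from a processed sample back to its originating biosamples reduces to one query per step. The sketch below shows that traversal; the recursive helper is illustrative and is not the PR's get_biosample_input_ids implementation.

from typing import Any, Dict, Optional, Set

from pymongo.database import Database


def find_parent_process(output_id: str, mongodb: Database) -> Optional[Dict[str, Any]]:
    # Same consolidated query as above: any material processing step
    # (extraction, library preparation, pooling, ...) that produced this sample.
    return mongodb["material_processing_set"].find_one({"has_output": output_id})


def trace_to_biosamples(sample_id: str, mongodb: Database, seen: Optional[Set[str]] = None) -> Set[str]:
    # Follow has_input links upward until IDs found in biosample_set are reached.
    seen = seen or set()
    if mongodb["biosample_set"].find_one({"id": sample_id}):
        return {sample_id}
    biosamples: Set[str] = set()
    process = find_parent_process(sample_id, mongodb)
    if process:
        for input_id in process.get("has_input", []):
            if input_id not in seen:
                seen.add(input_id)
                biosamples |= trace_to_biosamples(input_id, mongodb, seen)
    return biosamples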
11 changes: 10 additions & 1 deletion nmdc_server/ingest/study.py
@@ -79,7 +79,16 @@ def load(db: Session, cursor: Cursor):
doi["doi_value"] = transform_doi(doi.pop("doi_value"))

for doi in dois:
upsert_doi(db, **doi)
upsert_doi(
db,
doi_value=doi["doi_value"],
doi_category=doi["doi_category"],
doi_provider=doi.get("doi_provider", ""),
)

protocol_links = obj.pop("protocol_link", None)
if protocol_links:
obj["relevant_protocols"] = [p["url"] for p in protocol_links if "url" in p]

new_study = create_study(db, Study(**obj))
if dois:
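The study loader now maps protocol_link entries from the schema onto the portal's relevant_protocols field, keeping only entries that carry a url. A small illustration with a hypothetical record:

study_record = {
    "id": "nmdc:sty-00-example",  # hypothetical identifier
    "protocol_link": [
        {"url": "https://www.protocols.io/view/example"},  # hypothetical link
        {"name": "entries without a url are skipped"},
    ],
}

protocol_links = study_record.pop("protocol_link", None)
if protocol_links:
    study_record["relevant_protocols"] = [p["url"] for p in protocol_links if "url" in p]
# -> ["https://www.protocols.io/view/example"]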