diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 1c81caa00..faed4cd2f 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -66,6 +66,7 @@ jobs:
- name: "Run unit tests"
id: runtests
run: |
+ export SM_ENVIRONMENT=local
coverage run -m unittest discover -p 'test*.py' -s '.'
rc=$?
coverage xml
diff --git a/.gitignore b/.gitignore
index d5fa95d5a..c4a999435 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,3 +60,9 @@ web/src/__generated__
# pulumi config files
Pulumi*.yaml
+
+# pnpm package manager
+pnpm-lock.yaml
+
+# env
+.env
diff --git a/api/graphql/schema.py b/api/graphql/schema.py
index 436a1c334..d19021fca 100644
--- a/api/graphql/schema.py
+++ b/api/graphql/schema.py
@@ -235,27 +235,29 @@ class GraphQLAnalysis:
id: int
type: str
status: strawberry.enum(AnalysisStatus)
- output: str | None
timestamp_completed: datetime.datetime | None = None
active: bool
meta: strawberry.scalars.JSON
-
+    output: strawberry.scalars.JSON
+    outputs: strawberry.scalars.JSON
+
@staticmethod
def from_internal(internal: AnalysisInternal) -> 'GraphQLAnalysis':
return GraphQLAnalysis(
id=internal.id,
type=internal.type,
status=internal.status,
- output=internal.output,
timestamp_completed=internal.timestamp_completed,
active=internal.active,
meta=internal.meta,
+ output=internal.output,
+ outputs=internal.outputs,
)
@strawberry.field
async def sequencing_groups(
self, info: Info, root: 'GraphQLAnalysis'
) -> list['GraphQLSequencingGroup']:
loader = info.context[LoaderKeys.SEQUENCING_GROUPS_FOR_ANALYSIS]
sgs = await loader.load(root.id)
return [GraphQLSequencingGroup.from_internal(sg) for sg in sgs]
diff --git a/api/routes/analysis.py b/api/routes/analysis.py
index b3298d2c2..7fd00a11d 100644
--- a/api/routes/analysis.py
+++ b/api/routes/analysis.py
@@ -1,7 +1,7 @@
import csv
import io
from datetime import date
-from typing import Any
+from typing import Any, Optional, Union
from fastapi import APIRouter
from fastapi.params import Body, Query
@@ -45,7 +45,7 @@ class AnalysisModel(BaseModel):
type: str
status: AnalysisStatus
meta: dict[str, Any] | None = None
- output: str | None = None
+ outputs: Optional[Union[str, dict]] = None
active: bool = True
# please don't use this, unless you're the analysis-runner,
# the usage is tracked ... (Ծ_Ծ)
@@ -56,7 +56,7 @@ class AnalysisUpdateModel(BaseModel):
"""Update analysis model"""
status: AnalysisStatus
- output: str | None = None
+    outputs: str | dict | None = None
meta: dict[str, Any] | None = None
active: bool | None = None
@@ -73,7 +73,7 @@ class AnalysisQueryModel(BaseModel):
type: str | None = None
status: AnalysisStatus | None = None
meta: dict[str, Any] | None = None
- output: str | None = None
+ outputs: str | None = None
active: bool | None = None
def to_filter(self, project_id_map: dict[str, int]) -> AnalysisFilter:
@@ -130,7 +130,7 @@ async def update_analysis(
"""Update status of analysis"""
atable = AnalysisLayer(connection)
await atable.update_analysis(
- analysis_id, status=analysis.status, output=analysis.output, meta=analysis.meta
+ analysis_id, status=analysis.status, outputs=analysis.outputs, meta=analysis.meta
)
return True
diff --git a/db/project.xml b/db/project.xml
index 118a934ba..dccc0ab9d 100644
--- a/db/project.xml
+++ b/db/project.xml
@@ -1107,4 +1107,90 @@
ALTER TABLE sequencing_group_assay CHANGE author author VARCHAR(255) NULL;
ALTER TABLE sequencing_group_external_id CHANGE author author VARCHAR(255) NULL;
+
+ SET @@system_versioning_alter_history = 1;
+
+
+ ALTER TABLE `output_file` ADD SYSTEM VERSIONING;
+ ALTER TABLE `analysis_outputs` ADD SYSTEM VERSIONING;
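+        -- analysis_outputs rows must reference either a structured output_file (file_id)
+        -- or a raw output string, never both: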
+ ALTER TABLE `analysis_outputs` ADD CONSTRAINT `chk_file_id_output` CHECK ((file_id IS NOT NULL AND output IS NULL) OR (file_id IS NULL AND output IS NOT NULL));
+
diff --git a/db/python/connect.py b/db/python/connect.py
index 4a5abcba4..f647b8d6a 100644
--- a/db/python/connect.py
+++ b/db/python/connect.py
@@ -26,6 +26,7 @@
'sequencing_group',
'assay',
'sequencing_group_assay',
+ 'analysis_outputs',
'analysis_sequencing_group',
'analysis_sample',
'assay_external_id',
diff --git a/db/python/layers/analysis.py b/db/python/layers/analysis.py
index 878873386..851f8d021 100644
--- a/db/python/layers/analysis.py
+++ b/db/python/layers/analysis.py
@@ -1,4 +1,5 @@
import datetime
+import warnings
from collections import defaultdict
from typing import Any
@@ -7,6 +8,7 @@
from db.python.layers.base import BaseLayer
from db.python.layers.sequencing_group import SequencingGroupLayer
from db.python.tables.analysis import AnalysisFilter, AnalysisTable
+from db.python.tables.output_file import OutputFileTable
from db.python.tables.sample import SampleTable
from db.python.tables.sequencing_group import SequencingGroupFilter
from db.python.utils import GenericFilter, get_logger
@@ -46,8 +48,9 @@ class AnalysisLayer(BaseLayer):
def __init__(self, connection: Connection):
super().__init__(connection)
- self.sampt = SampleTable(connection)
- self.at = AnalysisTable(connection)
+ self.sample_table = SampleTable(connection)
+ self.analysis_table = AnalysisTable(connection)
+ self.output_file_table = OutputFileTable(connection)
# GETS
@@ -62,7 +65,7 @@ async def get_analyses_for_samples(
Get a list of all analysis that relevant for samples
"""
- projects, analysis = await self.at.get_analyses_for_samples(
+ projects, analysis = await self.analysis_table.get_analyses_for_samples(
sample_ids,
analysis_type=analysis_type,
status=status,
@@ -80,7 +83,7 @@ async def get_analyses_for_samples(
async def get_analysis_by_id(self, analysis_id: int, check_project_id=True):
"""Get analysis by ID"""
- project, analysis = await self.at.get_analysis_by_id(analysis_id)
+ project, analysis = await self.analysis_table.get_analysis_by_id(analysis_id)
if check_project_id:
await self.ptable.check_access_to_project_id(
self.author, project, readonly=True
@@ -95,7 +98,7 @@ async def get_latest_complete_analysis_for_type(
meta: dict[str, Any] = None,
) -> AnalysisInternal:
"""Get SINGLE latest complete analysis for some analysis type"""
- return await self.at.get_latest_complete_analysis_for_type(
+ return await self.analysis_table.get_latest_complete_analysis_for_type(
project=project, analysis_type=analysis_type, meta=meta
)
@@ -105,7 +108,7 @@ async def get_all_sequencing_group_ids_without_analysis_type(
"""
Find all the sequencing_groups that don't have an "analysis_type"
"""
- return await self.at.get_all_sequencing_group_ids_without_analysis_type(
+ return await self.analysis_table.get_all_sequencing_group_ids_without_analysis_type(
analysis_type=analysis_type, project=project
)
@@ -115,7 +118,7 @@ async def get_incomplete_analyses(
"""
Gets details of analysis with status queued or in-progress
"""
- return await self.at.get_incomplete_analyses(project=project)
+ return await self.analysis_table.get_incomplete_analyses(project=project)
async def get_sample_cram_path_map_for_seqr(
self,
@@ -124,7 +127,7 @@ async def get_sample_cram_path_map_for_seqr(
participant_ids: list[int] = None,
) -> list[dict[str, Any]]:
"""Get (ext_participant_id, cram_path, internal_id) map"""
- return await self.at.get_sample_cram_path_map_for_seqr(
+ return await self.analysis_table.get_sample_cram_path_map_for_seqr(
project=project,
sequencing_types=sequencing_types,
participant_ids=participant_ids,
@@ -132,7 +135,7 @@ async def get_sample_cram_path_map_for_seqr(
async def query(self, filter_: AnalysisFilter, check_project_ids=True):
"""Query analyses"""
- analyses = await self.at.query(filter_)
+ analyses = await self.analysis_table.query(filter_)
if not analyses:
return []
@@ -186,7 +189,7 @@ async def get_cram_size_proportionate_map(
sg_by_id = {sg.id: sg for sg in sequencing_groups}
sg_to_project = {sg.id: sg.project for sg in sequencing_groups}
- cram_list = await self.at.query(
+ cram_list = await self.analysis_table.query(
AnalysisFilter(
sequencing_group_id=GenericFilter(in_=list(sg_to_project.keys())),
type=GenericFilter(eq='cram'),
@@ -493,13 +496,13 @@ async def get_sgs_added_by_day_by_es_indices(
# was removed. So we'll sum up all SGs up to the start date and then use that
# as the starting point for the prop map.
- by_day[start] = await self.at.find_sgs_in_joint_call_or_es_index_up_to_date(
+ by_day[start] = await self.analysis_table.find_sgs_in_joint_call_or_es_index_up_to_date(
date=start
)
if start < ES_ANALYSIS_OBJ_INTRO_DATE:
# do a special check for joint-calling
- joint_calls = await self.at.query(
+ joint_calls = await self.analysis_table.query(
AnalysisFilter(
type=GenericFilter(eq='joint-calling'),
status=GenericFilter(eq=AnalysisStatus.COMPLETED),
@@ -514,7 +517,7 @@ async def get_sgs_added_by_day_by_es_indices(
for jc in joint_calls:
by_day[jc.timestamp_completed.date()].update(jc.sequencing_group_ids)
- es_indices = await self.at.query(
+ es_indices = await self.analysis_table.query(
AnalysisFilter(
type=GenericFilter(eq='es-index'),
status=GenericFilter(eq=AnalysisStatus.COMPLETED),
@@ -535,7 +538,7 @@ async def get_audit_logs_by_analysis_ids(
self, analysis_ids: list[int]
) -> dict[int, list[AuditLogInternal]]:
"""Get audit logs for analysis IDs"""
- return await self.at.get_audit_log_for_analysis_ids(analysis_ids)
+ return await self.analysis_table.get_audit_log_for_analysis_ids(analysis_ids)
# CREATE / UPDATE
@@ -545,27 +548,37 @@ async def create_analysis(
project: ProjectId = None,
) -> int:
"""Create a new analysis"""
- return await self.at.create_analysis(
+ new_analysis_id = await self.analysis_table.create_analysis(
analysis_type=analysis.type,
status=analysis.status,
sequencing_group_ids=analysis.sequencing_group_ids,
meta=analysis.meta,
- output=analysis.output,
active=analysis.active,
project=project,
)
+ # TODO deprecate the output field
+ if analysis.output:
+ warnings.warn('Analysis.output will be deprecated, use Analysis.outputs instead', PendingDeprecationWarning, stacklevel=2)
+
+ await self.output_file_table.create_or_update_analysis_output_files_from_json(analysis_id=new_analysis_id, json_dict=analysis.output)
+
+ elif analysis.outputs:
+ await self.output_file_table.create_or_update_analysis_output_files_from_json(analysis_id=new_analysis_id, json_dict=analysis.outputs)
+
+ return new_analysis_id
+
async def add_sequencing_groups_to_analysis(
self, analysis_id: int, sequencing_group_ids: list[int], check_project_id=True
):
"""Add samples to an analysis (through the linked table)"""
if check_project_id:
- project_ids = await self.at.get_project_ids_for_analysis_ids([analysis_id])
+ project_ids = await self.analysis_table.get_project_ids_for_analysis_ids([analysis_id])
await self.ptable.check_access_to_project_ids(
self.author, project_ids, readonly=False
)
- return await self.at.add_sequencing_groups_to_analysis(
+ return await self.analysis_table.add_sequencing_groups_to_analysis(
analysis_id=analysis_id, sequencing_group_ids=sequencing_group_ids
)
@@ -575,24 +588,30 @@ async def update_analysis(
status: AnalysisStatus,
meta: dict[str, Any] = None,
output: str | None = None,
+        outputs: str | dict | None = None,
check_project_id=True,
):
"""
Update the status of an analysis, set timestamp_completed if relevant
"""
if check_project_id:
- project_ids = await self.at.get_project_ids_for_analysis_ids([analysis_id])
+ project_ids = await self.analysis_table.get_project_ids_for_analysis_ids([analysis_id])
await self.ptable.check_access_to_project_ids(
self.author, project_ids, readonly=False
)
- await self.at.update_analysis(
+ await self.analysis_table.update_analysis(
analysis_id=analysis_id,
status=status,
meta=meta,
- output=output,
)
+ if output:
+ warnings.warn('Analysis.output will be deprecated, use Analysis.outputs instead', PendingDeprecationWarning, stacklevel=2)
+ await self.output_file_table.create_or_update_analysis_output_files_from_json(analysis_id=analysis_id, json_dict=output)
+ elif outputs:
+ await self.output_file_table.create_or_update_analysis_output_files_from_json(analysis_id=analysis_id, json_dict=outputs)
+
async def get_analysis_runner_log(
self,
project_ids: list[int] = None,
@@ -603,7 +622,7 @@ async def get_analysis_runner_log(
"""
Get log for the analysis-runner, useful for checking this history of analysis
"""
- return await self.at.get_analysis_runner_log(
+ return await self.analysis_table.get_analysis_runner_log(
project_ids,
# author=author,
output_dir=output_dir,
diff --git a/db/python/tables/analysis.py b/db/python/tables/analysis.py
index 7955cfadb..22967d1c2 100644
--- a/db/python/tables/analysis.py
+++ b/db/python/tables/analysis.py
@@ -2,7 +2,7 @@
import dataclasses
import datetime
from collections import defaultdict
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple, Union
from db.python.tables.base import DbBase
from db.python.utils import (
@@ -15,6 +15,7 @@
from models.enums import AnalysisStatus
from models.models.analysis import AnalysisInternal
from models.models.audit_log import AuditLogInternal
+from models.models.output_file import OutputFileInternal
from models.models.project import ProjectId
@@ -60,7 +61,6 @@ async def create_analysis(
status: AnalysisStatus,
sequencing_group_ids: List[int],
meta: Optional[Dict[str, Any]] = None,
- output: str = None,
active: bool = True,
project: ProjectId = None,
) -> int:
@@ -73,7 +73,6 @@ async def create_analysis(
('type', analysis_type),
('status', status.value),
('meta', to_db_json(meta or {})),
- ('output', output),
('audit_log_id', await self.audit_log_id()),
('project', project or self.project),
('active', active if active is not None else True),
@@ -145,7 +144,6 @@ async def update_analysis(
status: AnalysisStatus,
meta: Dict[str, Any] = None,
active: bool = None,
- output: Optional[str] = None,
):
"""
Update the status of an analysis, set timestamp_completed if relevant
@@ -169,10 +167,6 @@ async def update_analysis(
fields['timestamp_completed'] = datetime.datetime.utcnow()
setters.append('timestamp_completed = :timestamp_completed')
- if output:
- fields['output'] = output
- setters.append('output = :output')
-
if meta is not None and len(meta) > 0:
fields['meta'] = to_db_json(meta)
setters.append('meta = JSON_MERGE_PATCH(COALESCE(meta, "{}"), :meta)')
@@ -209,14 +203,13 @@ async def query(self, filter_: AnalysisFilter) -> List[AnalysisInternal]:
'type': 'a.type',
'status': 'a.status',
'meta': 'a.meta',
- 'output': 'a.output',
'active': 'a.active',
},
)
_query = f"""
SELECT a.id as id, a.type as type, a.status as status,
- a.output as output, a_sg.sequencing_group_id as sequencing_group_id,
+ a_sg.sequencing_group_id as sequencing_group_id,
a.project as project, a.timestamp_completed as timestamp_completed,
a.active as active, a.meta as meta, a.author as author
FROM analysis a
@@ -226,15 +219,83 @@ async def query(self, filter_: AnalysisFilter) -> List[AnalysisInternal]:
rows = await self.connection.fetch_all(_query, values)
retvals: Dict[int, AnalysisInternal] = {}
+ analysis_ids: list = [row['id'] for row in rows]
+ analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
for row in rows:
key = row['id']
if key in retvals:
retvals[key].sequencing_group_ids.append(row['sequencing_group_id'])
else:
- retvals[key] = AnalysisInternal.from_db(**dict(row))
-
+ # Pydantic doesn't allow item assignment on the model
+ # TODO: Deprecate `output` eventually
+                retvals[key] = AnalysisInternal.from_db(**dict(row)).copy(
+                    update={
+                        'output': analysis_outputs_by_aid.get(key, []),
+                        'outputs': analysis_outputs_by_aid.get(key, []),
+                    }
+                )
return list(retvals.values())
+ async def get_file_outputs_by_analysis_ids(self, analysis_ids: list[int]) -> dict[int, Union[dict[str, Any], str]]:
+        """Fetches all output files for a list of analysis IDs"""
+        if not analysis_ids:
+            return {}
+
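+        # Maps analysis_id -> reconstructed outputs: either a nested dict of output_file
+        # fields keyed by the stored json_structure, or the raw `output` string for rows
+        # that have no linked file.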
+ _query = """
+ SELECT ao.analysis_id, f.*, ao.json_structure, ao.output
+ FROM analysis_outputs ao
+ LEFT JOIN output_file f ON ao.file_id = f.id
+ WHERE ao.analysis_id IN :analysis_ids
+ """
+ rows = await self.connection.fetch_all(_query, {'analysis_ids': analysis_ids})
+
+ # Preparing to accumulate analysis files
+ analysis_files: dict[int, Union[list[Tuple[OutputFileInternal, str]], str]] = defaultdict(list)
+
+ # Extracting parent IDs with a guard for None values
+ parent_ids = [row['id'] for row in rows if row['id'] is not None]
+
+ # Fetching secondary files only if there are parent IDs to look up
+ secondary_files = await self.get_secondary_files_for_file_output(parent_ids) if parent_ids else {}
+
+ for row in rows:
+ file_id = row['id']
+ if file_id:
+ # If no json_structure, just set to the output.
+ if row['json_structure'] is None:
+ analysis_files[row['analysis_id']] = row['path']
+ # Building OutputFileInternal object with secondary files if available
+ file_internal = OutputFileInternal.from_db(**dict(row))
+ file_internal_with_secondary = file_internal.copy(update={
+ 'secondary_files': secondary_files.get(file_id, [])
+ })
+ if isinstance(analysis_files[row['analysis_id']], list):
+ analysis_files[row['analysis_id']].append((file_internal_with_secondary, row['json_structure'])) # type: ignore [union-attr]
+ else:
+ # If no file_id, just set to the output.
+ analysis_files[row['analysis_id']] = row['output']
+
+ # Transforming analysis_files into the desired output format
+ analysis_output_files = {a_id: OutputFileInternal.reconstruct_json(files) for a_id, files in analysis_files.items()}
+
+ return analysis_output_files
+
+ async def get_secondary_files_for_file_output(self, parent_file_ids: list[int]) -> dict[int, list[OutputFileInternal]]:
+ """Fetches all secondary files for a list of parent files"""
+
+ _query = """
+ SELECT f.*
+ FROM output_file f
+ WHERE f.parent_id IN :parent_file_ids
+ """
+ # Ensure parent_file_ids is a list to prevent SQL injection and errors
+ if not isinstance(parent_file_ids, list):
+ raise ValueError('parent_file_ids must be a list of integers')
+
+ # Fetching rows from the database
+ rows = await self.connection.fetch_all(_query, {'parent_file_ids': parent_file_ids})
+
+ # Accumulating secondary files
+ secondary_files: Dict[int, List[OutputFileInternal]] = defaultdict(list)
+ for row in rows:
+ secondary_file = OutputFileInternal.from_db(**dict(row))
+ secondary_files[row['parent_id']].append(secondary_file)
+
+ return secondary_files
+
async def get_latest_complete_analysis_for_type(
self,
project: ProjectId,
@@ -257,7 +318,7 @@ async def get_latest_complete_analysis_for_type(
_query = f"""
SELECT a.id as id, a.type as type, a.status as status,
- a.output as output, a_sg.sample_id as sample_id,
+ a_sg.sample_id as sample_id,
a.project as project, a.timestamp_completed as timestamp_completed,
a.meta as meta
FROM analysis_sequencing_group a_sg
@@ -272,7 +333,11 @@ async def get_latest_complete_analysis_for_type(
rows = await self.connection.fetch_all(_query, values)
if len(rows) == 0:
raise NotFoundError(f"Couldn't find any analysis with type {analysis_type}")
+        analysis_ids: list = [rows[0]['id']]
+ analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
a = AnalysisInternal.from_db(**dict(rows[0]))
+ a.output = analysis_outputs_by_aid.get(rows[0]['id'], [])
+ a.outputs = analysis_outputs_by_aid.get(rows[0]['id'], [])
# .from_db maps 'sample_id' -> sample_ids
for row in rows[1:]:
a.sample_ids.append(row['sample_id'])
@@ -309,7 +374,7 @@ async def get_incomplete_analyses(
"""
_query = """
SELECT a.id as id, a.type as type, a.status as status,
- a.output as output, a_sg.sequencing_group_id as sequencing_group_id,
+ a_sg.sequencing_group_id as sequencing_group_id,
a.project as project, a.meta as meta
FROM analysis_sequencing_group a_sg
INNER JOIN analysis a ON a_sg.analysis_id = a.id
@@ -319,10 +384,14 @@ async def get_incomplete_analyses(
_query, {'project': project or self.project}
)
analysis_by_id = {}
+ analysis_ids: list = [row['id'] for row in rows]
+ analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
for row in rows:
aid = row['id']
if aid not in analysis_by_id:
analysis_by_id[aid] = AnalysisInternal.from_db(**dict(row))
+ analysis_by_id[aid].output = analysis_outputs_by_aid.get(aid, [])
+ analysis_by_id[aid].outputs = analysis_outputs_by_aid.get(aid, [])
else:
analysis_by_id[aid].sample_ids.append(row['sequencing_group_id'])
@@ -334,7 +403,7 @@ async def get_latest_complete_analysis_for_sequencing_group_ids_by_type(
"""Get the latest complete analysis for samples (one per sample)"""
_query = """
SELECT
- a.id AS id, a.type as type, a.status as status, a.output as output,
+ a.id AS id, a.type as type, a.status as status,
a.project as project, a_sg.sequencing_group_id as sample_id,
a.timestamp_completed as timestamp_completed, a.meta as meta
FROM analysis a
@@ -357,11 +426,16 @@ async def get_latest_complete_analysis_for_sequencing_group_ids_by_type(
rows = await self.connection.fetch_all(_query, values)
seen_sequencing_group_ids = set()
analyses: List[AnalysisInternal] = []
+ analysis_ids: list = [row['id'] for row in rows]
+ analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
for row in rows:
if row['sequencing_group_id'] in seen_sequencing_group_ids:
continue
seen_sequencing_group_ids.add(row['sequencing_group_id'])
- analyses.append(AnalysisInternal.from_db(**dict(row)))
+ analysis = AnalysisInternal.from_db(**dict(row))
+ analysis.output = analysis_outputs_by_aid.get(row['id'], [])
+ analysis.outputs = analysis_outputs_by_aid.get(row['id'], [])
+ analyses.append(analysis)
# reverse after timestamp_completed
return analyses[::-1]
@@ -373,7 +447,7 @@ async def get_analysis_by_id(
_query = """
SELECT
a.id as id, a.type as type, a.status as status,
- a.output as output, a.project as project,
+ a.project as project,
a_sg.sequencing_group_id as sequencing_group_id,
a.timestamp_completed as timestamp_completed, a.meta as meta
FROM analysis a
@@ -385,8 +459,11 @@ async def get_analysis_by_id(
raise NotFoundError(f"Couldn't find analysis with id = {analysis_id}")
project = rows[0]['project']
-
+ analysis_ids: list = [rows[0]['id']]
+ analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
a = AnalysisInternal.from_db(**dict(rows[0]))
+        a.output = analysis_outputs_by_aid.get(rows[0]['id'], [])
+        a.outputs = analysis_outputs_by_aid.get(rows[0]['id'], [])
for row in rows[1:]:
a.sample_ids.append(row['sequencing_group_id'])
@@ -417,7 +494,7 @@ async def get_analyses_for_samples(
_query = f"""
SELECT
a.id as id, a.type as type, a.status as status,
- a.output as output, a.project as project,
+ a.project as project,
a_sg.sequencing_group_id as sequencing_group_id,
a.timestamp_completed as timestamp_completed, a.meta as meta
FROM analysis a
@@ -428,13 +505,17 @@ async def get_analyses_for_samples(
rows = await self.connection.fetch_all(_query, values)
analyses = {}
projects: set[ProjectId] = set()
- for a in rows:
- a_id = a['id']
+ analysis_ids: list = [row['id'] for row in rows]
+ analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
+ for row in rows:
+ a_id = row['id']
if a_id not in analyses:
- analyses[a_id] = AnalysisInternal.from_db(**dict(a))
- projects.add(a['project'])
+ analyses[a_id] = AnalysisInternal.from_db(**dict(row))
+ analyses[a_id].output = analysis_outputs_by_aid.get(a_id, [])
+ analyses[a_id].outputs = analysis_outputs_by_aid.get(a_id, [])
+ projects.add(row['project'])
- analyses[a_id].sample_ids.append(a['sample_id'])
+ analyses[a_id].sample_ids.append(row['sample_id'])
return projects, list(analyses.values())
@@ -468,7 +549,7 @@ async def get_sample_cram_path_map_for_seqr(
values['pids'] = list(participant_ids)
_query = f"""
-SELECT p.external_id as participant_id, a.output as output, sg.id as sequencing_group_id
+SELECT p.external_id as participant_id, sg.id as sequencing_group_id, a.id as analysis_id
FROM analysis a
INNER JOIN analysis_sequencing_group a_sg ON a_sg.analysis_id = a.id
INNER JOIN sequencing_group sg ON a_sg.sequencing_group_id = sg.id
@@ -480,8 +561,10 @@ async def get_sample_cram_path_map_for_seqr(
"""
rows = await self.connection.fetch_all(_query, values)
+        analysis_ids: list = [row['analysis_id'] for row in rows]
+        analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
# many per analysis
- return [dict(d) for d in rows]
+        return [
+            {
+                **dict(d),
+                'output': analysis_outputs_by_aid.get(d['analysis_id'], []),
+                'outputs': analysis_outputs_by_aid.get(d['analysis_id'], []),
+            }
+            for d in rows
+        ]
async def get_analysis_runner_log(
self,
@@ -498,12 +581,15 @@ async def get_analysis_runner_log(
"type = 'analysis-runner'",
'active',
]
+ joins = []
if project_ids:
wheres.append('project in :project_ids')
values['project_ids'] = project_ids
if output_dir:
- wheres.append('(output = :output OR output LIKE :output_like)')
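+            # `output` no longer lives on the analysis table, so match against the linked
+            # output_file path or the raw analysis_outputs.output value instead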
+            joins.append('LEFT JOIN analysis_outputs ao ON analysis.id = ao.analysis_id')
+ joins.append('LEFT JOIN output_file f ON ao.file_id = f.id')
+ wheres.append('(f.path = :output OR ao.output LIKE :output_like)')
values['output'] = output_dir
values['output_like'] = f'%{output_dir}'
@@ -512,9 +598,18 @@ async def get_analysis_runner_log(
values['ar_guid'] = ar_guid
wheres_str = ' AND '.join(wheres)
- _query = f'SELECT * FROM analysis WHERE {wheres_str}'
+ joins_str = ' '.join(joins)
+        _query = f'SELECT analysis.* FROM analysis {joins_str} WHERE {wheres_str}'
rows = await self.connection.fetch_all(_query, values)
- return [AnalysisInternal.from_db(**dict(r)) for r in rows]
+ analysis_ids: list = [row['id'] for row in rows]
+ analysis_outputs_by_aid = await self.get_file_outputs_by_analysis_ids(analysis_ids)
+ analyses: List[AnalysisInternal] = []
+ for row in rows:
+ analysis = AnalysisInternal.from_db(**dict(row))
+ analysis.output = analysis_outputs_by_aid.get(row['id'], [])
+ analysis.outputs = analysis_outputs_by_aid.get(row['id'], [])
+ analyses.append(analysis)
+ return analyses
# region STATS
diff --git a/db/python/tables/output_file.py b/db/python/tables/output_file.py
new file mode 100644
index 000000000..ba0975315
--- /dev/null
+++ b/db/python/tables/output_file.py
@@ -0,0 +1,170 @@
+import os
+from textwrap import dedent
+from typing import Optional, Union
+
+from cloudpathlib import AnyPath, GSClient
+from google.auth.credentials import AnonymousCredentials
+from google.cloud.storage import Client
+
+from db.python.tables.base import DbBase
+from models.models.output_file import OutputFileInternal
+
+
+class OutputFileTable(DbBase):
+ """
+    Capture OutputFile table operations and queries
+ """
+
+ table_name = 'output_file'
+
+ async def create_or_update_output_file(
+ self,
+ path: str,
+ parent_id: Optional[int] = None,
+ client: Optional[Client] = None
+ ) -> int | None:
+ """
+ Create a new file, and add it to database
+ """
+ file_obj = AnyPath(path, client=GSClient(storage_client=client))
+
+ file_info = await OutputFileInternal.get_file_info(file_obj=file_obj, client=client)
+
+ if not file_info or not file_info.get('valid', False):
+ return None
+
+ kv_pairs = [
+ ('path', path),
+ ('basename', file_info['basename']),
+ ('dirname', file_info['dirname']),
+ ('nameroot', file_info['nameroot']),
+ ('nameext', file_info['nameext']),
+ ('file_checksum', file_info['checksum']),
+ ('size', file_info['size']),
+ ('valid', file_info['valid']),
+ ('parent_id', parent_id)
+ ]
+
+ kv_pairs = [(k, v) for k, v in kv_pairs if v is not None]
+ keys = [k for k, _ in kv_pairs]
+ cs_keys = ', '.join(keys)
+ cs_id_keys = ', '.join(f':{k}' for k in keys)
+ non_pk_keys = [k for k in keys if k != 'path']
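+        # Build the ON DUPLICATE KEY UPDATE clause so an existing row for this `path`
+        # (assumed to be a unique key on output_file) is refreshed in place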
+        update_clause = ', '.join([f'{k} = VALUES({k})' for k in non_pk_keys])
+
+ async with self.connection.transaction():
+ _query = dedent(f"""INSERT INTO output_file ({cs_keys}) VALUES ({cs_id_keys}) ON DUPLICATE KEY UPDATE {update_clause} RETURNING id""")
+ id_of_new_file = await self.connection.fetch_val(
+ _query,
+ dict(kv_pairs),
+ )
+
+ return id_of_new_file
+
+ async def add_output_file_to_analysis(self, analysis_id: int, file_id: int, json_structure: Optional[str] = None, output: Optional[str] = None):
+ """Add file to an analysis (through the join table)"""
+ _query = dedent("""
+ INSERT INTO analysis_outputs
+ (analysis_id, file_id, json_structure, output)
+ VALUES (:analysis_id, :file_id, :json_structure, :output)
+ """)
+ await self.connection.execute(
+ _query,
+ {'analysis_id': analysis_id, 'file_id': file_id, 'json_structure': json_structure, 'output': output}
+ )
+
+ async def create_or_update_analysis_output_files_from_json(
+ self,
+ analysis_id: int,
+ json_dict: Union[dict, str],
+ ) -> None:
+ """
+        Create or update the output files for an analysis from a JSON dict
+        (or plain string) of outputs, and link them through analysis_outputs
+ """
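+        # Parse the outputs dict (or plain string) into primary/secondary file paths,
+        # upsert an output_file row for each resolvable path, and link each one (or the
+        # raw string) to the analysis via analysis_outputs.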
+ files = await self.find_files_from_dict(json_dict=json_dict)
+ file_ids : list[int] = []
+
+        if os.environ.get('SM_ENVIRONMENT', '').lower() in ('development', 'local'):
+ client = Client(
+ credentials=AnonymousCredentials(),
+ project='test',
+ # Alternatively instead of using the global env STORAGE_EMULATOR_HOST. You can define it here.
+ # This will set this client object to point to the local google cloud storage.
+ client_options={'api_endpoint': 'http://localhost:4443'},
+ )
+ else:
+ client = Client()
+
+ async with self.connection.transaction():
+ if 'main_files' in files:
+ for primary_file in files['main_files']:
+ parent_file_id = await self.create_or_update_output_file(path=primary_file['basename'], client=client)
+ await self.add_output_file_to_analysis(
+ analysis_id,
+ parent_file_id,
+ json_structure=primary_file['json_path'],
+ output=None if parent_file_id else primary_file['basename']
+ )
+ if 'secondary_files_grouped' in files:
+ secondary_files = files['secondary_files_grouped']
+ if primary_file['basename'] in secondary_files:
+ for secondary_file in secondary_files[primary_file['basename']]:
+ await self.create_or_update_output_file(path=secondary_file, parent_id=parent_file_id, client=client)
+                    if parent_file_id is not None:
+                        file_ids.append(parent_file_id)
+
+ client.close()
+        # Only the files in this json_dict should remain linked to the analysis;
+        # remove any previously linked file outputs that are not in this dict.
+        if file_ids:
+            _update_query = dedent("""
+                DELETE ao FROM analysis_outputs ao
+                WHERE (ao.analysis_id = :analysis_id)
+                AND (ao.file_id NOT IN :file_ids)
+            """)
+
+            await self.connection.execute(
+                _update_query,
+                {
+                    'analysis_id': analysis_id,
+                    'file_ids': file_ids,
+                },
+            )
+
+ async def find_files_from_dict(self, json_dict, json_path=None, collected=None) -> dict:
+ """Retrieve filepaths from a dict of outputs"""
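+        # Example (hypothetical paths), following the basename/secondaryFiles convention
+        # used by this table:
+        #   {'cram': {'basename': 'gs://bucket/a.cram',
+        #             'secondaryFiles': {'crai': {'basename': 'gs://bucket/a.cram.crai'}}}}
+        # collects to:
+        #   {'main_files': [{'json_path': 'cram', 'basename': 'gs://bucket/a.cram'}],
+        #    'secondary_files_grouped': {'gs://bucket/a.cram': ['gs://bucket/a.cram.crai']}}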
+ if collected is None:
+ collected = {'main_files': [], 'secondary_files_grouped': {}}
+
+ if json_path is None:
+ json_path = [] # Initialize path for tracking key path
+
+ if isinstance(json_dict, str):
+ # If the data is a plain string, return it as the basename with None as its keypath
+ collected['main_files'].append({'json_path': None, 'basename': json_dict})
+ return collected
+
+ if isinstance(json_dict, dict):
+ # Check if current dict contains 'basename'
+ if 'basename' in json_dict:
+ # Add current item to main_files
+ collected['main_files'].append({'json_path': '.'.join(json_path), 'basename': json_dict['basename']})
+ current_basename = json_dict['basename'] # Keep track of current basename for secondary files
+
+ # Handle secondary files if present
+ if 'secondaryFiles' in json_dict:
+ secondary = json_dict['secondaryFiles']
+ if current_basename not in collected['secondary_files_grouped']:
+ collected['secondary_files_grouped'][current_basename] = []
+ for _, value in secondary.items():
+ # Append each secondary file to the list in secondary_files under its parent basename
+ collected['secondary_files_grouped'][current_basename].append(value['basename'])
+
+ else:
+ for key, value in json_dict.items():
+ # Recur for each sub-dictionary, updating the path
+ await self.find_files_from_dict(value, json_path + [key], collected)
+
+ elif isinstance(json_dict, list):
+ # Recur for each item in the list, without updating the path (as lists don't contribute to JSON path)
+ for item in json_dict:
+ await self.find_files_from_dict(item, json_path, collected)
+
+ return collected
diff --git a/db/python/tables/project.py b/db/python/tables/project.py
index d87c28960..2f401af6e 100644
--- a/db/python/tables/project.py
+++ b/db/python/tables/project.py
@@ -442,6 +442,9 @@ async def delete_project_data(
INNER JOIN sample ON sample.id = sg.sample_id
WHERE sample.project = :project
);
+DELETE FROM analysis_outputs WHERE analysis_id in (
+ SELECT id FROM analysis WHERE project = :project
+);
DELETE FROM analysis_sample WHERE sample_id in (
SELECT s.id FROM sample s
WHERE s.project = :project
diff --git a/metamist/audit/README.md b/metamist/audit/README.md
index b099376c8..bbbe8c626 100644
--- a/metamist/audit/README.md
+++ b/metamist/audit/README.md
@@ -80,7 +80,9 @@ print(json.dumps(participant_data))
"analyses": [
{
"id": 456,
- "output": "gs://cpg-test-dataset-main/cram/CPG123456.cram",
+ "outputs": {
+ "basename": "gs://cpg-test-dataset-main/cram/CPG123456.cram"
+ },
"timestampCompleted": "2023-09-01T05:04:24"
}
]
diff --git a/metamist/audit/generic_auditor.py b/metamist/audit/generic_auditor.py
index c69eacf40..82e873740 100644
--- a/metamist/audit/generic_auditor.py
+++ b/metamist/audit/generic_auditor.py
@@ -53,7 +53,7 @@
analyses(status: {eq: COMPLETED}, type: {in_: $analysisTypes}, project: {eq: $dataset}) {
id
meta
- output
+ outputs
type
timestampCompleted
}
@@ -277,14 +277,15 @@ async def get_analysis_cram_paths_for_dataset_sgs(
# For each sg id, collect the analysis id and cram paths
sg_cram_paths: dict[str, dict[int, str]] = defaultdict(dict)
for sg_id, analysis in analyses.items():
- cram_path = analysis['output']
+            cram_path = analysis['outputs']['basename']
if not cram_path.startswith('gs://') or not cram_path.endswith('.cram'):
logging.warning(
- f'Analysis {analysis["id"]} invalid output path: {analysis["output"]}'
+ f'Analysis {analysis["id"]} invalid output path: {analysis["outputs"]["basename"]}'
)
continue
- sg_cram_paths[sg_id][analysis['id']] = analysis['output']
+            sg_cram_paths[sg_id][analysis['id']] = cram_path
return sg_cram_paths
@@ -313,7 +314,7 @@ async def analyses_for_sgs_without_crams(self, sgs_without_crams: list[str]):
analysis_entry = {
'analysis_id': analysis['id'],
'analysis_type': analysis['type'],
- 'analysis_output': analysis['output'],
+ 'analysis_output': analysis['outputs'],
'timestamp_completed': analysis['timestampCompleted'],
}
all_sg_analyses[sg_id].append(analysis_entry)
diff --git a/models/models/__init__.py b/models/models/__init__.py
index ce5868cb1..c5b71c2f0 100644
--- a/models/models/__init__.py
+++ b/models/models/__init__.py
@@ -27,6 +27,7 @@
FamilySimpleInternal,
PedRowInternal,
)
+from models.models.output_file import OutputFile, OutputFileInternal
from models.models.participant import (
NestedParticipant,
NestedParticipantInternal,
diff --git a/models/models/analysis.py b/models/models/analysis.py
index fb6e3152d..8ef56de2d 100644
--- a/models/models/analysis.py
+++ b/models/models/analysis.py
@@ -1,9 +1,9 @@
import enum
import json
from datetime import date, datetime
-from typing import Any
+from typing import Any, Optional, Union
-from pydantic import BaseModel
+from pydantic import BaseModel, ConfigDict
from models.base import SMBase
from models.enums import AnalysisStatus
@@ -15,17 +15,19 @@
class AnalysisInternal(SMBase):
"""Model for Analysis"""
+ model_config = ConfigDict(extra='allow')
- id: int | None = None
+ id: Optional[int] = None
type: str
status: AnalysisStatus
- output: str = None
+ output: Optional[Union[str, dict]] = None
+ outputs: Optional[Union[str, dict]] = {}
sequencing_group_ids: list[int] = []
- timestamp_completed: datetime | None = None
- project: int | None = None
- active: bool | None = None
+ timestamp_completed: Optional[datetime] = None
+ project: Optional[int] = None
+ active: Optional[bool] = None
meta: dict[str, Any] = {}
- author: str | None = None
+ author: Optional[str] = None
@staticmethod
def from_db(**kwargs):
@@ -52,7 +54,8 @@ def from_db(**kwargs):
type=analysis_type,
status=AnalysisStatus(status),
sequencing_group_ids=sequencing_group_ids or [],
- output=kwargs.pop('output', []),
+ output=kwargs.pop('output', None), # TODO deprecate
+ outputs=kwargs.pop('outputs', []),
timestamp_completed=timestamp_completed,
project=kwargs.get('project'),
meta=meta,
@@ -72,6 +75,7 @@ def to_external(self):
self.sequencing_group_ids
),
output=self.output,
+ outputs=self.outputs,
timestamp_completed=self.timestamp_completed.isoformat()
if self.timestamp_completed
else None,
@@ -87,13 +91,14 @@ class Analysis(BaseModel):
type: str
status: AnalysisStatus
- id: int | None = None
- output: str | None = None
+ id: Optional[int] = None
+ output: Optional[Union[str, dict]] = None
+ outputs: Optional[Union[str, dict]] = {}
sequencing_group_ids: list[str] = []
- author: str | None = None
- timestamp_completed: str | None = None
- project: int | None = None
- active: bool | None = None
+ author: Optional[str] = None
+ timestamp_completed: Optional[str] = None
+ project: Optional[int] = None
+ active: Optional[bool] = None
meta: dict[str, Any] = {}
def to_internal(self):
@@ -108,6 +113,7 @@ def to_internal(self):
self.sequencing_group_ids
),
output=self.output,
+ outputs=self.outputs,
# don't allow this to be set
timestamp_completed=None,
project=self.project,
diff --git a/models/models/output_file.py b/models/models/output_file.py
new file mode 100644
index 000000000..fd36fe98c
--- /dev/null
+++ b/models/models/output_file.py
@@ -0,0 +1,174 @@
+from typing import Any, Optional, Union
+
+from cloudpathlib import AnyPath, GSPath
+from google.cloud.storage import Client
+from pydantic import BaseModel
+
+from models.base import SMBase, parse_sql_bool
+
+
+class OutputFileInternal(SMBase):
+ """File model for internal use"""
+
+ id: int
+ path: str
+ basename: str
+ dirname: str
+ nameroot: str
+ nameext: Optional[str]
+ file_checksum: Optional[str]
+ size: int
+ meta: Optional[str] = None
+ valid: bool = False
+ secondary_files: list[dict[str, Any]] | None = None
+
+ @staticmethod
+ def from_db(**kwargs):
+ """
+ Convert from db keys, mainly converting id to id_
+ """
+
+ return OutputFileInternal(
+ id=kwargs.pop('id'),
+ path=kwargs.get('path'),
+ basename=kwargs.get('basename'),
+ dirname=kwargs.get('dirname'),
+ nameroot=kwargs.get('nameroot'),
+ nameext=kwargs.get('nameext'),
+ file_checksum=kwargs.get('file_checksum'),
+ size=kwargs.get('size'),
+ meta=kwargs.get('meta'),
+ valid=parse_sql_bool(kwargs.get('valid')),
+ )
+
+ def to_external(self):
+ """
+ Convert to external model
+ """
+ return OutputFile(
+ id=self.id,
+ path=self.path,
+ basename=self.basename,
+ dirname=self.dirname,
+ nameroot=self.nameroot,
+ nameext=self.nameext,
+ file_checksum=self.file_checksum,
+ size=self.size,
+ meta=self.meta,
+ valid=self.valid,
+ )
+
+ @staticmethod
+ async def get_file_info(file_obj: AnyPath | GSPath, client: Client) -> dict | None:
+ """Get file info for file at given path"""
+ try:
+ file_checksum = None
+ valid = False
+ size = 0
+            if isinstance(file_obj, GSPath) and client:
+                bucket = client.get_bucket(file_obj.bucket)  # pylint: disable=E1101
+                blob = bucket.get_blob(file_obj.blob)  # pylint: disable=E1101
+                if blob and file_obj.suffix != '.mt':
+                    file_checksum = blob.crc32c  # pylint: disable=E1101
+                    valid = True
+                    size = blob.size  # pylint: disable=E1101
+
+ return {
+ 'basename': file_obj.name, # pylint: disable=E1101
+ 'dirname': str(file_obj.parent), # pylint: disable=E1101
+ 'nameroot': file_obj.stem, # pylint: disable=E1101
+ 'nameext': file_obj.suffix, # pylint: disable=E1101
+ 'checksum': file_checksum,
+ 'size': size, # pylint: disable=E1101
+ 'valid': valid,
+ }
+ except (FileNotFoundError, ValueError):
+ return None
+
+ @staticmethod
+ def reconstruct_json(data: list | str) -> Union[dict[str, Any], str]:
+        """Rebuild the outputs JSON for an analysis.
+
+        Args:
+            data: Either a list of (OutputFileInternal, json_structure) tuples (for rows
+                with a file_id), possibly mixed with plain strings (for rows that only
+                have an `output` value), or a single raw output string.
+
+        Returns:
+            The nested JSON structure of output files, or the raw output string.
+ """
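+        # For example (hypothetical values): a file stored with json_structure 'qc.cram'
+        # is placed at {'qc': {'cram': {<OutputFileInternal fields>}}}, while a row with
+        # only an `output` string is appended to root['output'].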
+ root: dict = {}
+ if isinstance(data, str):
+ return data
+ for file in data:
+ file_root: dict = {}
+
+ # Check if the file is a tuple or a string
+ # If it's a tuple, it's a file object and a json structure
+ if isinstance(file, tuple):
+ file_obj, json_structure = file
+ file_obj = file_obj.dict()
+ fields = OutputFileInternal.__fields__.keys() # type:ignore[attr-defined]
+ for field in fields:
+ file_root[field] = file_obj.get(field)
+
+ if json_structure:
+ if isinstance(json_structure, str):
+ json_structure = json_structure.split('.')
+ # Split the path into components and parse the JSON content
+ path = json_structure
+ content = file_root
+
+ # Navigate down the tree to the correct position, creating dictionaries as needed
+ current = root
+ for key in path[:-1]:
+ current = current.setdefault(key, {})
+
+ if path[-1] in current:
+ current[path[-1]].update(content)
+ else:
+ current[path[-1]] = content
+ else:
+ root.update(file_root)
+
+ # If it's a string, it's just the output
+ else:
+ file_root['output'] = file
+ if 'output' in root:
+ root['output'].append(file)
+ else:
+ root['output'] = [file]
+ return root
+
+
+class OutputFile(BaseModel):
+ """File model for external use"""
+
+ id: int
+ path: str
+ basename: str
+ dirname: str
+ nameroot: str
+ nameext: Optional[str]
+ file_checksum: Optional[str]
+ size: int
+ meta: Optional[str] = None
+ valid: bool = False
+ secondary_files: list[dict[str, Any]] | None = None
+
+ def to_internal(self):
+ """
+ Convert to internal model
+ """
+ return OutputFileInternal(
+ id=self.id,
+ path=self.path,
+ basename=self.basename,
+ dirname=self.dirname,
+ nameroot=self.nameroot,
+ nameext=self.nameext,
+ file_checksum=self.file_checksum,
+ size=self.size,
+ meta=self.meta,
+ valid=self.valid,
+ )
diff --git a/requirements.txt b/requirements.txt
index 172fa8fe0..19517939f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,7 +5,8 @@ botocore==1.31.56
cpg-utils
aiohttp
async_lru
-cloudpathlib
+# later cloudpathlib versions require a more recent google-cloud-storage
+cloudpathlib==0.17.0
requests
google-auth>=2.19.0
google-cloud-secret-manager==2.8.0
diff --git a/scripts/20240124_migrate_output_to_file.py b/scripts/20240124_migrate_output_to_file.py
new file mode 100644
index 000000000..47ea14e08
--- /dev/null
+++ b/scripts/20240124_migrate_output_to_file.py
@@ -0,0 +1,168 @@
+import asyncio
+import json
+import re
+from textwrap import dedent
+from typing import Dict, Optional
+
+import click
+from cloudpathlib import AnyPath
+from databases import Database
+from google.cloud.storage import Client
+
+from db.python.connect import CredentialedDatabaseConfiguration # pylint: disable=C0415
+from models.models.output_file import OutputFileInternal
+
+
+def _get_connection_string():
+ """Get connection string"""
+
+ config = CredentialedDatabaseConfiguration.dev_config()
+
+ # config = CredentialedDatabaseConfiguration(dbname='sm_dev', username='root')
+
+ return config.get_connection_string()
+
+
+async def get_analyses_without_fileid(connection):
+ """Get analyses without fileid"""
+ query = dedent(
+ """
+ SELECT a.id, a.output
+ FROM analysis a
+ LEFT JOIN analysis_outputs ao ON ao.analysis_id = a.id
+ LEFT JOIN output_file f ON f.id = ao.file_id
+ WHERE f.id IS NULL AND ao.output IS NULL
+ """
+ )
+ print('Fetching...')
+ rows = await connection.fetch_all(query=query)
+ print(f'Found {len(rows)} analyses without file_id and output fields set.')
+
+ return rows
+
+
+async def execute(connection, query, inserts):
+ """Executes inserts"""
+ await connection.execute(query, inserts)
+
+
+async def get_file_info(path: str, client: Client) -> Optional[Dict]:
+ """Get file dict"""
+ print('Extracting file dict')
+ file_obj = AnyPath(path)
+
+ file_info = await OutputFileInternal.get_file_info(file_obj=file_obj, client=client)
+
+ if not file_info:
+ return None
+
+ return {
+ 'path': path,
+ 'basename': file_info['basename'],
+ 'dirname': file_info['dirname'],
+ 'nameroot': file_info['nameroot'],
+ 'nameext': file_info['nameext'],
+ 'file_checksum': file_info['checksum'],
+ 'valid': file_info['valid'],
+ 'size': file_info['size'],
+ }
+
+
+def extract_file_paths(input_str):
+ """Extract file paths from JSON-like string as well as plain strings into a dict"""
+ # Check if the input string matches the JSON-like pattern
+ json_pattern = r'^\{.*\}$'
+ if re.match(json_pattern, input_str):
+ try:
+            # Pull out GSPath('...') entries; consecutive matches are paired as key/value
+ pattern = r"GSPath\('([^']+)'\)"
+
+ matches = re.findall(pattern, input_str)
+ file_paths = dict(zip(matches[::2], matches[1::2]))
+ return file_paths
+ except json.JSONDecodeError:
+ print('JSON Error')
+
+ # Treat input as a plain file path
+ return {'plain_file_path': input_str}
+
+
+async def prepare_files(analyses):
+ """Serialize files for insertion"""
+ files = []
+ client = Client()
+ print(f'Preparing files...{len(analyses)} analyses to process.')
+ for analysis in analyses:
+ path = analysis['output']
+ if path is None:
+ print('Path is None')
+ continue
+
+ path_dict = extract_file_paths(path)
+ print(path_dict)
+ if path_dict:
+ print('Found path dict')
+ for _, path in path_dict.items():
+ print(path)
+ files.append((
+ analysis['id'],
+ await get_file_info(path=path, client=client)
+ ))
+ print('Extracted and added.')
+ return files
+
+
+async def insert_files(connection, files):
+ """Insert files"""
+ query = dedent(
+ """INSERT INTO output_file (path, basename, dirname, nameroot, nameext, file_checksum, size, valid)
+ VALUES (:path, :basename, :dirname, :nameroot, :nameext, :file_checksum, :size, :valid)
+ RETURNING id"""
+ )
+ af_query = dedent(
+ """
+ INSERT INTO analysis_outputs (analysis_id, file_id, output, json_structure) VALUES (:analysis_id, :file_id, :output, :json_structure)
+ """
+ )
+ for analysis_id, file in files:
+ print('Inserting...')
+ file_id = await connection.fetch_val(
+ query,
+ file,
+ )
+ if not file_id:
+ join_inserts = {'analysis_id': analysis_id, 'file_id': None, 'output': file.get('path'), 'json_structure': None}
+ else:
+ join_inserts = {'analysis_id': analysis_id, 'file_id': file_id, 'output': None, 'json_structure': None}
+ await execute(
+ connection=connection,
+ query=af_query,
+ inserts=join_inserts,
+ )
+ print(f'Inserted {len(files)} files')
+
+
+@click.command()
+# @click.option('--dry-run/--no-dry-run', default=True)
+@click.option('--connection-string', default=None)
+# @click.argument('author', default='sequencing-group-migration')
+def main_sync(connection_string: str = None):
+ """Run synchronisation"""
+ asyncio.get_event_loop().run_until_complete(
+ main(connection_string=connection_string)
+ )
+
+
+async def main(connection_string: str = None):
+ """Run synchronisation"""
+ connection = Database(connection_string or _get_connection_string(), echo=True)
+ await connection.connect()
+ async with connection.transaction():
+ analyses = await get_analyses_without_fileid(connection=connection)
+ files = await prepare_files(analyses)
+ await insert_files(connection=connection, files=files)
+ await connection.disconnect()
+
+
+if __name__ == '__main__':
+ main_sync() # pylint: disable=no-value-for-parameter
diff --git a/test/data/fakegcs/file1.cram b/test/data/fakegcs/file1.cram
new file mode 100644
index 000000000..99304afb9
--- /dev/null
+++ b/test/data/fakegcs/file1.cram
@@ -0,0 +1 @@
+SAMPLE CRAM CONTENT
diff --git a/test/data/fakegcs/file1.qc b/test/data/fakegcs/file1.qc
new file mode 100644
index 000000000..303bc3e08
--- /dev/null
+++ b/test/data/fakegcs/file1.qc
@@ -0,0 +1 @@
+SAMPLE QC FILE
diff --git a/test/data/fakegcs/file1.qc.ext b/test/data/fakegcs/file1.qc.ext
new file mode 100644
index 000000000..6f1cd4372
--- /dev/null
+++ b/test/data/fakegcs/file1.qc.ext
@@ -0,0 +1 @@
+SAMPLE QC EXT
diff --git a/test/data/fakegcs/file1.qc.meta b/test/data/fakegcs/file1.qc.meta
new file mode 100644
index 000000000..c549c3e09
--- /dev/null
+++ b/test/data/fakegcs/file1.qc.meta
@@ -0,0 +1 @@
+SAMPLE QC META
diff --git a/test/data/fakegcs/file1.txt b/test/data/fakegcs/file1.txt
new file mode 100644
index 000000000..088eccbc7
--- /dev/null
+++ b/test/data/fakegcs/file1.txt
@@ -0,0 +1 @@
+SMAPLE RESULTS TXT
diff --git a/test/data/fakegcs/file2.cram b/test/data/fakegcs/file2.cram
new file mode 100644
index 000000000..99304afb9
--- /dev/null
+++ b/test/data/fakegcs/file2.cram
@@ -0,0 +1 @@
+SAMPLE CRAM CONTENT
diff --git a/test/data/fakegcs/file2.cram.ext b/test/data/fakegcs/file2.cram.ext
new file mode 100644
index 000000000..7fe99d84c
--- /dev/null
+++ b/test/data/fakegcs/file2.cram.ext
@@ -0,0 +1 @@
+SAMPLE CRAM EXT FILE
diff --git a/test/data/fakegcs/file2.cram.meta b/test/data/fakegcs/file2.cram.meta
new file mode 100644
index 000000000..1664e7db8
--- /dev/null
+++ b/test/data/fakegcs/file2.cram.meta
@@ -0,0 +1 @@
+SAMPLE CRAM META
diff --git a/test/data/fakegcs/file3.cram b/test/data/fakegcs/file3.cram
new file mode 100644
index 000000000..99304afb9
--- /dev/null
+++ b/test/data/fakegcs/file3.cram
@@ -0,0 +1 @@
+SAMPLE CRAM CONTENT
diff --git a/test/data/fakegcs/file3.cram.ext b/test/data/fakegcs/file3.cram.ext
new file mode 100644
index 000000000..1d2089a5d
--- /dev/null
+++ b/test/data/fakegcs/file3.cram.ext
@@ -0,0 +1 @@
+SAMPLE CRAM EXT
diff --git a/test/data/fakegcs/file3.cram.meta b/test/data/fakegcs/file3.cram.meta
new file mode 100644
index 000000000..1664e7db8
--- /dev/null
+++ b/test/data/fakegcs/file3.cram.meta
@@ -0,0 +1 @@
+SAMPLE CRAM META
diff --git a/test/data/generate_data.py b/test/data/generate_data.py
index b04f270bc..38b80cace 100755
--- a/test/data/generate_data.py
+++ b/test/data/generate_data.py
@@ -167,11 +167,68 @@ def generate_random_number_within_distribution():
]
analyses_to_insert = [
+ Analysis(
+ sequencing_group_ids=random.sample(
+ sequencing_group_ids, len(sequencing_group_ids) // 2
+ ),
+ type='es-index',
+ status=AnalysisStatus('completed'),
+ outputs={
+ 'qc_results': {
+ 'basename': 'gs://sm-dev-test/file1.txt'
+ },
+ 'cram': {
+ 'basename': 'gs://sm-dev-test/file1.cram'
+ },
+ 'qc': {
+ 'cram': {
+ 'basename': 'gs://sm-dev-test/file2.cram',
+ 'secondaryFiles':
+ {
+ 'cram_ext': {
+ 'basename': 'gs://sm-dev-test/file2.cram.ext'
+ },
+ 'cram_meta': {
+ 'basename': 'gs://sm-dev-test/file2.cram.meta'
+ }
+ }
+ },
+ 'aggregate': {
+ 'cram': {
+ 'basename': 'gs://sm-dev-test/file3.cram',
+ 'secondaryFiles': {
+ 'cram_ext': {
+ 'basename': 'gs://sm-dev-test/file3.cram.ext'
+ },
+ 'cram_meta': {
+ 'basename': 'gs://sm-dev-test/file3.cram.meta'
+ }
+ }
+ },
+ 'qc': {
+ 'basename': 'gs://sm-dev-test/file1.qc',
+ 'secondaryFiles': {
+ 'qc_ext': {
+ 'basename': 'gs://sm-dev-test/file1.qc.ext'
+ },
+ 'qc_meta': {
+ 'basename': 'gs://sm-dev-test/file1.qc.meta'
+ }
+ }
+ },
+ }
+ }
+ },
+ meta={},
+ )
+ ]
+
+ analyses_to_insert.extend([
Analysis(
sequencing_group_ids=[s],
type='cram',
status=AnalysisStatus('completed'),
- output=f'FAKE://greek-myth/crams/{s}.cram',
+ outputs={'basename': f'FAKE://greek-myth/crams/{s}.cram'},
timestamp_completed=(
datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 15))
).isoformat(),
@@ -182,7 +239,7 @@ def generate_random_number_within_distribution():
},
)
for s in sequencing_group_ids
- ]
+ ])
analyses_to_insert.extend(
[
@@ -190,7 +247,7 @@ def generate_random_number_within_distribution():
sample_ids=[],
type='analysis-runner',
status=AnalysisStatus('completed'),
- output=f'FAKE://greek-myth-test/joint-calling/{s}.joint',
+ outputs={'basename': f'FAKE://greek-myth-test/joint-calling/{s}.joint'},
active=True,
meta={
'accessLevel': 'full',
@@ -217,7 +274,7 @@ def generate_random_number_within_distribution():
),
type='es-index',
status=AnalysisStatus('completed'),
- output=f'FAKE::greek-myth-genome-{datetime.date.today()}',
+ outputs={'basename': f'FAKE::greek-myth-genome-{datetime.date.today()}'},
meta={},
)
)
@@ -237,7 +294,7 @@ def generate_random_number_within_distribution():
sequencing_group_ids=[],
type='analysis-runner',
status=AnalysisStatus('unknown'),
- output='gs://cpg-fake-bucket/output',
+ outputs={'basename': 'FAKE://cpg-fake-bucket/output'},
meta={
'timestamp': f'2022-08-{i+1}T10:00:00.0000+00:00',
'accessLevel': 'standard',
diff --git a/test/data/generate_seqr_project_data.py b/test/data/generate_seqr_project_data.py
index 5a3f7f5bf..34885876e 100644
--- a/test/data/generate_seqr_project_data.py
+++ b/test/data/generate_seqr_project_data.py
@@ -349,7 +349,7 @@ async def generate_cram_analyses(project: str, analyses_to_insert: list[Analysis
sequencing_group_ids=[sg['id']],
type='cram',
status=AnalysisStatus('completed'),
- output=f'FAKE://{project}/crams/{sg["id"]}.cram',
+ outputs={'basename': f'FAKE://{project}/crams/{sg["id"]}.cram'},
timestamp_completed=(
datetime.datetime.now() - datetime.timedelta(days=random.randint(1, 15))
).isoformat(),
@@ -386,14 +386,14 @@ async def generate_joint_called_analyses(project: str, aligned_sgs: list[dict],
sequencing_group_ids=joint_called_sgs,
type='custom',
status=AnalysisStatus('completed'),
- output=f'FAKE::{project}-{seq_type}-{datetime.date.today()}.mt',
+ outputs={'basename': f'FAKE::{project}-{seq_type}-{datetime.date.today()}.mt'},
meta={'stage': 'AnnotateDataset', 'sequencing_type': seq_type},
),
Analysis(
sequencing_group_ids=joint_called_sgs,
type='es-index',
status=AnalysisStatus('completed'),
- output=f'FAKE::{project}-{seq_type}-es-{datetime.date.today()}',
+ outputs={'basename': f'FAKE::{project}-{seq_type}-es-{datetime.date.today()}'},
meta={'stage': 'MtToEs', 'sequencing_type': seq_type},
)
]
diff --git a/test/test_analysis.py b/test/test_analysis.py
index be7a15e90..a1b841f50 100644
--- a/test/test_analysis.py
+++ b/test/test_analysis.py
@@ -1,6 +1,11 @@
# pylint: disable=invalid-overridden-method
+import logging
+import os
from test.testbase import DbIsolatedTest, run_as_sync
+import requests
+from testcontainers.core.container import DockerContainer
+
from db.python.layers.analysis import AnalysisLayer
from db.python.layers.assay import AssayLayer
from db.python.layers.sample import SampleLayer
@@ -19,6 +24,17 @@
class TestAnalysis(DbIsolatedTest):
"""Test sample class"""
+ gcs: DockerContainer
+
+ @classmethod
+ def setUpClass(cls) -> None:
+ super().setUpClass()
+ # Convert the relative path to an absolute path
+ absolute_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data')
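+        # Run a local fake GCS server (fsouza/fake-gcs-server) on port 4443 so gs:// paths
+        # resolve without real credentials; test/data is mounted as its storage backend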
+        gcs = (
+            DockerContainer('fsouza/fake-gcs-server')
+            .with_bind_ports(4443, 4443)
+            .with_volume_mapping(absolute_path, '/data')
+            .with_command('-scheme http')
+        )
+ gcs.start()
+ cls.gcs = gcs
+
@run_as_sync
async def setUp(self) -> None:
# don't need to await because it's tagged @run_as_sync
@@ -76,6 +92,12 @@ async def setUp(self) -> None:
self.genome_sequencing_group_id = sample.sequencing_groups[0].id
self.exome_sequencing_group_id = sample.sequencing_groups[self.project_id].id
+ @classmethod
+ def tearDownClass(cls) -> None:
+ if cls.gcs:
+ cls.gcs.stop()
+ super().tearDownClass()
+
@run_as_sync
async def test_get_analysis_by_id(self):
"""
@@ -148,6 +170,581 @@ async def test_get_analysis(self):
status=AnalysisStatus.UNKNOWN,
sequencing_group_ids=[],
output=None,
+ outputs=None,
+ timestamp_completed=None,
+ project=1,
+ meta={},
+ active=True,
+ author=None,
+ ).copy(update={'output': [], 'outputs': []})
+ ]
+
+ self.assertEqual(analyses, expected)
+
+ @run_as_sync
+ async def test_analysis_output_str(self):
+ """
+ Test adding an analysis of type ANALYSIS_RUNNER
+ with output file as a string via `output` field
+ """
+
+ a_id = await self.al.create_analysis(
+ AnalysisInternal(
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ meta={},
+ output='RANDOM_OUTPUT_STRING'
+ )
+ )
+
+ analyses = await self.al.query(
+ AnalysisFilter(
+ id=GenericFilter(eq=a_id),
+ )
+ )
+ expected = [
+ AnalysisInternal(
+ id=a_id,
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ output='RANDOM_OUTPUT_STRING',
+ outputs='RANDOM_OUTPUT_STRING',
+ timestamp_completed=None,
+ project=1,
+ meta={},
+ active=True,
+ author=None,
+ )
+ ]
+
+ self.assertEqual(analyses, expected)
+
+ @run_as_sync
+ async def test_analysis_outputs_str(self):
+ """
+ Test adding an analysis of type ANALYSIS_RUNNER
+ with output file as a string via `outputs` field
+ """
+
+ a_id = await self.al.create_analysis(
+ AnalysisInternal(
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ meta={},
+ outputs='RANDOM_OUTPUT_STRING'
+ )
+ )
+
+ analyses = await self.al.query(
+ AnalysisFilter(
+ id=GenericFilter(eq=a_id),
+ )
+ )
+ expected = [
+ AnalysisInternal(
+ id=a_id,
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ output='RANDOM_OUTPUT_STRING',
+ outputs='RANDOM_OUTPUT_STRING',
+ timestamp_completed=None,
+ project=1,
+ meta={},
+ active=True,
+ author=None,
+ )
+ ]
+
+ self.assertEqual(analyses, expected)
+
+ @run_as_sync
+ async def test_analysis_output_gspath_str(self):
+ """
+ Test adding an analysis of type ANALYSIS_RUNNER
+ with a valid gs:// output path given as a string via the `output` field
+ """
+
+ a_id = await self.al.create_analysis(
+ AnalysisInternal(
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ meta={},
+ output='gs://fakegcs/file1.txt'
+ )
+ )
+
+ analyses = await self.al.query(
+ AnalysisFilter(
+ id=GenericFilter(eq=a_id),
+ )
+ )
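+ # A bare gs:// path given as a string is also expected to round-trip unchanged, without being resolved into a file record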
+ expected = [
+ AnalysisInternal(
+ id=a_id,
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ output='gs://fakegcs/file1.txt',
+ outputs='gs://fakegcs/file1.txt',
+ timestamp_completed=None,
+ project=1,
+ meta={},
+ active=True,
+ author=None,
+ )
+ ]
+
+ self.assertEqual(analyses, expected)
+
+ @run_as_sync
+ async def test_analysis_output_gspath_dict(self):
+ """
+ Test adding an analysis of type ANALYSIS_RUNNER
+ with a valid gs:// output path given as a dict via the `outputs` field
+ """
+
+ a_id = await self.al.create_analysis(
+ AnalysisInternal(
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ meta={},
+ outputs={'basename': 'gs://fakegcs/file1.txt'}
+ )
+ )
+
+ analyses = await self.al.query(
+ AnalysisFilter(
+ id=GenericFilter(eq=a_id),
+ )
+ )
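+ # The 'basename' entry is expected to come back resolved into a full file record (path parts, checksum, size, secondary files)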
+ expected = [
+ AnalysisInternal(
+ id=a_id,
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ output={
+ 'id': 1,
+ 'path': 'gs://fakegcs/file1.txt',
+ 'basename': 'file1.txt',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.txt',
+ 'file_checksum': 'DG+fhg==',
+ 'size': 19,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': []
+ },
+ outputs={
+ 'id': 1,
+ 'path': 'gs://fakegcs/file1.txt',
+ 'basename': 'file1.txt',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.txt',
+ 'file_checksum': 'DG+fhg==',
+ 'size': 19,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': []
+ },
+ timestamp_completed=None,
+ project=1,
+ meta={},
+ active=True,
+ author=None,
+ )
+ ]
+
+ self.assertEqual(analyses, expected)
+
+ @run_as_sync
+ async def test_analysis_output_files_json(self):
+ """
+ Test adding an analysis of type ANALYSIS_RUNNER
+ with the output files given as a nested dict/JSON via the `outputs` field
+ """
+
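+ # Log the fake GCS object listing, which helps debugging when the seeded test files are missing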
+ logger = logging.getLogger()
+ logger.info(requests.get('http://localhost:4443/storage/v1/b/fakegcs/o', timeout=10).text)
+
+ a_id = await self.al.create_analysis(
+ AnalysisInternal(
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ meta={},
+ outputs={
+ 'qc_results': {
+ 'basename': 'gs://fakegcs/file1.txt'
+ },
+ 'cram': {
+ 'basename': 'gs://fakegcs/file1.cram'
+ },
+ 'qc': {
+ 'cram': {
+ 'basename': 'gs://fakegcs/file2.cram',
+ 'secondaryFiles':
+ {
+ 'cram_ext': {
+ 'basename': 'gs://fakegcs/file2.cram.ext'
+ },
+ 'cram_meta': {
+ 'basename': 'gs://fakegcs/file2.cram.meta'
+ }
+ }
+ },
+ 'aggregate': {
+ 'cram': {
+ 'basename': 'gs://fakegcs/file3.cram',
+ 'secondaryFiles': {
+ 'cram_ext': {
+ 'basename': 'gs://fakegcs/file3.cram.ext'
+ },
+ 'cram_meta': {
+ 'basename': 'gs://fakegcs/file3.cram.meta'
+ }
+ }
+ },
+ 'qc': {
+ 'basename': 'gs://fakegcs/file1.qc',
+ 'secondaryFiles': {
+ 'qc_ext': {
+ 'basename': 'gs://fakegcs/file1.qc.ext'
+ },
+ 'qc_meta': {
+ 'basename': 'gs://fakegcs/file1.qc.meta'
+ }
+ }
+ },
+ }
+ }
+ }
+ )
+ )
+
+ analyses = await self.al.query(
+ AnalysisFilter(
+ id=GenericFilter(eq=a_id),
+ )
+ )
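+ # Every nested output, including secondaryFiles, is expected back as a resolved file record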
+ expected = [
+ AnalysisInternal(
+ id=a_id,
+ type='analysis-runner',
+ status=AnalysisStatus.UNKNOWN,
+ sequencing_group_ids=[],
+ output={
+ 'qc_results': {
+ 'id': 1,
+ 'path': 'gs://fakegcs/file1.txt',
+ 'basename': 'file1.txt',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.txt',
+ 'file_checksum': 'DG+fhg==',
+ 'size': 19,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': []
+ },
+ 'cram': {
+ 'id': 2,
+ 'path': 'gs://fakegcs/file1.cram',
+ 'basename': 'file1.cram',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.cram',
+ 'file_checksum': 'sl7SXw==',
+ 'size': 20,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': []
+ },
+ 'qc': {
+ 'cram': {
+ 'id': 3,
+ 'path': 'gs://fakegcs/file2.cram',
+ 'basename': 'file2.cram',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file2',
+ 'nameext': '.cram',
+ 'file_checksum': 'sl7SXw==',
+ 'size': 20,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': [
+ {
+ 'id': 4,
+ 'path': 'gs://fakegcs/file2.cram.ext',
+ 'basename': 'file2.cram.ext',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file2.cram',
+ 'nameext': '.ext',
+ 'file_checksum': 'gb1EbA==',
+ 'size': 21,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ },
+ {
+ 'id': 5,
+ 'path': 'gs://fakegcs/file2.cram.meta',
+ 'basename': 'file2.cram.meta',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file2.cram',
+ 'nameext': '.meta',
+ 'file_checksum': 'af/YSw==',
+ 'size': 17,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ }
+ ]
+ },
+ 'aggregate': {
+ 'cram': {
+ 'id': 6,
+ 'path': 'gs://fakegcs/file3.cram',
+ 'basename': 'file3.cram',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file3',
+ 'nameext': '.cram',
+ 'file_checksum': 'sl7SXw==',
+ 'size': 20,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': [
+ {
+ 'id': 7,
+ 'path': 'gs://fakegcs/file3.cram.ext',
+ 'basename': 'file3.cram.ext',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file3.cram',
+ 'nameext': '.ext',
+ 'file_checksum': 'HU8n6w==',
+ 'size': 16,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ },
+ {
+ 'id': 8,
+ 'path': 'gs://fakegcs/file3.cram.meta',
+ 'basename': 'file3.cram.meta',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file3.cram',
+ 'nameext': '.meta',
+ 'file_checksum': 'af/YSw==',
+ 'size': 17,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ }
+ ]
+ },
+ 'qc': {
+ 'id': 9,
+ 'path': 'gs://fakegcs/file1.qc',
+ 'basename': 'file1.qc',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.qc',
+ 'file_checksum': 'uZe/hQ==',
+ 'size': 15,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': [
+ {
+ 'id': 10,
+ 'path': 'gs://fakegcs/file1.qc.ext',
+ 'basename': 'file1.qc.ext',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1.qc',
+ 'nameext': '.ext',
+ 'file_checksum': '/18MDg==',
+ 'size': 14,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ },
+ {
+ 'id': 11,
+ 'path': 'gs://fakegcs/file1.qc.meta',
+ 'basename': 'file1.qc.meta',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1.qc',
+ 'nameext': '.meta',
+ 'file_checksum': 'v9x0Zg==',
+ 'size': 15,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ }
+ ]
+ }
+ }
+ }
+ },
+ outputs={
+ 'qc_results': {
+ 'id': 1,
+ 'path': 'gs://fakegcs/file1.txt',
+ 'basename': 'file1.txt',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.txt',
+ 'file_checksum': 'DG+fhg==',
+ 'size': 19,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': []
+ },
+ 'cram': {
+ 'id': 2,
+ 'path': 'gs://fakegcs/file1.cram',
+ 'basename': 'file1.cram',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.cram',
+ 'file_checksum': 'sl7SXw==',
+ 'size': 20,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': []
+ },
+ 'qc': {
+ 'cram': {
+ 'id': 3,
+ 'path': 'gs://fakegcs/file2.cram',
+ 'basename': 'file2.cram',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file2',
+ 'nameext': '.cram',
+ 'file_checksum': 'sl7SXw==',
+ 'size': 20,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': [
+ {
+ 'id': 4,
+ 'path': 'gs://fakegcs/file2.cram.ext',
+ 'basename': 'file2.cram.ext',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file2.cram',
+ 'nameext': '.ext',
+ 'file_checksum': 'gb1EbA==',
+ 'size': 21,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ },
+ {
+ 'id': 5,
+ 'path': 'gs://fakegcs/file2.cram.meta',
+ 'basename': 'file2.cram.meta',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file2.cram',
+ 'nameext': '.meta',
+ 'file_checksum': 'af/YSw==',
+ 'size': 17,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ }
+ ]
+ },
+ 'aggregate': {
+ 'cram': {
+ 'id': 6,
+ 'path': 'gs://fakegcs/file3.cram',
+ 'basename': 'file3.cram',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file3',
+ 'nameext': '.cram',
+ 'file_checksum': 'sl7SXw==',
+ 'size': 20,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': [
+ {
+ 'id': 7,
+ 'path': 'gs://fakegcs/file3.cram.ext',
+ 'basename': 'file3.cram.ext',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file3.cram',
+ 'nameext': '.ext',
+ 'file_checksum': 'HU8n6w==',
+ 'size': 16,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ },
+ {
+ 'id': 8,
+ 'path': 'gs://fakegcs/file3.cram.meta',
+ 'basename': 'file3.cram.meta',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file3.cram',
+ 'nameext': '.meta',
+ 'file_checksum': 'af/YSw==',
+ 'size': 17,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ }
+ ]
+ },
+ 'qc': {
+ 'id': 9,
+ 'path': 'gs://fakegcs/file1.qc',
+ 'basename': 'file1.qc',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1',
+ 'nameext': '.qc',
+ 'file_checksum': 'uZe/hQ==',
+ 'size': 15,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': [
+ {
+ 'id': 10,
+ 'path': 'gs://fakegcs/file1.qc.ext',
+ 'basename': 'file1.qc.ext',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1.qc',
+ 'nameext': '.ext',
+ 'file_checksum': '/18MDg==',
+ 'size': 14,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ },
+ {
+ 'id': 11,
+ 'path': 'gs://fakegcs/file1.qc.meta',
+ 'basename': 'file1.qc.meta',
+ 'dirname': 'gs://fakegcs',
+ 'nameroot': 'file1.qc',
+ 'nameext': '.meta',
+ 'file_checksum': 'v9x0Zg==',
+ 'size': 15,
+ 'meta': None,
+ 'valid': True,
+ 'secondary_files': None
+ }
+ ]
+ }
+ }
+ }
+ },
timestamp_completed=None,
project=1,
meta={},
diff --git a/test/test_generic_auditor.py b/test/test_generic_auditor.py
index 491c97d05..574be2e57 100644
--- a/test/test_generic_auditor.py
+++ b/test/test_generic_auditor.py
@@ -384,7 +384,9 @@ async def test_query_genome_analyses_crams(self, mock_query):
'CPGaaa',
],
},
- 'output': 'gs://cpg-dataset-main/cram/CPGaaa.cram',
+ 'outputs': {
+ 'basename': 'gs://cpg-dataset-main/cram/CPGaaa.cram'
+ },
},
],
},
@@ -395,7 +397,7 @@ async def test_query_genome_analyses_crams(self, mock_query):
test_result = await auditor.get_analysis_cram_paths_for_dataset_sgs(
assay_sg_id_map={1: 'CPGaaa'}
)
- expected_result = {'CPGaaa': {1: 'gs://cpg-dataset-main/cram/CPGaaa.cram'}}
+ expected_result = {'CPGaaa': {1: {'basename': 'gs://cpg-dataset-main/cram/CPGaaa.cram'}}}
self.assertDictEqual(test_result, expected_result)
@@ -421,7 +423,9 @@ async def test_query_genome_and_exome_analyses_crams(self, mock_query):
'CPGaaa',
],
},
- 'output': 'gs://cpg-dataset-main/cram/CPGaaa.cram',
+ 'outputs': {
+ 'basename': 'gs://cpg-dataset-main/cram/CPGaaa.cram'
+ },
},
],
},
@@ -437,7 +441,9 @@ async def test_query_genome_and_exome_analyses_crams(self, mock_query):
'CPGbbb',
],
},
- 'output': 'gs://cpg-dataset-main/exome/cram/CPGaaa.cram',
+ 'outputs': {
+ 'basename': 'gs://cpg-dataset-main/exome/cram/CPGaaa.cram'
+ },
},
],
},
@@ -450,8 +456,8 @@ async def test_query_genome_and_exome_analyses_crams(self, mock_query):
)
expected_result = {
- 'CPGaaa': {1: 'gs://cpg-dataset-main/cram/CPGaaa.cram'},
- 'CPGbbb': {2: 'gs://cpg-dataset-main/exome/cram/CPGaaa.cram'},
+ 'CPGaaa': {1: {'basename': 'gs://cpg-dataset-main/cram/CPGaaa.cram'}},
+ 'CPGbbb': {2: {'basename': 'gs://cpg-dataset-main/exome/cram/CPGaaa.cram'}},
}
self.assertDictEqual(test_result, expected_result)
@@ -479,7 +485,7 @@ async def test_query_broken_analyses_crams(self, mock_query):
'sample_ids': [
'CPGaaa',
],
- 'output': '',
+ 'outputs': {},
},
],
},
@@ -508,7 +514,9 @@ async def test_query_analyses_crams_warning(self, mock_query):
'meta': {
'sequencing_type': 'genome',
},
- 'output': 'gs://cpg-dataset-main/cram/CPGaaa.notcram',
+ 'outputs': {
+ 'basename': 'gs://cpg-dataset-main/cram/CPGaaa.notcram'
+ },
},
],
}
@@ -563,7 +571,9 @@ async def test_analyses_for_sgs_without_crams(self, mock_query):
{
'id': 1,
'meta': {'sequencing_type': 'genome', 'sample': 'CPGaaa'},
- 'output': 'gs://cpg-dataset-main/gvcf/CPGaaa.g.vcf.gz',
+ 'outputs': {
+ 'basename': 'gs://cpg-dataset-main/gvcf/CPGaaa.g.vcf.gz'
+ },
'type': 'gvcf',
'timestampCompleted': '2023-05-11T16:33:00',
}
@@ -584,7 +594,7 @@ async def test_analyses_for_sgs_without_crams(self, mock_query):
self.assertEqual(len(log.output), 1)
self.assertEqual(len(log.records), 1)
self.assertIn(
- "WARNING:root:dev :: SG CPGaaa missing CRAM but has analysis {'analysis_id': 1, 'analysis_type': 'gvcf', 'analysis_output': 'gs://cpg-dataset-main/gvcf/CPGaaa.g.vcf.gz', 'timestamp_completed': '2023-05-11T16:33:00'}",
+ "WARNING:root:dev :: SG CPGaaa missing CRAM but has analysis {'analysis_id': 1, 'analysis_type': 'gvcf', 'analysis_output': {'basename': 'gs://cpg-dataset-main/gvcf/CPGaaa.g.vcf.gz'}, 'timestamp_completed': '2023-05-11T16:33:00'}",
log.output[0],
)
diff --git a/test/test_graphql.py b/test/test_graphql.py
index 294e1ad56..8ecf05433 100644
--- a/test/test_graphql.py
+++ b/test/test_graphql.py
@@ -203,7 +203,7 @@ async def test_sg_analyses_query(self):
type='cram',
status=AnalysisStatus.COMPLETED,
meta={},
- output='some-output',
+ outputs={'basename': 'some-output'},
)
)
@@ -213,7 +213,7 @@ async def test_sg_analyses_query(self):
analyses(project: {eq: $project}) {
id
meta
- output
+ outputs
}
}
}"""
@@ -229,7 +229,7 @@ async def test_sg_analyses_query(self):
analyses = resp['sequencingGroups'][0]['analyses']
self.assertIn('id', analyses[0])
self.assertIn('meta', analyses[0])
- self.assertIn('output', analyses[0])
+ self.assertIn('outputs', analyses[0])
@run_as_sync
async def test_participant_phenotypes(self):
diff --git a/test/testbase.py b/test/testbase.py
index fa60ee66f..3019a2961 100644
--- a/test/testbase.py
+++ b/test/testbase.py
@@ -179,6 +179,9 @@ def tearDownClass(cls) -> None:
db.exec(f'DROP DATABASE {db.dbname};')
db.stop()
+ # gcs = cls.gcs
+ # gcs.stop()
+
def setUp(self) -> None:
self._connection = self.connections[self.__class__.__name__]
# create a connection on each test so we can generate a new
diff --git a/web/src/pages/project/AnalysisRunnerView/AnalysisRunnerSummary.tsx b/web/src/pages/project/AnalysisRunnerView/AnalysisRunnerSummary.tsx
index 7bb4f8db2..17588bbd6 100644
--- a/web/src/pages/project/AnalysisRunnerView/AnalysisRunnerSummary.tsx
+++ b/web/src/pages/project/AnalysisRunnerView/AnalysisRunnerSummary.tsx
@@ -23,7 +23,7 @@ query AnalysisRunnerLogs($project_name: String!) {
analyses(type: { eq: "analysis-runner" }, status: {eq: UNKNOWN}) {
id
meta
- output
+ outputs
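+ # outputs is a JSON field carrying the structured analysis outputs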
auditLogs {
author
timestamp