db/python/tables/output_file.py reduce complexity
violetbrina committed Oct 7, 2024
1 parent 53019cd commit d5d2dd2
Showing 1 changed file with 75 additions and 84 deletions.
db/python/tables/output_file.py (75 additions, 84 deletions)
@@ -1,4 +1,6 @@
 # pylint: disable=too-many-nested-blocks
+from collections import defaultdict
+from dataclasses import asdict, dataclass
 import logging
 from textwrap import dedent

@@ -65,8 +67,6 @@ async def create_or_update_output_file(
         """
         Create a new file, and add it to database
         """
-        # file_obj = AnyPath(path, client=GSClient(storage_client=client))
-
         if not path:
             raise ValueError('Invalid cloud file path')

@@ -149,57 +149,48 @@ async def create_or_update_analysis_output_files_from_output(
         file_ids: list[int] = []
         outputs: list[str] = []
 
+        def save_file_outputs(file_id, file):
+            if file_id:
+                file_ids.append(file_id)
+            else:
+                outputs.append(file)
+
         async with self.connection.transaction():
-            if 'main_files' in files:
-                for primary_file in files['main_files']:
-                    parent_file_id = await self.create_or_update_output_file(
-                        path=primary_file['basename'],
-                        blobs=blobs,
-                    )
-                    await self.add_output_file_to_analysis(
-                        analysis_id,
-                        parent_file_id,
-                        json_structure=primary_file['json_path'],
-                        # If the file couldnt be created, we just pass the basename as the output
-                        output=(None if parent_file_id else primary_file['basename']),
-                    )
-                    secondary_files = files.get('secondary_files_grouped')
-                    if secondary_files:
-                        if primary_file['basename'] in secondary_files:
-                            for secondary_file in secondary_files[
-                                primary_file['basename']
-                            ]:
-                                secondary_file_id = (
-                                    await self.create_or_update_output_file(
-                                        path=secondary_file['basename'],
-                                        parent_id=parent_file_id,
-                                        blobs=blobs,
-                                    )
-                                )
-                                await self.add_output_file_to_analysis(
-                                    analysis_id,
-                                    secondary_file_id,
-                                    json_structure=secondary_file['json_path'],
-                                    # If the file couldnt be created, we just pass the basename as the output
-                                    output=(
-                                        None
-                                        if secondary_file_id
-                                        else secondary_file['basename']
-                                    ),
-                                )
-                                if secondary_file_id:
-                                    file_ids.append(secondary_file_id)
-                                else:
-                                    outputs.append(secondary_file['basename'])
-                    if parent_file_id:
-                        file_ids.append(parent_file_id)
-                    else:
-                        outputs.append(primary_file['basename'])
-
-        # check that only the files in this json_dict should be in the analysis. Remove what isn't in this dict.
-        if not file_ids and not outputs:
-            # If both file_ids and outputs are empty, don't execute the query
-            pass
+            for primary_file in files.get('main_files', []):
+                parent_file_id = await self.create_or_update_output_file(
+                    path=primary_file['basename'],
+                    blobs=blobs,
+                )
+                await self.add_output_file_to_analysis(
+                    analysis_id,
+                    parent_file_id,
+                    json_structure=primary_file['json_path'],
+                    # If the file couldnt be created, we just pass the basename as the output
+                    output=(None if parent_file_id else primary_file['basename']),
+                )
+
+                # Add secondary files if they exist
+                secondary_files = files.get('secondary_files_grouped', {}).get(
+                    primary_file['basename'], []
+                )
+                for secondary_file in secondary_files:
+                    secondary_file_id = await self.create_or_update_output_file(
+                        path=secondary_file['basename'],
+                        parent_id=parent_file_id,
+                        blobs=blobs,
+                    )
+                    await self.add_output_file_to_analysis(
+                        analysis_id,
+                        secondary_file_id,
+                        json_structure=secondary_file['json_path'],
+                        # If the file couldnt be created, we just pass the basename as the output
+                        output=(
+                            None if secondary_file_id else secondary_file['basename']
+                        ),
+                    )
+                    save_file_outputs(secondary_file_id, secondary_file['basename'])
+
+                save_file_outputs(parent_file_id, primary_file['basename'])
 
         _update_query = dedent(
             """
@@ -212,6 +203,7 @@ async def create_or_update_analysis_output_files_from_output(
             'analysis_id': analysis_id
         }
 
+        # check that only the files in this json_dict should be in the analysis. Remove what isn't in this dict.
         # Add the OR condition to include file_ids or outputs
         conditions = []

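The body of `_update_query` is collapsed in this view, so the condition strings below are invented; this is only a sketch of the build-then-join pattern the next hunk applies:

    # Hypothetical sketch of the pattern; the real query text and condition
    # strings are collapsed in this diff.
    conditions = []
    if file_ids:
        conditions.append('file_id NOT IN :file_ids')  # invented condition
        query_params['file_ids'] = file_ids
    if outputs:
        conditions.append('output NOT IN :outputs')  # invented condition
        query_params['outputs'] = outputs
    _update_query += ' AND (' + ' OR '.join(conditions) + ')'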
@@ -228,68 +220,67 @@ async def create_or_update_analysis_output_files_from_output(
             query_params['outputs'] = outputs  # Add outputs to query parameters
 
         # Join the conditions with OR since either can be valid
-        if conditions:
-            _update_query += ' AND (' + ' OR '.join(conditions) + ')'
+        _update_query += ' AND (' + ' OR '.join(conditions) + ')'
 
-        if not file_ids and not outputs:
-            # Execute the query only if either file_ids or outputs were provided
-            await self.connection.execute(_update_query, query_params)
+        # Execute the query only if either file_ids or outputs were provided
+        await self.connection.execute(_update_query, query_params)
+
+    @dataclass
+    class Files:
+        """Dataclass for storing filepaths during the find_files_from_dict method"""
+
+        main_files: list[dict[str, str]]
+        secondary_files_grouped: dict[str, list[dict[str, str]]]
 
     async def find_files_from_dict(
         self,
         json_dict: dict,
         json_path: list[str] | None = None,
-        collected: dict | None = None,
+        collected: Files = None,
     ) -> dict:
         """Retrieve filepaths from a dict of outputs"""
         if collected is None:
-            collected = {'main_files': [], 'secondary_files_grouped': {}}
+            collected = self.Files(
+                main_files=[], secondary_files_grouped=defaultdict(list)
+            )
 
         if json_path is None:
             json_path = []  # Initialize path for tracking key path
 
         if isinstance(json_dict, str):
             # If the data is a plain string, return it as the basename with None as its keypath
-            collected['main_files'].append({'json_path': None, 'basename': json_dict})
+            collected.main_files.append({'json_path': None, 'basename': json_dict})
             return collected
 
-        if isinstance(json_dict, dict):
-            # Check if current dict contains 'basename'
-            if 'basename' in json_dict:
-                # Add current item to main_files
-                collected['main_files'].append(
-                    {
-                        'json_path': '.'.join(json_path),
-                        'basename': json_dict['basename'],
-                    }
-                )
-                current_basename = json_dict[
-                    'basename'
-                ]  # Keep track of current basename for secondary files
-
-                # Handle secondary files if present
-                if 'secondary_files' in json_dict:
-                    secondary = json_dict['secondary_files']
-                    if current_basename not in collected['secondary_files_grouped']:
-                        collected['secondary_files_grouped'][current_basename] = []
-                    for key, value in secondary.items():
-                        # Append each secondary file to the list in secondary_files under its parent basename
-                        collected['secondary_files_grouped'][current_basename].append(
-                            {
-                                'json_path': '.'.join(
-                                    json_path + ['secondary_files', key]
-                                ),
-                                'basename': value['basename'],
-                            }
-                        )
-
-            else:
-                for key, value in json_dict.items():
-                    # Recur for each sub-dictionary, updating the path
-                    await self.find_files_from_dict(value, json_path + [key], collected)
+        if isinstance(json_dict, dict) and (basename := json_dict.get('basename')):
+            # Add current item to main_files
+            collected.main_files.append(
+                {
+                    'json_path': '.'.join(json_path),
+                    'basename': basename,
+                }
+            )
+
+            # Handle secondary files if present
+            secondary = json_dict.get('secondary_files', {})
+            for key, value in secondary.items():
+                # Append each secondary file to the list in secondary_files under its parent basename
+                collected.secondary_files_grouped[basename].append(
+                    {
+                        'json_path': '.'.join(json_path + ['secondary_files', key]),
+                        'basename': value['basename'],
+                    }
+                )
+
+        elif isinstance(json_dict, dict):
+            for key, value in json_dict.items():
+                # Recur for each sub-dictionary, updating the path
+                await self.find_files_from_dict(value, json_path + [key], collected)
 
         elif isinstance(json_dict, list):
             # Recur for each item in the list, without updating the path (as lists don't contribute to JSON path)
             for item in json_dict:
                 await self.find_files_from_dict(item, json_path, collected)
 
-        return collected
+        return asdict(collected)
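To make the new return shape concrete, here is a hedged sketch of calling `find_files_from_dict` on a small outputs dict. `table` stands in for an instance of this table class, and the data is invented:

    # Hypothetical call; the result is shown as plain dicts, roughly what
    # asdict(Files(...)) returns.
    result = await table.find_files_from_dict(
        {
            'cram': {
                'basename': 'sample.cram',
                'secondary_files': {'crai': {'basename': 'sample.cram.crai'}},
            }
        }
    )
    # result == {
    #     'main_files': [{'json_path': 'cram', 'basename': 'sample.cram'}],
    #     'secondary_files_grouped': {
    #         'sample.cram': [
    #             {'json_path': 'cram.secondary_files.crai',
    #              'basename': 'sample.cram.crai'}
    #         ]
    #     },
    # }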
