Commit
Merge pull request #115 from ghrcdaac/mlh0079-cloud-ext
Mlh0079 cloud ext
sflynn-itsc authored Oct 18, 2024
2 parents 79d1086 + 085475b commit b666e32
Showing 3 changed files with 100 additions and 12 deletions.
7 changes: 3 additions & 4 deletions Dockerfile
@@ -10,11 +10,10 @@ RUN pip install --upgrade --force-reinstall -r /tmp/requirements.txt --targe
ADD mdx ${LAMBDA_TASK_ROOT}

RUN if [ "$stage" != "prod" ] ; then \
    pip install -r /tmp/requirements-dev.txt && \
    python -m pytest --junitxml=./test_results/test_metadata_extractor.xml test; \
    fi

RUN rm -rf test

CMD [ "main.handler" ]
#ENTRYPOINT ["/bin/bash"]
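
For orientation, a minimal sketch (image tag and build context assumed, not taken from the repo) of how the stage build-arg above gates the dev-dependency install and pytest run during the image build:

import subprocess

# Hypothetical production build: stage=prod skips the test layer entirely,
# so dev requirements are never installed into the image.
subprocess.run(
    ["docker", "build", "--build-arg", "stage=prod", "-t", "mdx:latest", "."],
    check=True,
)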
30 changes: 30 additions & 0 deletions mdx/handler.py
@@ -0,0 +1,30 @@

import json
import signal
import sys
import time

from main import main


class GracefulKiller:
    """Flips a flag when the process receives SIGINT or SIGTERM."""
    kill_now = False

    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        print('Exiting gracefully')
        self.kill_now = True


if __name__ == '__main__':
    print(f'MDX __main__: {sys.argv}')
    if len(sys.argv) <= 1:
        # No event argument: idle until the orchestrator stops the task.
        killer = GracefulKiller()
        print('MDX Task is running...')
        while not killer.kill_now:
            time.sleep(1)
    else:
        # Event passed as a JSON string on the command line.
        print('MDX calling main...')
        main(json.loads(sys.argv[1]), {})
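
A brief usage sketch of the new entrypoint's two modes (the event contents here are invented placeholders, not the real Cumulus payload):

import json
import subprocess
import sys

event = {"input": {}, "config": {}}  # placeholder event shape

# One-shot mode: a JSON event on argv makes handler.py call main(event, {}).
subprocess.run([sys.executable, "mdx/handler.py", json.dumps(event)], check=True)

# Long-running mode: with no argument the process sleeps in a loop until
# SIGINT/SIGTERM arrives, which GracefulKiller converts into a clean exit.
# subprocess.Popen([sys.executable, "mdx/handler.py"])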

75 changes: 67 additions & 8 deletions mdx/main.py
@@ -1,3 +1,6 @@
import json
import re

from run_cumulus_task import run_cumulus_task
import granule_metadata_extractor.processing as mdx
import granule_metadata_extractor.src as src
@@ -502,9 +505,11 @@ def extract_metadata(self, file_path, config, output_folder):
        ds_short_name = collection.get('name')
        version = collection.get('version')
        metadata_extractor_vars = collection.get('meta', {}).get('metadata_extractor', [])
        access_url = os.path.join(
            config.get('distribution_endpoint'), protected_bucket,
            config['fileStagingDir'],
            os.path.basename(file_path)
        )
        processing_switcher = {
            "netcdf": self.extract_netcdf_metadata,
            "csv": self.extract_csv_metadata,
@@ -601,8 +606,57 @@ def get_output_files(output_file_path, excluded):
        if os.path.isfile(f"{output_file_path}.cmr.json"):
            output_files += [f"{output_file_path}.cmr.json"]
        return output_files

    def process(self):
        # Route to EBS-based split processing when an EBS mount is configured.
        if 'EBS_MNT' in os.environ:
            print('Using DAAC Split Processing')
            ret = self.process_mdx_ebs()
        else:
            print('Using Cumulus Processing')
            ret = self.process_mdx_cumulus()

        return ret

    def process_mdx_ebs(self):
        collection = self.config.get('collection')
        c_id = f'{collection.get("name")}__{collection.get("version")}'
        local_store = os.getenv('EBS_MNT')
        collection_store = f'{local_store}/{c_id}'
        self.path = collection_store
        self.config['fileStagingDir'] = collection_store
        # The event for this collection is staged on the EBS volume as JSON.
        event_file = f'{collection_store}/{c_id}.json'
        with open(event_file, 'r') as output:
            contents = json.load(output)
        print(f'Granule Count: {len(contents.get("granules"))}')
        granules = contents.get('granules')

        for granule in granules:
            mdata_file_paths = []
            for file in granule.get('files'):
                filename = file.get('fileName')
                if not re.search(r'\.nc', filename):
                    continue
                file_path = file.get('key')
                data = self.extract_metadata(file_path=file_path, config=self.config, output_folder=self.path)
                mdata_filename = f'{data.get("UpdatedGranuleUR", filename)}.cmr.json'
                mdata_file_paths.append(f'{collection_store}/{mdata_filename}')

            # Register each generated .cmr.json alongside the granule's files.
            for mdata_file_path in mdata_file_paths:
                print(f'metadata file created: {mdata_file_path}')
                granule.get('files').append({
                    'fileName': os.path.basename(mdata_file_path),
                    'key': mdata_file_path,
                    'size': os.path.getsize(mdata_file_path)
                })

        # Preserve the original event, then rewrite it with the updated granules.
        shutil.move(event_file, f'{event_file}.mdx.in')
        with open(event_file, 'w+') as file:
            file.write(json.dumps({'granules': granules}))

        logger.info('MDX processing completed.')
        return {"granules": granules, "input": self.output}

    def process_mdx_cumulus(self):
        """
        Override the processing wrapper
        :return:
@@ -635,8 +689,7 @@ def process(self):
        assert output[key], "fetched files list should not be empty"
        files_sizes = {}
        for output_file_path in output.get(key):
            data = self.extract_metadata(file_path=output_file_path, config=self.config, output_folder=self.path)
            generated_files = self.get_output_files(output_file_path, excluded)
            if data.get('UpdatedGranuleUR', False):
                updated_output_path = self.get_output_files(os.path.join(self.path, data['UpdatedGranuleUR']), excluded)
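
As a reading aid, a hedged sketch of the contract this loop assumes from extract_metadata(); the UpdatedGranuleUR field name comes from the diff itself, while the sample values are invented:

# extract_metadata() writes <granule>.cmr.json next to the data file and may
# signal a rename via 'UpdatedGranuleUR', in which case the generated outputs
# are looked up under the new name instead of the original file name.
data = {'UpdatedGranuleUR': 'RENAMED_granule_20241018.nc'}  # invented value
mdata_filename = f"{data.get('UpdatedGranuleUR', 'granule_20241018.nc')}.cmr.json"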
@@ -710,21 +763,27 @@ def task(event, context):
    invocation, function, and execution environment
    :return: mdx processing output
    """
    # event = json.loads(event)
    logger.info(event)
    mdx_instance = MDX(input=event['input'], config=event['config'])
    return mdx_instance.process()


def handler(event, context=None):
    """
    Lambda handler entry point which will run the mdx_task
    :param event: AWS event passed into lambda
    :param context: object provides methods and properties that provide information about the
    invocation, function, and execution environment
    """

    return run_cumulus_task(task, event, context)


def main(event, context):
    # print(f'MDX main event: {event}')
    return MDX(input=event.get('input'), config=event.get('config')).process()


if __name__ == '__main__':
    print('calling CLI')
    MDX.cli()
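
Finally, a hedged sketch of the staged event file that process_mdx_ebs() expects at ${EBS_MNT}/<name>__<version>/<name>__<version>.json; the fileName/key/granules keys come from the diff above, while paths and any other field names are invented:

import json
import os

event_contents = {
    "granules": [
        {
            "files": [
                {"fileName": "granule_20241018.nc",   # only .nc files get extracted
                 "key": "/mnt/ebs/demo__1/granule_20241018.nc"}
            ]
        }
    ]
}

os.environ['EBS_MNT'] = '/mnt/ebs'  # presence of this variable selects split processing
os.makedirs('/mnt/ebs/demo__1', exist_ok=True)
with open('/mnt/ebs/demo__1/demo__1.json', 'w') as f:
    json.dump(event_contents, f)
# MDX(input=..., config={'collection': {'name': 'demo', 'version': '1'}}).process()
# would now route to process_mdx_ebs() and rewrite this file in place.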
