Mlh0079 cloud ext #115

Merged
merged 5 commits into from on Oct 18, 2024
7 changes: 3 additions & 4 deletions Dockerfile
@@ -10,11 +10,10 @@ RUN pip install --upgrade --force-reinstall -r /tmp/requirements.txt --targe
ADD mdx ${LAMBDA_TASK_ROOT}

RUN if [ "$stage" != "prod" ] ; then \
- pip install -r /tmp/requirements-dev.txt && \
- python -m pytest --junitxml=./test_results/test_metadata_extractor.xml test; \
- fi
+ pip install -r /tmp/requirements-dev.txt && \
+ python -m pytest --junitxml=./test_results/test_metadata_extractor.xml test; \
+ fi

RUN rm -rf test

CMD [ "main.handler" ]
#ENTRYPOINT ["/bin/bash"]
30 changes: 30 additions & 0 deletions mdx/handler.py
@@ -0,0 +1,30 @@

import json
import signal
import sys
import time
from main import main

class GracefulKiller:
kill_now = False

def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)

def exit_gracefully(self, signum, frame):
print('Exiting gracefully')
self.kill_now = True


if __name__ == '__main__':
print(f'MDX __main__: {sys.argv}')
if len(sys.argv) <= 1:
killer = GracefulKiller()
print('MDX Task is running...')
while not killer.kill_now:
time.sleep(1)
else:
print('MDX calling main...')
main(json.loads(sys.argv[1]), {})
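
The new handler.py gives the image a second entry point alongside the Lambda handler: run with no argument it idles until it receives SIGINT/SIGTERM (long-running task mode), and run with a JSON string as argv[1] it decodes the payload and hands it to main(). Below is a self-contained sketch of the shutdown pattern it relies on; it is not part of this PR, and sending SIGTERM to the current process via a timer merely stands in for the orchestrator stopping the task.

# demo_graceful_shutdown.py - illustrative sketch, not part of this PR.
# Reproduces the GracefulKiller pattern from mdx/handler.py and simulates a stop signal.
import os
import signal
import threading
import time

class GracefulKiller:
    kill_now = False

    def __init__(self):
        # Register the same handlers mdx/handler.py installs.
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        print('Exiting gracefully')
        self.kill_now = True

if __name__ == '__main__':
    killer = GracefulKiller()
    # Simulate the orchestrator stopping the task after two seconds.
    threading.Timer(2, lambda: os.kill(os.getpid(), signal.SIGTERM)).start()
    print('Task is running...')
    while not killer.kill_now:
        time.sleep(0.2)
    print('Loop exited cleanly')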

75 changes: 67 additions & 8 deletions mdx/main.py
@@ -1,3 +1,6 @@
import json
import re

from run_cumulus_task import run_cumulus_task
import granule_metadata_extractor.processing as mdx
import granule_metadata_extractor.src as src
@@ -502,9 +505,11 @@ def extract_metadata(self, file_path, config, output_folder):
ds_short_name = collection.get('name')
version = collection.get('version')
metadata_extractor_vars = collection.get('meta', {}).get('metadata_extractor', [])
- access_url = os.path.join(config.get('distribution_endpoint'), protected_bucket,
- config['fileStagingDir'],
- os.path.basename(file_path))
+ access_url = os.path.join(
+ config.get('distribution_endpoint'), protected_bucket,
+ config['fileStagingDir'],
+ os.path.basename(file_path)
+ )
processing_switcher = {
"netcdf": self.extract_netcdf_metadata,
"csv": self.extract_csv_metadata,
@@ -601,8 +606,57 @@ def get_output_files(output_file_path, excluded):
if os.path.isfile(f"{output_file_path}.cmr.json"):
output_files += [f"{output_file_path}.cmr.json"]
return output_files

def process(self):
if 'EBS_MNT' in os.environ:
print('Using DAAC Split Processing')
ret = self.process_mdx_ebs()
else:
print('Using Cumulus Processing')
ret = self.process_mdx_cumulus()

return ret

def process_mdx_ebs(self):
collection = self.config.get('collection')
c_id = f'{collection.get("name")}__{collection.get("version")}'
local_store = os.getenv('EBS_MNT')
collection_store = f'{local_store}/{c_id}'
self.path = collection_store
self.config['fileStagingDir'] = collection_store
event_file = f'{collection_store}/{c_id}.json'
with open(event_file, 'r') as output:
contents = json.load(output)
print(f'Granule Count: {len(contents.get("granules"))}')
granules = contents.get('granules')

for granule in granules:
mdata_file_paths = []
for file in granule.get('files'):
filename = file.get('fileName')
if not re.search('.nc',filename):
continue
file_path = file.get('key')
data = self.extract_metadata(file_path=file_path, config=self.config, output_folder=self.path)
mdata_filename = f'{data.get("UpdatedGranuleUR", filename)}.cmr.json'
mdata_file_paths.append(f'{collection_store}/{mdata_filename}')

for mdata_file_path in mdata_file_paths:
print(f'metadata file created: {mdata_file_path}')
granule.get('files').append({
'fileName': os.path.basename(mdata_file_path),
'key': mdata_file_path,
'size': os.path.getsize(mdata_file_path)
})

shutil.move(event_file, f'{event_file}.mdx.in')
with open(event_file, 'w+') as file:
file.write(json.dumps({'granules': granules}))

logger.info('MDX processing completed.')
return {"granules": granules, "input": self.output}
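
For reference, process_mdx_ebs() expects a per-collection event file at <EBS_MNT>/<name>__<version>/<name>__<version>.json whose granules carry fileName/key entries; after it runs, each granule also lists its generated .cmr.json file, and the original event file is preserved as <c_id>.json.mdx.in. The following is a minimal sketch of how such an event file could be staged; the mount path, collection id, and granule fields shown are assumptions for illustration, not taken from this PR.

# stage_ebs_event.py - illustrative only; writes the kind of event file
# process_mdx_ebs() reads. The paths, collection id, and granule fields are made up.
import json
import os

ebs_mnt = os.environ.get('EBS_MNT', '/tmp/ebs')   # the task reads this env var; /tmp fallback is just for the demo
c_id = 'my_collection__1'                         # '<collection name>__<version>'
collection_store = os.path.join(ebs_mnt, c_id)
os.makedirs(collection_store, exist_ok=True)

event = {
    'granules': [
        {
            'files': [
                {
                    'fileName': 'my_granule_20240101.nc',
                    'key': f'{collection_store}/my_granule_20240101.nc',
                    'size': 123456,
                },
            ],
        },
    ],
}

with open(os.path.join(collection_store, f'{c_id}.json'), 'w') as fh:
    json.dump(event, fh, indent=2)

After processing, the same granule entry would also reference my_granule_20240101.nc.cmr.json (or the UpdatedGranuleUR-based name) with its size, per the append logic above.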

def process_mdx_cumulus(self):
"""
Override the processing wrapper
:return:
@@ -635,8 +689,7 @@ def process(self):
assert output[key], "fetched files list should not be empty"
files_sizes = {}
for output_file_path in output.get(key):
- data = self.extract_metadata(file_path=output_file_path, config=self.config,
- output_folder=self.path)
+ data = self.extract_metadata(file_path=output_file_path, config=self.config, output_folder=self.path)
generated_files = self.get_output_files(output_file_path, excluded)
if data.get('UpdatedGranuleUR', False):
updated_output_path = self.get_output_files(os.path.join(self.path, data['UpdatedGranuleUR']), excluded)
@@ -710,21 +763,27 @@ def task(event, context):
invocation, function, and execution environment
:return: mdx processing output
"""
# event = json.loads(event)
logger.info(event)
mdx_instance = MDX(input=event['input'], config=event['config'])
return mdx_instance.process()


- def handler(event, context):
+ def handler(event, context=None):
"""
Lambda handler entry point which will run the mdx_task
:param event: AWS event passed into lambda
:param context: object provides methods and properties that provide information about the
invocation, function, and execution environment
"""

return run_cumulus_task(task, event, context)


def main(event, context):
# print(f'MDX main event: {event}')
return MDX(input=event.get('input'), config=event.get('config')).process()


if __name__ == '__main__':
print('calling CLI')
MDX.cli()
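
Taken together, handler() stays on the Cumulus/Lambda path via run_cumulus_task, while the new main() (and the CLI hook) lets the same code run outside Lambda. Below is a hedged sketch of the event shape main() and task() unpack; the config keys mirror what extract_metadata() reads above, but the concrete values are assumptions.

# invoke_mdx_locally.py - illustrative only; the calls are commented out because they
# would need to run inside the mdx image where main.py and its dependencies are importable.
import json

event = {
    'input': {'granules': []},
    'config': {
        'collection': {
            'name': 'my_collection',
            'version': '1',
            'meta': {'metadata_extractor': []},
        },
        'fileStagingDir': 'file-staging/my_collection__1',
        'distribution_endpoint': 'https://example.com/distribution/',
    },
}

# Direct, non-Lambda invocation (mirrors what handler.py does with argv[1]):
# from main import main
# main(event, {})

# Equivalent container/CLI form:
#   python handler.py '<the JSON above>'
print(json.dumps(event, indent=2))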