From ab0ea24478c4583c1fa5c9d8cf8b2d5e4d874481 Mon Sep 17 00:00:00 2001 From: ttys0dev <126845556+ttys0dev@users.noreply.github.com> Date: Mon, 21 Oct 2024 16:19:17 -0600 Subject: [PATCH] Try to improve subprocess exception handling --- cl/recap/tasks.py | 195 +++++++++++++++++++++++++++++----------------- 1 file changed, 125 insertions(+), 70 deletions(-) diff --git a/cl/recap/tasks.py b/cl/recap/tasks.py index ea093f940f..fb67c746dd 100644 --- a/cl/recap/tasks.py +++ b/cl/recap/tasks.py @@ -2,6 +2,7 @@ import concurrent.futures import hashlib import logging +import traceback from dataclasses import dataclass from datetime import datetime from http import HTTPStatus @@ -555,13 +556,23 @@ async def process_recap_docket(pk): await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None - if process.current_process().daemon: - data = parse_docket_text(map_cl_to_pacer_id(pq.court_id), text) - else: - with concurrent.futures.ProcessPoolExecutor() as pool: - data = await asyncio.get_running_loop().run_in_executor( - pool, parse_docket_text, map_cl_to_pacer_id(pq.court_id), text - ) + try: + if process.current_process().daemon: + data = parse_docket_text(map_cl_to_pacer_id(pq.court_id), text) + else: + with concurrent.futures.ProcessPoolExecutor() as pool: + data = await asyncio.get_running_loop().run_in_executor( + pool, + parse_docket_text, + map_cl_to_pacer_id(pq.court_id), + text, + ) + except Exception as e: + logging.exception(e) + await mark_pq_status( + pq, traceback.format_exc(), PROCESSING_STATUS.FAILED + ) + return None logger.info(f"Parsing completed of item {pq}") if data == {}: @@ -732,18 +743,25 @@ async def process_recap_claims_register(pk): await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None - if process.current_process().daemon: - data = parse_claims_register_text( - map_cl_to_pacer_id(pq.court_id), text - ) - else: - with concurrent.futures.ProcessPoolExecutor() as pool: - data = await asyncio.get_running_loop().run_in_executor( - pool, - parse_claims_register_text, - map_cl_to_pacer_id(pq.court_id), - text, + try: + if process.current_process().daemon: + data = parse_claims_register_text( + map_cl_to_pacer_id(pq.court_id), text ) + else: + with concurrent.futures.ProcessPoolExecutor() as pool: + data = await asyncio.get_running_loop().run_in_executor( + pool, + parse_claims_register_text, + map_cl_to_pacer_id(pq.court_id), + text, + ) + except Exception as e: + logging.exception(e) + await mark_pq_status( + pq, traceback.format_exc(), PROCESSING_STATUS.FAILED + ) + return None logger.info(f"Parsing completed for item {pq}") if not data: @@ -829,16 +847,25 @@ async def process_recap_docket_history_report(pk): await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None - if process.current_process().daemon: - data = parse_docket_history_text(map_cl_to_pacer_id(pq.court_id), text) - else: - with concurrent.futures.ProcessPoolExecutor() as pool: - data = await asyncio.get_running_loop().run_in_executor( - pool, - parse_docket_history_text, - map_cl_to_pacer_id(pq.court_id), - text, + try: + if process.current_process().daemon: + data = parse_docket_history_text( + map_cl_to_pacer_id(pq.court_id), text ) + else: + with concurrent.futures.ProcessPoolExecutor() as pool: + data = await asyncio.get_running_loop().run_in_executor( + pool, + parse_docket_history_text, + map_cl_to_pacer_id(pq.court_id), + text, + ) + except Exception as e: + logging.exception(e) + await mark_pq_status( + pq, traceback.format_exc(), PROCESSING_STATUS.FAILED + ) + return None logger.info(f"Parsing completed for item {pq}") if data == {}: @@ -936,18 +963,25 @@ async def process_case_query_page(pk): await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None - if process.current_process().daemon: - data = parse_case_query_page_text( - map_cl_to_pacer_id(pq.court_id), text - ) - else: - with concurrent.futures.ProcessPoolExecutor() as pool: - data = await asyncio.get_running_loop().run_in_executor( - pool, - parse_case_query_page_text, - map_cl_to_pacer_id(pq.court_id), - text, + try: + if process.current_process().daemon: + data = parse_case_query_page_text( + map_cl_to_pacer_id(pq.court_id), text ) + else: + with concurrent.futures.ProcessPoolExecutor() as pool: + data = await asyncio.get_running_loop().run_in_executor( + pool, + parse_case_query_page_text, + map_cl_to_pacer_id(pq.court_id), + text, + ) + except Exception as e: + logging.exception(e) + await mark_pq_status( + pq, traceback.format_exc(), PROCESSING_STATUS.FAILED + ) + return None logger.info(f"Parsing completed for item {pq}") if data == {}: @@ -1070,16 +1104,23 @@ async def process_recap_appellate_docket(pk): await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None - if process.current_process().daemon: - data = parse_appellate_text(map_cl_to_pacer_id(pq.court_id), text) - else: - with concurrent.futures.ProcessPoolExecutor() as pool: - data = await asyncio.get_running_loop().run_in_executor( - pool, - parse_appellate_text, - map_cl_to_pacer_id(pq.court_id), - text, - ) + try: + if process.current_process().daemon: + data = parse_appellate_text(map_cl_to_pacer_id(pq.court_id), text) + else: + with concurrent.futures.ProcessPoolExecutor() as pool: + data = await asyncio.get_running_loop().run_in_executor( + pool, + parse_appellate_text, + map_cl_to_pacer_id(pq.court_id), + text, + ) + except Exception as e: + logging.exception(e) + await mark_pq_status( + pq, traceback.format_exc(), PROCESSING_STATUS.FAILED + ) + return None logger.info(f"Parsing completed of item {pq}") if data == {}: @@ -1174,16 +1215,23 @@ async def process_recap_acms_docket(pk): await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) return None - if process.current_process().daemon: - data = parse_acms_json(map_cl_to_pacer_id(pq.court_id), text) - else: - with concurrent.futures.ProcessPoolExecutor() as pool: - data = await asyncio.get_running_loop().run_in_executor( - pool, - parse_acms_json, - map_cl_to_pacer_id(pq.court_id), - text, - ) + try: + if process.current_process().daemon: + data = parse_acms_json(map_cl_to_pacer_id(pq.court_id), text) + else: + with concurrent.futures.ProcessPoolExecutor() as pool: + data = await asyncio.get_running_loop().run_in_executor( + pool, + parse_acms_json, + map_cl_to_pacer_id(pq.court_id), + text, + ) + except Exception as e: + logging.exception(e) + await mark_pq_status( + pq, traceback.format_exc(), PROCESSING_STATUS.FAILED + ) + return None logger.info(f"Parsing completed of item {pq}") if data == {}: @@ -1264,19 +1312,26 @@ async def process_recap_acms_appellate_attachment( ) return pq_status, msg, [] - if process.current_process().daemon: - # yyy - data = parse_acms_attachment_json( - map_cl_to_pacer_id(pq.court_id), text - ) - else: - with concurrent.futures.ProcessPoolExecutor() as pool: - data = await asyncio.get_running_loop().run_in_executor( - pool, - parse_acms_attachment_json, - map_cl_to_pacer_id(pq.court_id), - text, + try: + if process.current_process().daemon: + # yyy + data = parse_acms_attachment_json( + map_cl_to_pacer_id(pq.court_id), text ) + else: + with concurrent.futures.ProcessPoolExecutor() as pool: + data = await asyncio.get_running_loop().run_in_executor( + pool, + parse_acms_attachment_json, + map_cl_to_pacer_id(pq.court_id), + text, + ) + except Exception as e: + logging.exception(e) + await mark_pq_status( + pq, traceback.format_exc(), PROCESSING_STATUS.FAILED + ) + return None logger.info(f"Parsing completed of item {pq}") if data == {}: