Skip to content

Commit

Permalink
Try to improve subprocess exception handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ttys0dev committed Oct 21, 2024
1 parent d1fc891 commit ab0ea24
Showing 1 changed file with 125 additions and 70 deletions.
195 changes: 125 additions & 70 deletions cl/recap/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import concurrent.futures
import hashlib
import logging
import traceback
from dataclasses import dataclass
from datetime import datetime
from http import HTTPStatus
Expand Down Expand Up @@ -555,13 +556,23 @@ async def process_recap_docket(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_docket_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool, parse_docket_text, map_cl_to_pacer_id(pq.court_id), text
)
try:
if process.current_process().daemon:
data = parse_docket_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_docket_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down Expand Up @@ -732,18 +743,25 @@ async def process_recap_claims_register(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_claims_register_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_claims_register_text,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
data = parse_claims_register_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_claims_register_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed for item {pq}")

if not data:
Expand Down Expand Up @@ -829,16 +847,25 @@ async def process_recap_docket_history_report(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_docket_history_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_docket_history_text,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
data = parse_docket_history_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_docket_history_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed for item {pq}")

if data == {}:
Expand Down Expand Up @@ -936,18 +963,25 @@ async def process_case_query_page(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_case_query_page_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_case_query_page_text,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
data = parse_case_query_page_text(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_case_query_page_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed for item {pq}")

if data == {}:
Expand Down Expand Up @@ -1070,16 +1104,23 @@ async def process_recap_appellate_docket(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_appellate_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_appellate_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
try:
if process.current_process().daemon:
data = parse_appellate_text(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_appellate_text,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down Expand Up @@ -1174,16 +1215,23 @@ async def process_recap_acms_docket(pk):
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None

if process.current_process().daemon:
data = parse_acms_json(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_json,
map_cl_to_pacer_id(pq.court_id),
text,
)
try:
if process.current_process().daemon:
data = parse_acms_json(map_cl_to_pacer_id(pq.court_id), text)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_json,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down Expand Up @@ -1264,19 +1312,26 @@ async def process_recap_acms_appellate_attachment(
)
return pq_status, msg, []

if process.current_process().daemon:
# yyy
data = parse_acms_attachment_json(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_attachment_json,
map_cl_to_pacer_id(pq.court_id),
text,
try:
if process.current_process().daemon:
# yyy
data = parse_acms_attachment_json(
map_cl_to_pacer_id(pq.court_id), text
)
else:
with concurrent.futures.ProcessPoolExecutor() as pool:
data = await asyncio.get_running_loop().run_in_executor(
pool,
parse_acms_attachment_json,
map_cl_to_pacer_id(pq.court_id),
text,
)
except Exception as e:
logging.exception(e)
await mark_pq_status(
pq, traceback.format_exc(), PROCESSING_STATUS.FAILED
)
return None
logger.info(f"Parsing completed of item {pq}")

if data == {}:
Expand Down

0 comments on commit ab0ea24

Please sign in to comment.