From 7ca17a0414864ff0444c83fd01ea8b34aa2614aa Mon Sep 17 00:00:00 2001
From: Gianfranco Rossi
Date: Wed, 30 Oct 2024 20:02:42 -0500
Subject: [PATCH] feat(scrape_pacer_free_opinions): apply task
 recap_document_into_opinions

Features:
- add corpus_importer.tasks.recap_document_into_opinions into the
  scrape_pacer_free_opinions chain: this ingests RECAP documents into the
  case law database in real time
- tweaked the task so it can be used by both the scrape_pacer_free_opinions
  and recap_into_opinions commands

Refactors:
- renamed task `ingest_recap_document` to `recap_document_into_opinions`
- renamed task `extract_recap_document` to
  `extract_recap_document_for_opinions`
- deleted the duplicated function `extract_recap_document` in the
  recap_into_opinions command file; deleted unused imports and code
---
 .../commands/recap_into_opinions.py           | 52 +++----------------
 .../commands/scrape_pacer_free_opinions.py    |  8 +++++++-
 cl/corpus_importer/tasks.py                   | 43 +++++++++++----
 3 files changed, 46 insertions(+), 57 deletions(-)

diff --git a/cl/corpus_importer/management/commands/recap_into_opinions.py b/cl/corpus_importer/management/commands/recap_into_opinions.py
index 3062441f75..95b6d5c5fe 100644
--- a/cl/corpus_importer/management/commands/recap_into_opinions.py
+++ b/cl/corpus_importer/management/commands/recap_into_opinions.py
@@ -1,52 +1,12 @@
-from asgiref.sync import async_to_sync
 from django.conf import settings
 from django.core.management import BaseCommand
 from django.db.models import Q
-from eyecite.tokenizers import HyperscanTokenizer
-from httpx import (
-    HTTPStatusError,
-    NetworkError,
-    RemoteProtocolError,
-    Response,
-    TimeoutException,
-)
-
-from cl.corpus_importer.tasks import ingest_recap_document
+
+from cl.corpus_importer.tasks import recap_document_into_opinions
 from cl.lib.celery_utils import CeleryThrottle
 from cl.lib.command_utils import logger
-from cl.lib.decorators import retry
-from cl.lib.microservice_utils import microservice
 from cl.search.models import SOURCES, Court, OpinionCluster, RECAPDocument
 
-HYPERSCAN_TOKENIZER = HyperscanTokenizer(cache_dir=".hyperscan")
-
-
-@retry(
-    ExceptionToCheck=(
-        NetworkError,
-        TimeoutException,
-        RemoteProtocolError,
-        HTTPStatusError,
-    ),
-    tries=3,
-    delay=5,
-    backoff=2,
-    logger=logger,
-)
-def extract_recap_document(rd: RECAPDocument) -> Response:
-    """Call recap-extract from doctor with retries
-
-    :param rd: the recap document to extract
-    :return: Response object
-    """
-    response = async_to_sync(microservice)(
-        service="recap-extract",
-        item=rd,
-        params={"strip_margin": True},
-    )
-    response.raise_for_status()
-    return response
-
 
 def import_opinions_from_recap(
     jurisdiction: str | None = None,
@@ -89,6 +49,8 @@ def import_opinions_from_recap(
         # Manually select the replica db which has an addt'l index added to
         # improve this query.
+        # Since we don't have scrapers for FD courts, the last documents
+        # that are not from SOURCES.RECAP should be from Harvard or other imports
         latest_date_filed = (
             OpinionCluster.objects.using(db_connection)
             .filter(docket__court=court)
             .values_list("date_filed", flat=True)
             .first()
         )
-        if latest_date_filed == None:
+        if latest_date_filed is None:
             logger.error(
                 msg=f"Court {court.id} has no opinion clusters for recap import"
             )
@@ -121,8 +83,8 @@ def import_opinions_from_recap(
                 f"{count}: Importing rd {recap_document.id} in {court.id}"
             )
             throttle.maybe_wait()
-            ingest_recap_document.apply_async(
-                args=[recap_document.id, add_to_solr], queue=queue
+            recap_document_into_opinions.apply_async(
+                args=[{}, recap_document.id, add_to_solr], queue=queue
             )
             count += 1
             if total_count > 0 and count >= total_count:

diff --git a/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py b/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py
index a32fec149e..13d65555b2 100644
--- a/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py
+++ b/cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py
@@ -23,6 +23,7 @@
     get_and_save_free_document_report,
     mark_court_done_on_date,
     process_free_opinion_result,
+    recap_document_into_opinions,
 )
 from cl.corpus_importer.utils import CycleChecker
 from cl.lib.argparse_types import valid_date
@@ -339,7 +340,6 @@ def get_pdfs(
                 f"before starting the next cycle."
             )
             time.sleep(2)
-
         logger.info(f"Processing row id: {row.id} from {row.court_id}")
         c = chain(
             process_free_opinion_result.si(
                 row.pk,
                 cnt,
             ).set(queue=q),
             get_and_process_free_pdf.s(row.pk, row.court_id).set(queue=q),
@@ -348,8 +348,14 @@ def get_pdfs(
+            # `recap_document_into_opinions` uses a different doctor extraction
+            # endpoint, so it doesn't depend on the document's content
+            # being extracted in `get_and_process_free_pdf`, where it's
+            # only extracted if it doesn't require OCR
+            recap_document_into_opinions.s().set(queue=q),
             delete_pacer_row.s(row.pk).set(queue=q),
         )
+
         if index:
             c = c | add_items_to_solr.s("search.RECAPDocument").set(queue=q)
         c.apply_async()

diff --git a/cl/corpus_importer/tasks.py b/cl/corpus_importer/tasks.py
index 6d471b82f2..8ed46333f7 100644
--- a/cl/corpus_importer/tasks.py
+++ b/cl/corpus_importer/tasks.py
@@ -2720,7 +2720,7 @@ def query_and_save_list_of_creditors(
     backoff=2,
     logger=logger,
 )
-def extract_recap_document(rd: RECAPDocument) -> Response:
+def extract_recap_document_for_opinions(rd: RECAPDocument) -> Response:
     """Call recap-extract from doctor with retries
 
     :param rd: the recap document to extract
@@ -2736,17 +2736,26 @@
 
 
 @app.task(bind=True, max_retries=5, ignore_result=True)
-def ingest_recap_document(
+def recap_document_into_opinions(
     self,
-    recap_document_id: int,
-    add_to_solr: bool,
-) -> None:
+    task_data: Optional[TaskData] = None,
+    recap_document_id: Optional[int] = None,
+    add_to_solr: bool = False,
+) -> Optional[TaskData]:
     """Ingest recap document into Opinions
 
+    :param task_data: dictionary that will contain the recap_document_id,
+    if called inside a chain() on the scrape_pacer_free_opinions command.
+    This task should be chained after the PDF has been downloaded from
+    PACER.
     :param recap_document_id: The document id to inspect and import
     :param add_to_solr: Whether to add to solr
-    :return:None
+
+    :return: The same `task_data` that was received as input
     """
+    if not recap_document_id and task_data:
+        recap_document_id = task_data["rd_pk"]
+
     logger.info(f"Importing recap document {recap_document_id}")
     recap_document = (
         RECAPDocument.objects.select_related("docket_entry__docket")
@@ -2763,17 +2772,27 @@
         .get(id=recap_document_id)
     )
     docket = recap_document.docket_entry.docket
-    if recap_document.docket_entry.docket.court.jurisdiction == "FD":
+
+    jurisdiction = recap_document.docket_entry.docket.court.jurisdiction
+    court_id = recap_document.docket_entry.docket.court.id
+    # `dcd` has a regular juriscraper scraper. Avoid duplicates
+    if court_id in ["dcd", "orld"] or jurisdiction not in [
+        Court.FEDERAL_DISTRICT,
+        Court.FEDERAL_BANKRUPTCY,
+    ]:
+        return task_data
+
+    if jurisdiction == Court.FEDERAL_DISTRICT:
         if "cv" not in docket.docket_number.lower():
             logger.info("Skipping non-civil opinion in district court")
-            return
+            return task_data
 
     ops = Opinion.objects.filter(sha1=recap_document.sha1)
     if ops.count() > 0:
         logger.info(f"Skipping previously imported opinion: {ops[0].id}")
-        return
+        return task_data
 
-    response = extract_recap_document(rd=recap_document)
+    response = extract_recap_document_for_opinions(rd=recap_document)
     r = response.json()
 
     try:
@@ -2792,7 +2811,7 @@
     case_law_citations = filter_out_non_case_law_citations(citations)
     if len(case_law_citations) == 0:
         logger.info(f"No citation found for rd: {recap_document.id}")
-        return
+        return task_data
 
     with transaction.atomic():
         cluster = OpinionCluster.objects.create(
@@ -2823,3 +2842,5 @@
                 cluster.id
             )
         )
+    # Return input task data to preserve the chain in scrape_pacer_free_opinions
+    return task_data
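
Note (editorial, not part of the patch): below is a minimal sketch of the two
invocation paths the reworked task supports, assuming the signatures shown in
the hunks above. The task and command names are real; the queue name, ids, and
the simulated upstream payload are hypothetical placeholders.

    # Sketch only: ids and queue name are hypothetical, not repo values.
    from celery import chain

    from cl.corpus_importer.tasks import (
        delete_pacer_row,
        recap_document_into_opinions,
    )

    q = "batch1"  # hypothetical queue name
    rd_pk = 123   # hypothetical RECAPDocument pk
    row_pk = 456  # hypothetical PACERFreeDocumentRow pk

    # Standalone path (recap_into_opinions command): there is no chain, so
    # task_data is an empty dict and the document id is passed explicitly.
    recap_document_into_opinions.apply_async(args=[{}, rd_pk, False], queue=q)

    # Chained path (scrape_pacer_free_opinions command): the upstream task's
    # return value arrives as task_data; the task reads task_data["rd_pk"]
    # and returns task_data unchanged so delete_pacer_row still receives it.
    chain(
        recap_document_into_opinions.s({"rd_pk": rd_pk}).set(queue=q),
        delete_pacer_row.s(row_pk).set(queue=q),
    ).apply_async()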