feat(scrape_pacer_free_opinions): apply task recap_document_into_opinions

Features:
- add `corpus_importer.tasks.recap_document_into_opinions` to the `scrape_pacer_free_opinions` chain: RECAP documents are now ingested into the case law database in real time (see the sketch below)
- tweaked the task so it can be used by both the `scrape_pacer_free_opinions` and `recap_into_opinions` commands

Refactors:
- renamed task `ingest_recap_document` to `recap_document_into_opinions`
- renamed task `extract_recap_document` to `extract_recap_document_for_opinions`
- deleted the duplicated `extract_recap_document` function from the `recap_into_opinions` command file; deleted unused imports and code
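
For orientation, here is a condensed view of the two call sites this enables, pieced together from the diffs below. Treat it as a sketch rather than standalone runnable code: `row`, `cnt`, `q`, `recap_document`, `add_to_solr`, and `queue` come from the surrounding command code, and some chain arguments are abbreviated.

```python
from celery import chain

from cl.corpus_importer.tasks import recap_document_into_opinions
# process_free_opinion_result, get_and_process_free_pdf and delete_pacer_row
# are imported alongside it in the scrape_pacer_free_opinions command

# 1) scrape_pacer_free_opinions: chained; the previous task's return value
#    (a dict carrying "rd_pk") arrives as `task_data`
c = chain(
    process_free_opinion_result.si(..., cnt).set(queue=q),  # args abbreviated
    get_and_process_free_pdf.s(row.pk, row.court_id).set(queue=q),
    recap_document_into_opinions.s().set(queue=q),
    delete_pacer_row.s(row.pk).set(queue=q),
)
c.apply_async()

# 2) recap_into_opinions: dispatched directly; `task_data` is an empty dict and
#    the document id is passed explicitly instead
recap_document_into_opinions.apply_async(
    args=[{}, recap_document.id, add_to_solr], queue=queue
)
```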
grossir committed Oct 31, 2024
1 parent 65dabe9 commit 7ca17a0
Showing 3 changed files with 46 additions and 57 deletions.
52 changes: 7 additions & 45 deletions cl/corpus_importer/management/commands/recap_into_opinions.py
@@ -1,52 +1,12 @@
from asgiref.sync import async_to_sync
from django.conf import settings
from django.core.management import BaseCommand
from django.db.models import Q
from eyecite.tokenizers import HyperscanTokenizer
from httpx import (
HTTPStatusError,
NetworkError,
RemoteProtocolError,
Response,
TimeoutException,
)

from cl.corpus_importer.tasks import ingest_recap_document

from cl.corpus_importer.tasks import recap_document_into_opinions
from cl.lib.celery_utils import CeleryThrottle
from cl.lib.command_utils import logger
from cl.lib.decorators import retry
from cl.lib.microservice_utils import microservice
from cl.search.models import SOURCES, Court, OpinionCluster, RECAPDocument

HYPERSCAN_TOKENIZER = HyperscanTokenizer(cache_dir=".hyperscan")


@retry(
ExceptionToCheck=(
NetworkError,
TimeoutException,
RemoteProtocolError,
HTTPStatusError,
),
tries=3,
delay=5,
backoff=2,
logger=logger,
)
def extract_recap_document(rd: RECAPDocument) -> Response:
"""Call recap-extract from doctor with retries
:param rd: the recap document to extract
:return: Response object
"""
response = async_to_sync(microservice)(
service="recap-extract",
item=rd,
params={"strip_margin": True},
)
response.raise_for_status()
return response


def import_opinions_from_recap(
jurisdiction: str | None = None,
@@ -89,6 +49,8 @@ def import_opinions_from_recap(

# Manually select the replica db which has an addt'l index added to
# improve this query.
# Since we don't have scrapers for FD courts, the last documents
# that are not from SOURCES.RECAP should be from Harvard or other import
latest_date_filed = (
OpinionCluster.objects.using(db_connection)
.filter(docket__court=court)
@@ -97,7 +59,7 @@ def import_opinions_from_recap(
.values_list("date_filed", flat=True)
.first()
)
if latest_date_filed == None:
if latest_date_filed is None:
logger.error(
msg=f"Court {court.id} has no opinion clusters for recap import"
)
@@ -121,8 +83,8 @@ def import_opinions_from_recap(
f"{count}: Importing rd {recap_document.id} in {court.id}"
)
throttle.maybe_wait()
ingest_recap_document.apply_async(
args=[recap_document.id, add_to_solr], queue=queue
recap_document_into_opinions.apply_async(
args=[{}, recap_document.id, add_to_solr], queue=queue
)
count += 1
if total_count > 0 and count >= total_count:
8 changes: 7 additions & 1 deletion cl/corpus_importer/management/commands/scrape_pacer_free_opinions.py
@@ -23,6 +23,7 @@
get_and_save_free_document_report,
mark_court_done_on_date,
process_free_opinion_result,
recap_document_into_opinions,
)
from cl.corpus_importer.utils import CycleChecker
from cl.lib.argparse_types import valid_date
@@ -339,7 +340,6 @@ def get_pdfs(
f"before starting the next cycle."
)
time.sleep(2)

logger.info(f"Processing row id: {row.id} from {row.court_id}")
c = chain(
process_free_opinion_result.si(
@@ -348,8 +348,14 @@ def get_pdfs(
cnt,
).set(queue=q),
get_and_process_free_pdf.s(row.pk, row.court_id).set(queue=q),
# `recap_document_into_opinions` uses a different doctor extraction
# endpoint, so it doesn't depend on the document's content
# being extracted on `get_and_process_free_pdf`, where it's
# only extracted if it doesn't require OCR
recap_document_into_opinions.s().set(queue=q),
delete_pacer_row.s(row.pk).set(queue=q),
)

if index:
c = c | add_items_to_solr.s("search.RECAPDocument").set(queue=q)
c.apply_async()
43 changes: 32 additions & 11 deletions cl/corpus_importer/tasks.py
@@ -2720,7 +2720,7 @@ def query_and_save_list_of_creditors(
backoff=2,
logger=logger,
)
def extract_recap_document(rd: RECAPDocument) -> Response:
def extract_recap_document_for_opinions(rd: RECAPDocument) -> Response:
"""Call recap-extract from doctor with retries
:param rd: the recap document to extract
@@ -2736,17 +2736,26 @@ def extract_recap_document(rd: RECAPDocument) -> Response:


@app.task(bind=True, max_retries=5, ignore_result=True)
def ingest_recap_document(
def recap_document_into_opinions(
self,
recap_document_id: int,
add_to_solr: bool,
) -> None:
task_data: Optional[TaskData] = None,
recap_document_id: Optional[int] = None,
add_to_solr: bool = False,
) -> Optional[TaskData]:
"""Ingest recap document into Opinions
:param task_data: dictionary that will contain the recap_document_id,
if called inside a chain() on the scrape_pacer_free_opinions
command. This task should be chained after the PDF has
been downloaded from PACER
:param recap_document_id: The document id to inspect and import
:param add_to_solr: Whether to add to solr
:return:None
:return: The same `task_data` that came as input
"""
if not recap_document_id and task_data:
recap_document_id = task_data["rd_pk"]

logger.info(f"Importing recap document {recap_document_id}")
recap_document = (
RECAPDocument.objects.select_related("docket_entry__docket")
@@ -2763,17 +2772,27 @@ def ingest_recap_document(
.get(id=recap_document_id)
)
docket = recap_document.docket_entry.docket
if recap_document.docket_entry.docket.court.jurisdiction == "FD":

jurisdiction = recap_document.docket_entry.docket.court.jurisdiction
court_id = recap_document.docket_entry.docket.court.id
# `dcd` has a regular juriscraper scraper. Avoid duplicates
if court_id in ["dcd", "orld"] or jurisdiction not in [
Court.FEDERAL_DISTRICT,
Court.FEDERAL_BANKRUPTCY,
]:
return task_data

if jurisdiction == Court.FEDERAL_DISTRICT:
if "cv" not in docket.docket_number.lower():
logger.info("Skipping non-civil opinion in district court")
return
return task_data

ops = Opinion.objects.filter(sha1=recap_document.sha1)
if ops.count() > 0:
logger.info(f"Skipping previously imported opinion: {ops[0].id}")
return
return task_data

response = extract_recap_document(rd=recap_document)
response = extract_recap_document_for_opinions(rd=recap_document)
r = response.json()

try:
@@ -2792,7 +2811,7 @@ def ingest_recap_document(
case_law_citations = filter_out_non_case_law_citations(citations)
if len(case_law_citations) == 0:
logger.info(f"No citation found for rd: {recap_document.id}")
return
return task_data

with transaction.atomic():
cluster = OpinionCluster.objects.create(
@@ -2823,3 +2842,5 @@ def ingest_recap_document(
cluster.id
)
)
# Return input task data to preserve the chain in scrape_pacer_free_opinions
return task_data
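
The pass-through return above matters because, in a Celery chain, each task's return value is prepended to the next signature's arguments. Below is a minimal, self-contained sketch of that mechanic, with hypothetical task names that are not part of the CourtListener codebase.

```python
from celery import Celery, chain

app = Celery("sketch", broker="memory://", backend="cache+memory://")


@app.task
def into_opinions(task_data: dict) -> dict:
    # Early exits still hand `task_data` back so the next task in the chain
    # receives its expected payload instead of None.
    if not task_data.get("rd_pk"):
        return task_data
    # ... do the actual import work here ...
    return task_data


@app.task
def delete_row(task_data: dict, row_pk: int) -> None:
    # Called as delete_row(<return value of into_opinions>, row_pk)
    print(f"cleaning up row {row_pk}; payload: {task_data}")


workflow = chain(into_opinions.s({"rd_pk": 123}), delete_row.s(42))
# workflow.apply_async()  # would execute both tasks in order on a worker
```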
