feat(scrape_pacer_free_opinions): apply task recap_document_into_opinions #4638

Open
wants to merge 2 commits into main
52 changes: 7 additions & 45 deletions cl/corpus_importer/management/commands/recap_into_opinions.py
@@ -1,52 +1,12 @@
from asgiref.sync import async_to_sync
from django.conf import settings
from django.core.management import BaseCommand
from django.db.models import Q
from eyecite.tokenizers import HyperscanTokenizer
from httpx import (
HTTPStatusError,
NetworkError,
RemoteProtocolError,
Response,
TimeoutException,
)

from cl.corpus_importer.tasks import ingest_recap_document

from cl.corpus_importer.tasks import recap_document_into_opinions
from cl.lib.celery_utils import CeleryThrottle
from cl.lib.command_utils import logger
from cl.lib.decorators import retry
from cl.lib.microservice_utils import microservice
from cl.search.models import SOURCES, Court, OpinionCluster, RECAPDocument

HYPERSCAN_TOKENIZER = HyperscanTokenizer(cache_dir=".hyperscan")


@retry(
ExceptionToCheck=(
NetworkError,
TimeoutException,
RemoteProtocolError,
HTTPStatusError,
),
tries=3,
delay=5,
backoff=2,
logger=logger,
)
def extract_recap_document(rd: RECAPDocument) -> Response:
"""Call recap-extract from doctor with retries

:param rd: the recap document to extract
:return: Response object
"""
response = async_to_sync(microservice)(
service="recap-extract",
item=rd,
params={"strip_margin": True},
)
response.raise_for_status()
return response


def import_opinions_from_recap(
jurisdiction: str | None = None,
@@ -89,6 +49,8 @@ def import_opinions_from_recap(

# Manually select the replica db which has an addt'l index added to
# improve this query.
Reviewer comment (Member):
This is a one-time script, right? Let's make sure we remove that index when we're done. Can we make a sub-issue to remember to do that? We need to be really careful not to permanently add indexes we won't use in the future.
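A possible shape for that follow-up, once the one-time import is finished (a sketch only; the index name, app label, and migration dependency below are placeholders rather than anything defined in this PR):

# Hypothetical Django migration for dropping the temporary index after the
# one-time import is done. Index name, app label, and dependency are placeholders.
from django.db import migrations


class Migration(migrations.Migration):
    # DROP INDEX CONCURRENTLY cannot run inside a transaction, so the
    # migration must be non-atomic.
    atomic = False

    dependencies = [
        ("search", "XXXX_previous_migration"),  # placeholder
    ]

    operations = [
        migrations.RunSQL(
            sql="DROP INDEX CONCURRENTLY IF EXISTS recap_import_tmp_idx;",
            reverse_sql=migrations.RunSQL.noop,
        ),
    ]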

# Since we don't have scrapers for FD courts, the last documents
# that are not from SOURCES.RECAP should be from Harvard or other import
latest_date_filed = (
OpinionCluster.objects.using(db_connection)
.filter(docket__court=court)
@@ -97,7 +59,7 @@
.values_list("date_filed", flat=True)
.first()
)
if latest_date_filed == None:
if latest_date_filed is None:
logger.error(
msg=f"Court {court.id} has no opinion clusters for recap import"
)
@@ -121,8 +83,8 @@
f"{count}: Importing rd {recap_document.id} in {court.id}"
)
throttle.maybe_wait()
ingest_recap_document.apply_async(
args=[recap_document.id, add_to_solr], queue=queue
recap_document_into_opinions.apply_async(
args=[{}, recap_document.id, add_to_solr], queue=queue
)
count += 1
if total_count > 0 and count >= total_count:
@@ -23,6 +23,7 @@
get_and_save_free_document_report,
mark_court_done_on_date,
process_free_opinion_result,
recap_document_into_opinions,
)
from cl.corpus_importer.utils import CycleChecker
from cl.lib.argparse_types import valid_date
@@ -339,7 +340,6 @@ def get_pdfs(
f"before starting the next cycle."
)
time.sleep(2)

logger.info(f"Processing row id: {row.id} from {row.court_id}")
c = chain(
process_free_opinion_result.si(
@@ -348,8 +348,14 @@
cnt,
).set(queue=q),
get_and_process_free_pdf.s(row.pk, row.court_id).set(queue=q),
# `recap_document_into_opinions` uses a different doctor extraction
# endpoint, so it doesn't depend on the document's content
# being extracted on `get_and_process_free_pdf`, where it's
# only extracted if it doesn't require OCR
recap_document_into_opinions.s().set(queue=q),
delete_pacer_row.s(row.pk).set(queue=q),
)

if index:
c = c | add_items_to_solr.s("search.RECAPDocument").set(queue=q)
c.apply_async()
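The placement in the chain above relies on how Celery chains pass data: each task's return value is prepended to the next signature's arguments. That is why `recap_document_into_opinions` now takes `task_data` as its first parameter and returns it unchanged. A minimal sketch of the pattern, using stand-in task names rather than the project's real ones:

from celery import Celery, chain

app = Celery("sketch", broker="memory://")


@app.task
def produce(rd_pk: int) -> dict:
    # Stands in for get_and_process_free_pdf: returns the TaskData dict.
    return {"rd_pk": rd_pk}


@app.task
def passthrough(task_data: dict | None = None) -> dict | None:
    # Stands in for recap_document_into_opinions: does its own work, then
    # returns task_data so the next task in the chain still receives it.
    return task_data


@app.task
def cleanup(task_data: dict, row_pk: int) -> None:
    # Stands in for delete_pacer_row: still sees task_data from `produce`.
    print(task_data, row_pk)


# c.apply_async() runs produce(123) -> passthrough(task_data) -> cleanup(task_data, 42)
c = chain(produce.s(123), passthrough.s(), cleanup.s(42))

If the middle task returned None instead, the final task would receive None as its first argument; returning `task_data` unchanged is what keeps the chain intact, which is the point of the new `return task_data` lines in tasks.py below.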
43 changes: 32 additions & 11 deletions cl/corpus_importer/tasks.py
@@ -2720,7 +2720,7 @@ def query_and_save_list_of_creditors(
backoff=2,
logger=logger,
)
def extract_recap_document(rd: RECAPDocument) -> Response:
def extract_recap_document_for_opinions(rd: RECAPDocument) -> Response:
"""Call recap-extract from doctor with retries

:param rd: the recap document to extract
@@ -2736,17 +2736,26 @@ def extract_recap_document(rd: RECAPDocument) -> Response:


@app.task(bind=True, max_retries=5, ignore_result=True)
def ingest_recap_document(
def recap_document_into_opinions(
self,
recap_document_id: int,
add_to_solr: bool,
) -> None:
task_data: Optional[TaskData] = None,
recap_document_id: Optional[int] = None,
add_to_solr: bool = False,
) -> Optional[TaskData]:
"""Ingest recap document into Opinions

:param task_data: dictionary that will contain the recap_document_id,
if called inside a chain() on the scraper_pacer_free_opinions
command. This task should be chained after the PDF has
been downloaded from PACER
:param recap_document_id: The document id to inspect and import
:param add_to_solr: Whether to add to solr
:return:None

:return: The same `task_data` that came as input
"""
if not recap_document_id and task_data:
recap_document_id = task_data["rd_pk"]

logger.info(f"Importing recap document {recap_document_id}")
recap_document = (
RECAPDocument.objects.select_related("docket_entry__docket")
@@ -2763,17 +2772,27 @@ def ingest_recap_document(
.get(id=recap_document_id)
)
docket = recap_document.docket_entry.docket
if recap_document.docket_entry.docket.court.jurisdiction == "FD":

jurisdiction = recap_document.docket_entry.docket.court.jurisdiction
court_id = recap_document.docket_entry.docket.court.id
# `dcd` has a regular juriscraper scraper. Avoid duplicates
if court_id in ["dcd", "orld"] or jurisdiction not in [
Court.FEDERAL_DISTRICT,
Court.FEDERAL_BANKRUPTCY,
]:
return task_data

if jurisdiction == Court.FEDERAL_DISTRICT:
if "cv" not in docket.docket_number.lower():
logger.info("Skipping non-civil opinion in district court")
return
return task_data

ops = Opinion.objects.filter(sha1=recap_document.sha1)
if ops.count() > 0:
logger.info(f"Skipping previously imported opinion: {ops[0].id}")
return
return task_data

response = extract_recap_document(rd=recap_document)
response = extract_recap_document_for_opinions(rd=recap_document)
r = response.json()

try:
@@ -2792,7 +2811,7 @@ def ingest_recap_document(
case_law_citations = filter_out_non_case_law_citations(citations)
if len(case_law_citations) == 0:
logger.info(f"No citation found for rd: {recap_document.id}")
return
return task_data

with transaction.atomic():
cluster = OpinionCluster.objects.create(
@@ -2823,3 +2842,5 @@
cluster.id
)
)
# Return input task data to preserve the chain in scrape_pacer_free_opinion
return task_data
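Taken together, the reworked signature has to serve two call sites. A short sketch of both, with a placeholder document id and queue name:

# Standalone, as the recap_into_opinions command now does: an empty task_data
# dict plus an explicit RECAPDocument id and the add_to_solr flag.
recap_document_into_opinions.apply_async(
    args=[{}, 12345, False], queue="batch1"
)

# Chained, as scrape_pacer_free_opinions does: task_data from the previous
# task supplies "rd_pk", so no explicit arguments are passed in the signature,
# and the same dict is returned so later tasks in the chain still receive it.
recap_document_into_opinions.s().set(queue="batch1")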