Skip to content

Commit

Permalink
refactor(harvard_merger): rework process_entry to be extensible
Browse files Browse the repository at this point in the history
`import_harvard_pdfs` has a `process_entry` method that is similar,
but different from the entry processing in `process_crosswalk_file`
and abandoned. Since a specialized method for processing an entry
is useful, this patch reconciles the two bits of functionality.

Finally, it adds a `--job` parameter to the command to make adding
other jobs relating to CAP migration easier.
  • Loading branch information
cweider committed Oct 25, 2024
1 parent 767d836 commit eb06405
Showing 1 changed file with 57 additions and 49 deletions.
106 changes: 57 additions & 49 deletions cl/search/management/commands/import_harvard_pdfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ def add_arguments(self, parser):
help="Directory for reading crosswalk files",
required=True,
)
parser.add_argument(
"--job",
type=str,
choices=["import_pdf"],
default="import_pdf",
help="",
)

def handle(self, *args: Any, **options: Any) -> None:
"""Handle the command execution.
Expand All @@ -65,6 +72,7 @@ def handle(self, *args: Any, **options: Any) -> None:

self.dry_run = options["dry_run"]
self.crosswalk_dir = options["crosswalk_dir"]
self.job = options["job"]

if not os.path.exists(self.crosswalk_dir):
logger.warning(
Expand Down Expand Up @@ -156,37 +164,23 @@ def process_crosswalk_file(self, crosswalk_file: str) -> None:
crosswalk_data = json.load(f)

for entry in tqdm(crosswalk_data, desc="Processing entries"):
logger.debug(f"Processing entry: {entry}")
try:
cap_case_id = entry["cap_case_id"]
cl_cluster_id = entry["cl_cluster_id"]
json_path = entry["cap_path"]
cap_path = entry["cap_path"]

# Construct the PDF path based on the JSON path
pdf_path = json_path.replace("cases", "case-pdfs").replace(
".json", ".pdf"
)

if pdf_path in self.processed_pdfs:
logger.info(f"Skipping already processed PDF: {pdf_path}")
if not all([cap_case_id, cl_cluster_id, cap_path]):
logger.error(
f"Missing key in entry: {json.dumps(entry, indent=2)}"
)
continue

logger.info(f"Processing PDF: {pdf_path}")

if not self.dry_run:
pdf_content = self.fetch_pdf_from_cap(pdf_path)
if pdf_content:
cluster = OpinionCluster.objects.get(id=cl_cluster_id)
self.store_pdf_in_cl(cluster, pdf_content)
else:
logger.info(f"Dry run: Would fetch PDF from {pdf_path}")
logger.debug(
f"Processing entry: cap_case_id={cap_case_id}, cl_cluster_id={cl_cluster_id}, cap_path={cap_path}"
)

self.processed_pdfs.add(pdf_path)
self.process_entry(cap_case_id, cl_cluster_id, cap_path)

except KeyError as e:
logger.error(
f"Missing key in entry: {e}. Entry: {json.dumps(entry, indent=2)}"
)
except Exception as e:
logger.error(
f"Error processing CAP ID {entry.get('cap_case_id', 'Unknown')}: {str(e)}",
Expand All @@ -206,36 +200,50 @@ def parse_cap_path(self, cap_path: str) -> Tuple[str, str, str]:
case_name = parts[-1].replace(".json", "")
return reporter_slug, volume_folder, case_name

def process_entry(self, entry: Dict[str, Any]) -> None:
def process_entry(
self, cap_case_id: int, cl_cluster_id: int, cap_path: str
) -> None:
"""Process a single crosswalk entry.
:param entry: Dictionary containing crosswalk entry data.
:param cap_case_id: CAP case id
:param cl_cluster_id: CL cluster id
:param cap_path: Path of CAP JSON data (in the CAP_R2 S3 bucket)
:return: None
"""
cap_case_id = entry["cap_case_id"]
cl_cluster_id = entry["cl_cluster_id"]
cap_path = entry["cap_path"]
logger.info(
f"Processing entry: cap_case_id={cap_case_id}, cl_cluster_id={cl_cluster_id}, cap_path={cap_path}"
)
try:
cluster = OpinionCluster.objects.get(id=cl_cluster_id)
logger.info(f"Found cluster: {cluster}")
pdf_content = self.fetch_pdf_from_cap(cap_path)
logger.info(
f"Fetched PDF content, length: {len(pdf_content) if pdf_content else 0}"
)
if pdf_content:
logger.info(
"PDF content is not empty, calling store_pdf_in_cl"
)
self.store_pdf_in_cl(cluster, pdf_content)
else:
logger.info("PDF content is empty, skipping storage")
except OpinionCluster.DoesNotExist:
logger.info(f"Cluster not found for id: {cl_cluster_id}")
except Exception as e:
logger.error(f"Error processing entry: {str(e)}", exc_info=True)

match self.job:
case "import_pdf":
try:
pdf_path = cap_path.replace("cases", "case-pdfs").replace(
".json", ".pdf"
)

if pdf_path in self.processed_pdfs:
logger.info(
f"Skipping already processed PDF: {pdf_path}"
)
return

logger.info(f"Processing PDF: {pdf_path}")

if not self.dry_run:
pdf_content = self.fetch_pdf_from_cap(pdf_path)
if pdf_content:
cluster = OpinionCluster.objects.get(
id=cl_cluster_id
)
self.store_pdf_in_cl(cluster, pdf_content)
else:
logger.info(
f"Dry run: Would fetch PDF from {pdf_path}"
)

self.processed_pdfs.add(pdf_path)

except OpinionCluster.DoesNotExist:
logger.info(f"Cluster not found for id: {cl_cluster_id}")
case _:
raise Exception(f"Unknown job {self.job}")

def fetch_pdf_from_cap(self, pdf_path: str) -> Optional[bytes]:
"""Fetch PDF content from CAP.
Expand Down

0 comments on commit eb06405

Please sign in to comment.