From eb064056b39e6e9ff5a862e3393274d638f763d5 Mon Sep 17 00:00:00 2001
From: Chad Weider
Date: Fri, 25 Oct 2024 11:10:11 -0700
Subject: [PATCH] refactor(harvard_merger): rework `process_entry` to be
 extensible

`import_harvard_pdfs` has a `process_entry` method that is similar to,
but different from, the entry processing in `process_crosswalk_file`,
and is currently unused. Since a specialized method for processing an
entry is useful, this patch reconciles the two pieces of functionality.

It also adds a `--job` parameter to the command, to make it easier to
add other jobs related to CAP migration later.
---
 .../commands/import_harvard_pdfs.py           | 106 ++++++++++--------
 1 file changed, 57 insertions(+), 49 deletions(-)

diff --git a/cl/search/management/commands/import_harvard_pdfs.py b/cl/search/management/commands/import_harvard_pdfs.py
index b0e71f65db..4d98d47d8f 100644
--- a/cl/search/management/commands/import_harvard_pdfs.py
+++ b/cl/search/management/commands/import_harvard_pdfs.py
@@ -51,6 +51,13 @@ def add_arguments(self, parser):
             help="Directory for reading crosswalk files",
             required=True,
         )
+        parser.add_argument(
+            "--job",
+            type=str,
+            choices=["import_pdf"],
+            default="import_pdf",
+            help="Job to run for each crosswalk entry (default: import_pdf).",
+        )
 
     def handle(self, *args: Any, **options: Any) -> None:
         """Handle the command execution.
@@ -65,6 +72,7 @@ def handle(self, *args: Any, **options: Any) -> None:
 
         self.dry_run = options["dry_run"]
         self.crosswalk_dir = options["crosswalk_dir"]
+        self.job = options["job"]
 
         if not os.path.exists(self.crosswalk_dir):
             logger.warning(
@@ -156,37 +164,23 @@ def process_crosswalk_file(self, crosswalk_file: str) -> None:
             crosswalk_data = json.load(f)
 
         for entry in tqdm(crosswalk_data, desc="Processing entries"):
-            logger.debug(f"Processing entry: {entry}")
             try:
                 cap_case_id = entry["cap_case_id"]
                 cl_cluster_id = entry["cl_cluster_id"]
-                json_path = entry["cap_path"]
+                cap_path = entry["cap_path"]
 
-                # Construct the PDF path based on the JSON path
-                pdf_path = json_path.replace("cases", "case-pdfs").replace(
-                    ".json", ".pdf"
-                )
-
-                if pdf_path in self.processed_pdfs:
-                    logger.info(f"Skipping already processed PDF: {pdf_path}")
+                if not all([cap_case_id, cl_cluster_id, cap_path]):
+                    logger.error(
+                        f"Missing key in entry: {json.dumps(entry, indent=2)}"
+                    )
                     continue
 
-                logger.info(f"Processing PDF: {pdf_path}")
-
-                if not self.dry_run:
-                    pdf_content = self.fetch_pdf_from_cap(pdf_path)
-                    if pdf_content:
-                        cluster = OpinionCluster.objects.get(id=cl_cluster_id)
-                        self.store_pdf_in_cl(cluster, pdf_content)
-                else:
-                    logger.info(f"Dry run: Would fetch PDF from {pdf_path}")
+                logger.debug(
+                    f"Processing entry: cap_case_id={cap_case_id}, cl_cluster_id={cl_cluster_id}, cap_path={cap_path}"
+                )
 
-                self.processed_pdfs.add(pdf_path)
+                self.process_entry(cap_case_id, cl_cluster_id, cap_path)
 
-            except KeyError as e:
-                logger.error(
-                    f"Missing key in entry: {e}. Entry: {json.dumps(entry, indent=2)}"
-                )
             except Exception as e:
                 logger.error(
                     f"Error processing CAP ID {entry.get('cap_case_id', 'Unknown')}: {str(e)}",
@@ -206,36 +200,50 @@ def parse_cap_path(self, cap_path: str) -> Tuple[str, str, str]:
         case_name = parts[-1].replace(".json", "")
         return reporter_slug, volume_folder, case_name
 
-    def process_entry(self, entry: Dict[str, Any]) -> None:
+    def process_entry(
+        self, cap_case_id: int, cl_cluster_id: int, cap_path: str
+    ) -> None:
         """Process a single crosswalk entry.
 
-        :param entry: Dictionary containing crosswalk entry data.
+        :param cap_case_id: CAP case id
+        :param cl_cluster_id: CL cluster id
+        :param cap_path: Path of CAP JSON data (in the CAP_R2 S3 bucket)
         :return: None
         """
-        cap_case_id = entry["cap_case_id"]
-        cl_cluster_id = entry["cl_cluster_id"]
-        cap_path = entry["cap_path"]
-        logger.info(
-            f"Processing entry: cap_case_id={cap_case_id}, cl_cluster_id={cl_cluster_id}, cap_path={cap_path}"
-        )
-        try:
-            cluster = OpinionCluster.objects.get(id=cl_cluster_id)
-            logger.info(f"Found cluster: {cluster}")
-            pdf_content = self.fetch_pdf_from_cap(cap_path)
-            logger.info(
-                f"Fetched PDF content, length: {len(pdf_content) if pdf_content else 0}"
-            )
-            if pdf_content:
-                logger.info(
-                    "PDF content is not empty, calling store_pdf_in_cl"
-                )
-                self.store_pdf_in_cl(cluster, pdf_content)
-            else:
-                logger.info("PDF content is empty, skipping storage")
-        except OpinionCluster.DoesNotExist:
-            logger.info(f"Cluster not found for id: {cl_cluster_id}")
-        except Exception as e:
-            logger.error(f"Error processing entry: {str(e)}", exc_info=True)
+
+        match self.job:
+            case "import_pdf":
+                try:
+                    pdf_path = cap_path.replace("cases", "case-pdfs").replace(
+                        ".json", ".pdf"
+                    )
+
+                    if pdf_path in self.processed_pdfs:
+                        logger.info(
+                            f"Skipping already processed PDF: {pdf_path}"
+                        )
+                        return
+
+                    logger.info(f"Processing PDF: {pdf_path}")
+
+                    if not self.dry_run:
+                        pdf_content = self.fetch_pdf_from_cap(pdf_path)
+                        if pdf_content:
+                            cluster = OpinionCluster.objects.get(
+                                id=cl_cluster_id
+                            )
+                            self.store_pdf_in_cl(cluster, pdf_content)
+                    else:
+                        logger.info(
+                            f"Dry run: Would fetch PDF from {pdf_path}"
+                        )
+
+                    self.processed_pdfs.add(pdf_path)
+
+                except OpinionCluster.DoesNotExist:
+                    logger.info(f"Cluster not found for id: {cl_cluster_id}")
+            case _:
+                raise ValueError(f"Unknown job {self.job}")
 
     def fetch_pdf_from_cap(self, pdf_path: str) -> Optional[bytes]:
         """Fetch PDF content from CAP.
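
Note: to illustrate the extension point this patch creates, here is a
minimal standalone sketch of the job-dispatch pattern that the reworked
`process_entry` uses. Adding a future CAP-migration job means adding one
entry to the `choices` list of `--job` and one `case` arm. The sketch is
illustrative only; the `EntryProcessor` class and the `check_cluster`
job are hypothetical names, not part of the codebase.

    # Minimal, runnable sketch of the dispatch pattern (Python 3.10+).
    class EntryProcessor:
        def __init__(self, job: str) -> None:
            self.job = job

        def process_entry(
            self, cap_case_id: int, cl_cluster_id: int, cap_path: str
        ) -> None:
            match self.job:
                case "import_pdf":
                    # Derive the PDF path from the JSON path, as the
                    # command does.
                    pdf_path = cap_path.replace("cases", "case-pdfs").replace(
                        ".json", ".pdf"
                    )
                    print(f"import {pdf_path} into cluster {cl_cluster_id}")
                case "check_cluster":
                    # Hypothetical future job: validate the entry without
                    # fetching anything.
                    print(f"check that cluster {cl_cluster_id} exists")
                case _:
                    raise ValueError(f"Unknown job {self.job}")

    EntryProcessor("import_pdf").process_entry(1, 2, "cases/us/1/1.json")

With the patch applied, the command would presumably be invoked as
`python manage.py import_harvard_pdfs --crosswalk-dir <dir> --job import_pdf`
(flag names inferred from `options["crosswalk_dir"]` and `options["job"]`);
omitting `--job` preserves the previous behavior, since it defaults to
`import_pdf`.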