diff --git a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java index 79cbc8657..9ad89c093 100644 --- a/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java +++ b/cellbase-lib/src/main/java/org/opencb/cellbase/lib/builders/PolygenicScoreBuilder.java @@ -54,8 +54,8 @@ public class PolygenicScoreBuilder extends AbstractBuilder { private Set pgsIdSet; private Object[] varRDBConn; private Object[] varPgsRDBConn; - private int varBatchSize; - private int varPgsBatchSize; + private int varBatchCounter = 0; + private int varPgsBatchCounter = 0; private WriteBatch varBatch; private WriteBatch varPgsBatch; @@ -65,7 +65,7 @@ public class PolygenicScoreBuilder extends AbstractBuilder { private static ObjectReader varPgsReader; private static ObjectWriter jsonObjectWriter; - private static final int MAX_BATCH_SIZE = 1024 * 1024; // 1 MB + private static final int MAX_BATCH_SIZE = 100; private static final String RSID_COL = "rsID"; private static final String CHR_NAME_COL = "chr_name"; @@ -154,11 +154,9 @@ public void check() throws CellBaseException, IOException { // Prepare RocksDB for variant IDs this.varRDBConn = getDBConnection(integrationPath.resolve("rdb-var.idx").toString(), true); this.varBatch = new WriteBatch(); - this.varBatchSize = 0; // Prepare RocksDB for PGS/variants this.varPgsRDBConn = getDBConnection(integrationPath.resolve("rdb-var-pgs.idx").toString(), true); this.varPgsBatch = new WriteBatch(); - this.varPgsBatchSize = 0; // PGS set this.pgsIdSet = new HashSet<>(); @@ -182,6 +180,7 @@ public void parse() throws Exception { for (File file : files) { if (file.isFile()) { if (file.getName().endsWith(TXT_GZ_EXTENSION)) { + // E.g.: PGS004905_hmPOS_GRCh38.txt.gz: it contains the variants logger.info(PARSING_LOG_MESSAGE, file.getName()); String pgsId = null; @@ -216,6 +215,7 @@ public void parse() throws 
Exception { } logger.info(PARSING_DONE_LOG_MESSAGE, file.getName()); } else if (file.getName().endsWith("_metadata" + TAR_GZ_EXTENSION)) { + // E.g.: PGS004905_metadata.tar.gz: it contains a set of files about metadata logger.info(PARSING_LOG_MESSAGE, file.getName()); processPgsMetadataFile(file, bw); logger.info(PARSING_DONE_LOG_MESSAGE, file.getName()); @@ -244,7 +244,7 @@ public void parse() throws Exception { logger.info(BUILDING_DONE_LOG_MESSAGE, getDataName(PGS_DATA)); } - private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws IOException, CellBaseException { + private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws CellBaseException { String pgsId = metadataFile.getName().split("_")[0]; Path tmp = serializer.getOutdir().resolve("tmp"); @@ -253,20 +253,16 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws } String command = "tar -xzf " + metadataFile.getAbsolutePath() + " -C " + tmp.toAbsolutePath(); - Process process = Runtime.getRuntime().exec(command); - - // Wait for the process to complete try { - int exitCode = process.waitFor(); - // We don't check the code because it is failing in the CellBase worker machine -// if (exitCode != 0) { -// throw new IOException("Error executing the command. 
Exit code: " + exitCode); -// } - } catch (InterruptedException e) { - throw new IOException("Error waiting for the process '" + command + "' to complete.", e); + logger.info("Executing: {}", command); + Process process = Runtime.getRuntime().exec(command); + process.waitFor(); /* exit status intentionally not checked: tar is known to exit non-zero on the CellBase worker machine */ + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } throw new CellBaseException("Exception raised when executing: " + command, e); } // Create PGS object, with the common fields + String filename; CommonPolygenicScore pgs = new CommonPolygenicScore(); pgs.setId(pgsId); pgs.setSource(PGS_CATALOG_DATA); @@ -276,7 +272,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws String[] field; // PGSxxxxx_metadata_publications.csv - try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_publications.csv"))) { + filename = pgsId + "_metadata_publications.csv"; + try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) { // Skip first line br.readLine(); while ((line = br.readLine()) != null) { @@ -289,10 +286,13 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws CSVRecord strings = csvParser.getRecords().get(0); pgs.getPubmedRefs().add(new PubmedReference(strings.get(8), strings.get(2), strings.get(3), strings.get(4), null)); } + } catch (IOException e) { + throw new CellBaseException("Parsing file " + filename, e); } // PGSxxxxx_metadata_efo_traits.csv - try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_efo_traits.csv"))) { + filename = pgsId + "_metadata_efo_traits.csv"; + try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) { // Skip first line br.readLine(); while ((line = br.readLine()) != null) { @@ -304,10 +304,13 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws pgs.getTraits().add(new OntologyTermAnnotation(strings.get(0), strings.get(1), strings.get(2), "EFO", strings.get(3), new
HashMap<>())); } + } catch (IOException e) { + throw new CellBaseException("Parsing file " + filename, e); } // PGSxxxxx_metadata_scores.csv - try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_scores.csv"))) { + filename = pgsId + "_metadata_scores.csv"; + try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) { // Skip first line br.readLine(); while ((line = br.readLine()) != null) { @@ -335,6 +338,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws } pgs.setName(strings.get(1)); } + } catch (IOException e) { + throw new CellBaseException("Parsing file " + filename, e); } // TODO: PGSxxxxx_metadata_score_development_samples.csv @@ -348,7 +353,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws // GWAS Catalog Study ID (GCST...) Source PubMed ID (PMID) Source DOI Cohort(s) Additional Sample/Cohort Information // PGSxxxxx_metadata_performance_metrics.csv - try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_performance_metrics.csv"))) { + filename = pgsId + "_metadata_performance_metrics.csv"; + try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) { // Skip first line br.readLine(); while ((line = br.readLine()) != null) { @@ -394,6 +400,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws } pgs.getValues().add(values); } + } catch (IOException e) { + throw new CellBaseException("Parsing file " + filename, e); } // TODO: PGSxxxxx_metadata_evaluation_sample_sets.csv @@ -407,7 +415,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws // GWAS Catalog Study ID (GCST...) 
Source PubMed ID (PMID) Source DOI Cohort(s) Additional Sample/Cohort Information // PGSxxxxx_metadata_cohorts.csv - try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_cohorts.csv"))) { + filename = pgsId + "_metadata_cohorts.csv"; + try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) { // Skip first line line = br.readLine(); while ((line = br.readLine()) != null) { @@ -418,11 +427,17 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws CSVRecord strings = csvParser.getRecords().get(0); pgs.getCohorts().add(new PgsCohort(strings.get(0), strings.get(1), strings.get(2))); } + } catch (IOException e) { + throw new CellBaseException("Parsing file " + filename, e); } // Create PGS object, with the common fields - bw.write(jsonObjectWriter.writeValueAsString(pgs)); - bw.write("\n"); + try { + bw.write(jsonObjectWriter.writeValueAsString(pgs)); + bw.write("\n"); + } catch (IOException e) { + throw new CellBaseException("Writing CommonPolygenicScore data model", e); + } // Clean tmp folder for (File tmpFile : tmp.toFile().listFiles()) { @@ -536,16 +551,15 @@ private void saveVariantPolygenicScore(String line, Map columnP if (dbContent == null) { // Add data to batch varBatch.put(key.getBytes(), ONE); - varBatchSize += key.getBytes().length + ONE.length; - if (varBatchSize >= MAX_BATCH_SIZE) { + varBatchCounter++; + if (varBatchCounter >= MAX_BATCH_SIZE) { // Write the batch to the database // logger.info("Writing variant ID batch with {} items, {} KB", varBatch.count(), varBatchSize / 1024); rdb.write(new WriteOptions(), varBatch); // Reset batch varBatch.clear(); - varBatch = new WriteBatch(); - varBatchSize = 0; - } + varBatchCounter = 0; + } // rdb.put(key.getBytes(), ONE); } @@ -564,15 +578,14 @@ private void saveVariantPolygenicScore(String line, Map columnP byte[] rdbKey = key.getBytes(); byte[] rdbValue = jsonObjectWriter.writeValueAsBytes(varPgs); varPgsBatch.put(rdbKey, 
rdbValue); - varPgsBatchSize += rdbKey.length + rdbValue.length; - if (varPgsBatchSize >= MAX_BATCH_SIZE) { + varPgsBatchCounter++; + if (varPgsBatchCounter >= MAX_BATCH_SIZE) { // Write the batch to the database // logger.info("Writing PGS batch with {} items, {} KB", varPgsBatch.count(), varPgsBatchSize / 1024); rdb.write(new WriteOptions(), varPgsBatch); // Reset batch varPgsBatch.clear(); - varPgsBatch = new WriteBatch(); - varPgsBatchSize = 0; + varPgsBatchCounter = 0; } // rdb.put(key.getBytes(), jsonObjectWriter.writeValueAsBytes(varPgs)); }