lib: limit WriteBatch for number of items, #TASK-5407, #TASK-5387
jtarraga committed Sep 12, 2024
1 parent 2950c0e commit 2889192
Showing 1 changed file with 45 additions and 32 deletions.
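
The change swaps byte-size accounting for a simple item counter: each RocksDB WriteBatch is now flushed once it holds MAX_BATCH_SIZE (100) entries. Below is a minimal sketch of the pattern as a hypothetical stand-alone helper; the class name and structure are illustrative, while the WriteBatch/RocksDB calls and the flush-and-reset sequence mirror the diff.

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Hypothetical helper illustrating the count-based batching adopted by this commit.
public class CountLimitedBatchWriter {
    private static final int MAX_BATCH_SIZE = 100;

    private final RocksDB rdb;
    private WriteBatch batch = new WriteBatch();
    private int counter = 0;

    public CountLimitedBatchWriter(RocksDB rdb) {
        this.rdb = rdb;
    }

    public void put(byte[] key, byte[] value) throws RocksDBException {
        batch.put(key, value);
        counter++;
        if (counter >= MAX_BATCH_SIZE) {
            flush();
        }
    }

    // Write the pending batch, then reset it, mirroring the diff's
    // clear() + new WriteBatch() + counter = 0 sequence.
    public void flush() throws RocksDBException {
        rdb.write(new WriteOptions(), batch);
        batch.clear();
        batch = new WriteBatch();
        counter = 0;
    }
}

Counting items makes the flush threshold independent of key/value sizes; the previous 1 MB byte threshold required the varBatchSize bookkeeping that this commit deletes.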
@@ -54,8 +54,8 @@ public class PolygenicScoreBuilder extends AbstractBuilder {
     private Set<String> pgsIdSet;
     private Object[] varRDBConn;
     private Object[] varPgsRDBConn;
-    private int varBatchSize;
-    private int varPgsBatchSize;
+    private int varBatchCounter = 0;
+    private int varPgsBatchCounter = 0;
     private WriteBatch varBatch;
     private WriteBatch varPgsBatch;

@@ -65,7 +65,7 @@ public class PolygenicScoreBuilder extends AbstractBuilder {
     private static ObjectReader varPgsReader;
     private static ObjectWriter jsonObjectWriter;

-    private static final int MAX_BATCH_SIZE = 1024 * 1024; // 1 MB
+    private static final int MAX_BATCH_SIZE = 100;

     private static final String RSID_COL = "rsID";
     private static final String CHR_NAME_COL = "chr_name";
@@ -154,11 +154,9 @@ public void check() throws CellBaseException, IOException {
         // Prepare RocksDB for variant IDs
         this.varRDBConn = getDBConnection(integrationPath.resolve("rdb-var.idx").toString(), true);
         this.varBatch = new WriteBatch();
-        this.varBatchSize = 0;
         // Prepare RocksDB for PGS/variants
         this.varPgsRDBConn = getDBConnection(integrationPath.resolve("rdb-var-pgs.idx").toString(), true);
         this.varPgsBatch = new WriteBatch();
-        this.varPgsBatchSize = 0;
         // PGS set
         this.pgsIdSet = new HashSet<>();

@@ -182,6 +180,7 @@ public void parse() throws Exception {
         for (File file : files) {
             if (file.isFile()) {
                 if (file.getName().endsWith(TXT_GZ_EXTENSION)) {
+                    // E.g.: PGS004905_hmPOS_GRCh38.txt.gz: it contains the variants
                     logger.info(PARSING_LOG_MESSAGE, file.getName());

                     String pgsId = null;
@@ -216,6 +215,7 @@
                     }
                     logger.info(PARSING_DONE_LOG_MESSAGE, file.getName());
                 } else if (file.getName().endsWith("_metadata" + TAR_GZ_EXTENSION)) {
+                    // E.g.: PGS004905_metadata.tar.gz: it contains a set of files about metadata
                     logger.info(PARSING_LOG_MESSAGE, file.getName());
                     processPgsMetadataFile(file, bw);
                     logger.info(PARSING_DONE_LOG_MESSAGE, file.getName());
@@ -244,7 +244,7 @@ public void parse() throws Exception {
         logger.info(BUILDING_DONE_LOG_MESSAGE, getDataName(PGS_DATA));
     }

-    private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws IOException, CellBaseException {
+    private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws CellBaseException {
         String pgsId = metadataFile.getName().split("_")[0];

         Path tmp = serializer.getOutdir().resolve("tmp");
@@ -253,20 +253,16 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
         }

         String command = "tar -xzf " + metadataFile.getAbsolutePath() + " -C " + tmp.toAbsolutePath();
-        Process process = Runtime.getRuntime().exec(command);
-
-        // Wait for the process to complete
         try {
-            int exitCode = process.waitFor();
-            // We don't check the code because it is failing in the CellBase worker machine
-            // if (exitCode != 0) {
-            //     throw new IOException("Error executing the command. Exit code: " + exitCode);
-            // }
-        } catch (InterruptedException e) {
-            throw new IOException("Error waiting for the process '" + command + "' to complete.", e);
+            logger.info("Executing: {}", command);
+            Process process = Runtime.getRuntime().exec(command);
+            process.waitFor();
+        } catch (IOException | InterruptedException e) {
+            throw new CellBaseException("Exception raised when executing: " + command, e);
         }

         // Create PGS object, with the common fields
+        String filename;
         CommonPolygenicScore pgs = new CommonPolygenicScore();
         pgs.setId(pgsId);
         pgs.setSource(PGS_CATALOG_DATA);
@@ -276,7 +272,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
         String[] field;

         // PGSxxxxx_metadata_publications.csv
-        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_publications.csv"))) {
+        filename = pgsId + "_metadata_publications.csv";
+        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) {
             // Skip first line
             br.readLine();
             while ((line = br.readLine()) != null) {
@@ -289,10 +286,13 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
                 CSVRecord strings = csvParser.getRecords().get(0);
                 pgs.getPubmedRefs().add(new PubmedReference(strings.get(8), strings.get(2), strings.get(3), strings.get(4), null));
             }
+        } catch (IOException e) {
+            throw new CellBaseException("Parsing file " + filename, e);
         }

         // PGSxxxxx_metadata_efo_traits.csv
-        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_efo_traits.csv"))) {
+        filename = pgsId + "_metadata_efo_traits.csv";
+        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) {
             // Skip first line
             br.readLine();
             while ((line = br.readLine()) != null) {
@@ -304,10 +304,13 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
                 pgs.getTraits().add(new OntologyTermAnnotation(strings.get(0), strings.get(1), strings.get(2), "EFO", strings.get(3),
                         new HashMap<>()));
             }
+        } catch (IOException e) {
+            throw new CellBaseException("Parsing file " + filename, e);
         }

         // PGSxxxxx_metadata_scores.csv
-        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_scores.csv"))) {
+        filename = pgsId + "_metadata_scores.csv";
+        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) {
             // Skip first line
             br.readLine();
             while ((line = br.readLine()) != null) {
@@ -335,6 +338,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
                 }
                 pgs.setName(strings.get(1));
             }
+        } catch (IOException e) {
+            throw new CellBaseException("Parsing file " + filename, e);
         }

         // TODO: PGSxxxxx_metadata_score_development_samples.csv
@@ -348,7 +353,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
         // GWAS Catalog Study ID (GCST...) Source PubMed ID (PMID) Source DOI Cohort(s) Additional Sample/Cohort Information

         // PGSxxxxx_metadata_performance_metrics.csv
-        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_performance_metrics.csv"))) {
+        filename = pgsId + "_metadata_performance_metrics.csv";
+        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) {
             // Skip first line
             br.readLine();
             while ((line = br.readLine()) != null) {
@@ -394,6 +400,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
                 }
                 pgs.getValues().add(values);
             }
+        } catch (IOException e) {
+            throw new CellBaseException("Parsing file " + filename, e);
         }

         // TODO: PGSxxxxx_metadata_evaluation_sample_sets.csv
@@ -407,7 +415,8 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
         // GWAS Catalog Study ID (GCST...) Source PubMed ID (PMID) Source DOI Cohort(s) Additional Sample/Cohort Information

         // PGSxxxxx_metadata_cohorts.csv
-        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(pgsId + "_metadata_cohorts.csv"))) {
+        filename = pgsId + "_metadata_cohorts.csv";
+        try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) {
             // Skip first line
             line = br.readLine();
             while ((line = br.readLine()) != null) {
@@ -418,11 +427,17 @@ private void processPgsMetadataFile(File metadataFile, BufferedWriter bw) throws
                 CSVRecord strings = csvParser.getRecords().get(0);
                 pgs.getCohorts().add(new PgsCohort(strings.get(0), strings.get(1), strings.get(2)));
             }
+        } catch (IOException e) {
+            throw new CellBaseException("Parsing file " + filename, e);
         }

         // Create PGS object, with the common fields
-        bw.write(jsonObjectWriter.writeValueAsString(pgs));
-        bw.write("\n");
+        try {
+            bw.write(jsonObjectWriter.writeValueAsString(pgs));
+            bw.write("\n");
+        } catch (IOException e) {
+            throw new CellBaseException("Writing CommonPolygenicScore data model", e);
+        }

         // Clean tmp folder
         for (File tmpFile : tmp.toFile().listFiles()) {
@@ -536,16 +551,15 @@ private void saveVariantPolygenicScore(String line, Map<String, Integer> columnP
         if (dbContent == null) {
             // Add data to batch
             varBatch.put(key.getBytes(), ONE);
-            varBatchSize += key.getBytes().length + ONE.length;
-            if (varBatchSize >= MAX_BATCH_SIZE) {
+            varBatchCounter++;
+            if (varBatchCounter >= MAX_BATCH_SIZE) {
                 // Write the batch to the database
-                // logger.info("Writing variant ID batch with {} items, {} KB", varBatch.count(), varBatchSize / 1024);
                 rdb.write(new WriteOptions(), varBatch);
                 // Reset batch
                 varBatch.clear();
                 varBatch = new WriteBatch();
-                varBatchSize = 0;
-            }
+                varBatchCounter = 0;
+            }
             // rdb.put(key.getBytes(), ONE);
         }

@@ -564,15 +578,14 @@ private void saveVariantPolygenicScore(String line, Map<String, Integer> columnP
             byte[] rdbKey = key.getBytes();
             byte[] rdbValue = jsonObjectWriter.writeValueAsBytes(varPgs);
             varPgsBatch.put(rdbKey, rdbValue);
-            varPgsBatchSize += rdbKey.length + rdbValue.length;
-            if (varPgsBatchSize >= MAX_BATCH_SIZE) {
+            varPgsBatchCounter++;
+            if (varPgsBatchCounter >= MAX_BATCH_SIZE) {
                 // Write the batch to the database
-                // logger.info("Writing PGS batch with {} items, {} KB", varPgsBatch.count(), varPgsBatchSize / 1024);
                 rdb.write(new WriteOptions(), varPgsBatch);
                 // Reset batch
                 varPgsBatch.clear();
                 varPgsBatch = new WriteBatch();
-                varPgsBatchSize = 0;
+                varPgsBatchCounter = 0;
             }
             // rdb.put(key.getBytes(), jsonObjectWriter.writeValueAsBytes(varPgs));
         }
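
Alongside the batch limit, the diff narrows processPgsMetadataFile to a single checked exception type: each metadata CSV read is wrapped so any IOException is rethrown as CellBaseException tagged with the offending file name. A minimal sketch of that wrapping, where parseRecords is a hypothetical stand-in for the per-file parsing logic:

// Illustrative only: convert low-level IOExceptions into the builder's
// single checked exception type, naming the file being parsed.
String filename = pgsId + "_metadata_publications.csv";
try (BufferedReader br = FileUtils.newBufferedReader(tmp.resolve(filename))) {
    parseRecords(br); // hypothetical parsing step
} catch (IOException e) {
    // Same message pattern the commit uses for every metadata CSV
    throw new CellBaseException("Parsing file " + filename, e);
}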