Skip to content

Commit

Permalink
lib: improve PGS builder, #TASK-5407, #TASK-5387
Browse files Browse the repository at this point in the history
  • Loading branch information
jtarraga committed Oct 1, 2024
1 parent 2889192 commit 8c8a4ea
Showing 1 changed file with 77 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,73 +174,89 @@ public void parse() throws Exception {

logger.info(BUILDING_LOG_MESSAGE, getDataName(PGS_DATA));

int numFiles;
int counter;
String endsWith;
File[] files = downloadPath.toFile().listFiles();

// First, process metadata files
try (BufferedWriter bw = FileUtils.newBufferedWriter(serializer.getOutdir().resolve(PGS_COMMON_COLLECTION + JSON_GZ_EXTENSION))) {
int counter = 0;
File[] files = downloadPath.toFile().listFiles();
counter = 0;
endsWith = "_metadata" + TAR_GZ_EXTENSION;
numFiles = getNumFiles(files, endsWith);
for (File file : files) {
if (file.isFile()) {
if (file.getName().endsWith(TXT_GZ_EXTENSION)) {
// E.g.: PGS004905_hmPOS_GRCh38.txt.gz: it contains the variants
logger.info(PARSING_LOG_MESSAGE, file.getName());

String pgsId = null;
Map<String, Integer> columnPos = new HashMap<>();

try (BufferedReader br = FileUtils.newBufferedReader(file.toPath())) {
String line;
while ((line = br.readLine()) != null) {
if (line.startsWith("#")) {
if (line.startsWith("#pgs_id=")) {
pgsId = line.split("=")[1].trim();
// Sanity check
if (!file.getName().startsWith(pgsId)) {
throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id mismatch");
}
// Add PGS ID to the set
pgsIdSet.add(pgsId);
}
} else if (line.startsWith(RSID_COL) || line.startsWith(CHR_NAME_COL)) {
String[] fields = line.split("\t");
for (int i = 0; i < fields.length; i++) {
columnPos.put(fields[i], i);
}
} else {
// Sanity check
if (pgsId == null) {
throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id is null");
}
saveVariantPolygenicScore(line, columnPos, pgsId);
if (file.isFile() && file.getName().endsWith(endsWith)) {
// E.g.: PGS004905_metadata.tar.gz: it contains a set of files about metadata
logger.info(PARSING_LOG_MESSAGE, file.getName());
processPgsMetadataFile(file, bw);
logger.info(PARSING_DONE_LOG_MESSAGE, file.getName());
logger.info("Progress: {} of {} meta files", ++counter, numFiles);
}
}
}

// Second, process variant files
counter = 0;
endsWith = TXT_GZ_EXTENSION;
numFiles = getNumFiles(files, endsWith);
for (File file : files) {
if (file.isFile() && file.getName().endsWith(endsWith)) {
// E.g.: PGS004905_hmPOS_GRCh38.txt.gz: it contains the variants
logger.info(PARSING_LOG_MESSAGE, file.getName());

String pgsId = null;
Map<String, Integer> columnPos = new HashMap<>();

try (BufferedReader br = FileUtils.newBufferedReader(file.toPath())) {
String line;
while ((line = br.readLine()) != null) {
if (line.startsWith("#")) {
if (line.startsWith("#pgs_id=")) {
pgsId = line.split("=")[1].trim();
// Sanity check
if (!file.getName().startsWith(pgsId)) {
throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id mismatch");
}
// Add PGS ID to the set
pgsIdSet.add(pgsId);
}
} else if (line.startsWith(RSID_COL) || line.startsWith(CHR_NAME_COL)) {
String[] fields = line.split("\t");
for (int i = 0; i < fields.length; i++) {
columnPos.put(fields[i], i);
}
} else {
// Sanity check
if (pgsId == null) {
throw new CellBaseException("Error parsing file " + file.getName() + ": pgs_id is null");
}
saveVariantPolygenicScore(line, columnPos, pgsId);
}
logger.info(PARSING_DONE_LOG_MESSAGE, file.getName());
} else if (file.getName().endsWith("_metadata" + TAR_GZ_EXTENSION)) {
// E.g.: PGS004905_metadata.tar.gz: it contains a set of files about metadata
logger.info(PARSING_LOG_MESSAGE, file.getName());
processPgsMetadataFile(file, bw);
logger.info(PARSING_DONE_LOG_MESSAGE, file.getName());
}
}
logger.info("Progress {} of {} files", ++counter, files.length);
logger.info(PARSING_DONE_LOG_MESSAGE, file.getName());
logger.info("Progress: {} of {} variant files", ++counter, numFiles);
}
}

// Write remaining variant ID batch
RocksDB rdb = (RocksDB) varRDBConn[0];
// logger.info("Writing variant ID batch with {} items, {} KB", varBatch.count(), varBatchSize / 1024);
RocksDB rdb;
// Write remaining variant ID batch
if (varBatchCounter > 0) {
rdb = (RocksDB) varRDBConn[0];
rdb.write(new WriteOptions(), varBatch);
varBatch.clear();
// Write remaining PGS/variant batch
}
// Write remaining PGS/variant batch
if (varPgsBatchCounter > 0) {
rdb = (RocksDB) varPgsRDBConn[0];
// logger.info("Writing PGS batch with {} items, {} KB", varPgsBatch.count(), varPgsBatchSize / 1024);
rdb.write(new WriteOptions(), varPgsBatch);
varPgsBatch.clear();


// Serialize/write the saved variant polygenic scores in the RocksDB
serializeRDB();
serializer.close();
}

// Serialize/write the saved variant polygenic scores in the RocksDB
serializeRDB();
serializer.close();

logger.info(BUILDING_DONE_LOG_MESSAGE, getDataName(PGS_DATA));
}

Expand Down Expand Up @@ -691,4 +707,14 @@ private Object[] getDBConnection(String dbLocation, boolean forceCreate) {

return new Object[]{db, options, dbLocation, indexingNeeded};
}

private int getNumFiles(File[] files, String endsWith) {
int numFiles = 0;
for (File file : files) {
if (file.isFile() && file.getName().endsWith(endsWith)) {
++numFiles;
}
}
return numFiles;
}
}

0 comments on commit 8c8a4ea

Please sign in to comment.