Skip to content

Commit

Permalink
lib: fix protein builder, #TASK-5576, #TASK-5564
Browse files Browse the repository at this point in the history
  • Loading branch information
jtarraga committed Jul 29, 2024
1 parent b0d1c67 commit 039aa81
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.opencb.cellbase.lib.builders.AbstractBuilder.BUILDING_DONE_LOG_MESSAGE;
import static org.opencb.cellbase.lib.builders.AbstractBuilder.BUILDING_LOG_MESSAGE;
import static org.opencb.cellbase.lib.builders.GenomeSequenceFastaBuilder.GENOME_OUTPUT_FILENAME;
import static org.opencb.cellbase.lib.builders.ProteinBuilder.OUTPUT_PROTEIN_OUTPUT_FILENAME;
import static org.opencb.cellbase.lib.builders.RegulatoryFeatureBuilder.*;
import static org.opencb.cellbase.lib.builders.RepeatsBuilder.REPEATS_OUTPUT_FILENAME;
import static org.opencb.cellbase.lib.download.GenomeDownloadManager.GENOME_INFO_FILENAME;
Expand Down Expand Up @@ -326,11 +327,24 @@ private AbstractBuilder buildRegulation() throws CellBaseException {
}

private AbstractBuilder buildProtein() throws CellBaseException {
logger.info(BUILDING_LOG_MESSAGE, getDataName(PROTEIN_DATA));

// Sanity check
Path proteinDownloadPath = downloadFolder.resolve(PROTEIN_DATA);
Path proteinBuildPath = buildFolder.resolve(PROTEIN_DATA);
copyVersionFiles(Arrays.asList(proteinDownloadPath.resolve(getDataVersionFilename(UNIPROT_DATA)),
proteinDownloadPath.resolve(getDataVersionFilename(INTERPRO_DATA))), proteinBuildPath);
List<Path> filesToCheck = Arrays.asList(proteinBuildPath.resolve(OUTPUT_PROTEIN_OUTPUT_FILENAME),
proteinBuildPath.resolve(getDataVersionFilename(INTERPRO_DATA)),
proteinBuildPath.resolve(getDataVersionFilename(INTACT_DATA)),
proteinBuildPath.resolve(getDataVersionFilename(UNIPROT_DATA)));
if (AbstractBuilder.existFiles(filesToCheck)) {
logger.warn(DATA_ALREADY_BUILT, getDataName(PROTEIN_DATA));
return null;
}

copyVersionFiles(Arrays.asList(proteinDownloadPath.resolve(INTERPRO_DATA).resolve(getDataVersionFilename(
INTERPRO_DATA)), proteinDownloadPath.resolve(INTACT_DATA).resolve(getDataVersionFilename(
INTACT_DATA)), proteinDownloadPath.resolve(UNIPROT_DATA).resolve(getDataVersionFilename(
UNIPROT_DATA))), proteinBuildPath);

// Create the file serializer and the protein builder
CellBaseSerializer serializer = new CellBaseJsonFileSerializer(proteinBuildPath, PROTEIN_DATA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.JAXBException;
import java.io.BufferedReader;
Expand All @@ -41,6 +39,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;

import static org.opencb.cellbase.lib.EtlCommons.*;

Expand All @@ -49,7 +48,7 @@ public class ProteinBuilder extends AbstractBuilder {
private Path proteinPath;
private String species;

protected Logger logger = LoggerFactory.getLogger(this.getClass());
public static final String OUTPUT_PROTEIN_OUTPUT_FILENAME = PROTEIN_DATA + ".json.gz";

public ProteinBuilder(Path proteinPath, String species, CellBaseSerializer serializer) {
super(serializer);
Expand All @@ -60,32 +59,31 @@ public ProteinBuilder(Path proteinPath, String species, CellBaseSerializer seria

@Override
public void parse() throws CellBaseException, IOException {
logger.info(BUILDING_LOG_MESSAGE, getDataName(PROTEIN_DATA));

// Sanity check
checkDirectory(proteinPath, getDataName(PROTEIN_DATA));

// Check UniProt file
DataSource dataSource = dataSourceReader.readValue(proteinPath.resolve(getDataVersionFilename(UNIPROT_DATA)).toFile());
List<File> uniProtFiles = checkFiles(dataSource, proteinPath, getDataCategory(UNIPROT_DATA) + "/" + getDataName(UNIPROT_DATA));
DataSource dataSource = dataSourceReader.readValue(proteinPath.resolve(UNIPROT_DATA).resolve(getDataVersionFilename(UNIPROT_DATA))
.toFile());
List<File> uniProtFiles = checkFiles(dataSource, proteinPath.resolve(UNIPROT_DATA), getDataCategory(UNIPROT_DATA) + "/"
+ getDataName(UNIPROT_DATA));
if (uniProtFiles.size() != 1) {
throw new CellBaseException("Only one " + getDataName(UNIPROT_DATA) + " file is expected, but currently there are "
+ uniProtFiles.size() + " files");
throw new CellBaseException(getMismatchNumFilesErrorMessage(getDataName(UNIPROT_DATA), uniProtFiles.size()));
}

// Check InterPro file
dataSource = dataSourceReader.readValue(proteinPath.resolve(getDataVersionFilename(INTERPRO_DATA)).toFile());
List<File> interProFiles = checkFiles(dataSource, proteinPath, getDataCategory(INTERPRO_DATA) + "/" + getDataName(INTERPRO_DATA));
dataSource = dataSourceReader.readValue(proteinPath.resolve(INTERPRO_DATA).resolve(getDataVersionFilename(INTERPRO_DATA)).toFile());
List<File> interProFiles = checkFiles(dataSource, proteinPath.resolve(INTERPRO_DATA), getDataCategory(INTERPRO_DATA) + "/"
+ getDataName(INTERPRO_DATA));
if (interProFiles.size() != 1) {
throw new CellBaseException("Only one " + getDataName(INTERPRO_DATA) + " file is expected, but currently there are "
+ interProFiles.size() + " files");
throw new CellBaseException(getMismatchNumFilesErrorMessage(getDataName(INTERPRO_DATA), interProFiles.size()));
}

// Prepare UniProt data by splitting data in chunks
Path uniProtChunksPath = serializer.getOutdir().resolve(UNIPROT_CHUNKS_SUBDIRECTORY);
logger.info("Split {} file {} into chunks at {}", getDataName(UNIPROT_DATA), uniProtFiles.get(0).getName(), uniProtChunksPath);
Files.createDirectories(uniProtChunksPath);
splitUniprot(proteinPath.resolve(uniProtFiles.get(0).getName()), uniProtChunksPath);
splitUniprot(proteinPath.resolve(UNIPROT_DATA).resolve(uniProtFiles.get(0).getName()), uniProtChunksPath);

// Prepare RocksDB
RocksDB rocksDb = getDBConnection(uniProtChunksPath);
Expand All @@ -99,6 +97,7 @@ public void parse() throws CellBaseException, IOException {
try {
File[] files = uniProtChunksPath.toFile().listFiles((dir, name) -> name.endsWith(".xml") || name.endsWith(".xml.gz"));


for (File file : files) {
logger.info(PARSING_LOG_MESSAGE, file);
Uniprot uniprot = (Uniprot) UniProtParser.loadXMLInfo(file.toString(), UniProtParser.UNIPROT_CONTEXT);
Expand All @@ -108,18 +107,24 @@ public void parse() throws CellBaseException, IOException {
for (OrganismNameType organismNameType : entry.getOrganism().getName()) {
entryOrganism = organismNameType.getValue();
if (entryOrganism.equals(species)) {
proteinMap.put(entry.getAccession().get(0), entry);

// Update RocksDB
rocksDb.put(entry.getAccession().get(0).getBytes(), jsonObjectWriter.writeValueAsBytes(entry));
}
}
}
logger.info(PARSING_DONE_LOG_MESSAGE, file);
logger.info(PARSING_DONE_LOG_MESSAGE);
}
logger.info("Number of proteins stored in map: '{}'", proteinMap.size());
if (proteinMap.size() > 10) {
logger.info("First 10 protein IDs in map: {}", proteinMap.keySet().stream().collect(Collectors.toList()).subList(0, 10));
}
logger.debug("Number of proteins stored in map: '{}'", proteinMap.size());

logger.info(PARSING_LOG_MESSAGE, interProFiles.get(0));
try (BufferedReader interproBuffereReader = FileUtils.newBufferedReader(interProFiles.get(0).toPath())) {
Set<String> hashSet = new HashSet<>(proteinMap.keySet());
Set<String> visited = new HashSet<>(30000);
Set<String> hashSet = proteinMap.keySet();
Set<String> visited = new HashSet<>(proteinMap.size());

int numInterProLinesProcessed = 0;
int numUniqueProteinsProcessed = 0;
Expand All @@ -141,8 +146,6 @@ public void parse() throws CellBaseException, IOException {
&& featureType.getLocation().getEnd().getPosition() != null
&& featureType.getLocation().getBegin().getPosition().equals(start)
&& featureType.getLocation().getEnd().getPosition().equals(end)) {
featureType.setId(fields[1]);
featureType.setRef(fields[3]);
iprAdded = true;
break;
}
Expand All @@ -166,24 +169,38 @@ public void parse() throws CellBaseException, IOException {
bytes = rocksDb.get(fields[0].getBytes());
entry = mapper.readValue(bytes, Entry.class);
entry.getFeature().add(featureType);

if (fields[0].equalsIgnoreCase(entry.getAccession().get(0))) {
// Update RocksDB
rocksDb.put(fields[0].getBytes(), jsonObjectWriter.writeValueAsBytes(entry));
} else {
logger.info("Something wrong happen: interpro fields[0] = {} vs entry.getAccession().get(0) = {}",
fields[0], entry.getAccession().get(0));
}
}

if (!visited.contains(fields[0])) {
visited.add(fields[0]);
numUniqueProteinsProcessed++;
}
} else {
logger.info("{} not found in protein map", fields[0]);
}

if (++numInterProLinesProcessed % 10000000 == 0) {
logger.debug("{} {} lines processed. {} unique proteins processed", numInterProLinesProcessed,
getDataName(INTERPRO_DATA), numUniqueProteinsProcessed);
logger.info("{} {} lines processed", numInterProLinesProcessed, getDataName(INTERPRO_DATA));
logger.info("{} {} unique proteins processed", getDataName(INTERPRO_DATA), numUniqueProteinsProcessed);
}
}
logger.info(PARSING_DONE_LOG_MESSAGE, interProFiles.get(0));
logger.info("{} {} lines processed", numInterProLinesProcessed, getDataName(INTERPRO_DATA));
logger.info("{} {} unique proteins processed", getDataName(INTERPRO_DATA), numUniqueProteinsProcessed);

logger.info(PARSING_DONE_LOG_MESSAGE);
} catch (IOException e) {
throw new CellBaseException("Error parsing " + getDataName(INTERPRO_DATA) + " file: " + interProFiles.get(0), e);
}


// Serialize and save results
RocksIterator rocksIterator = rocksDb.newIterator();
for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) {
Expand Down Expand Up @@ -258,4 +275,8 @@ private void splitUniprot(Path uniprotFilePath, Path splitOutdirPath) throws IOE
}
}
}

/**
 * Composes the error text used when the number of files found for a data source is not exactly one.
 *
 * @param dataName Human-readable name of the data source, inserted verbatim into the message
 * @param numFiles Number of files that were actually found
 * @return Message of the form "Only one NAME file is expected, but currently there are N files"
 */
private String getMismatchNumFilesErrorMessage(String dataName, int numFiles) {
    StringBuilder message = new StringBuilder("Only one ");
    message.append(dataName);
    message.append(" file is expected, but currently there are ");
    message.append(numFiles);
    message.append(" files");
    return message.toString();
}
}

0 comments on commit 039aa81

Please sign in to comment.