Skip to content

Commit

Permalink
lib: add MongoDB listener to check updates in the data_release collec…
Browse files Browse the repository at this point in the history
…tion, #TASK-6565
  • Loading branch information
jtarraga committed Jul 22, 2024
1 parent dc27856 commit f0d9599
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ValidationCommandExecutor(AdminCliOptionsParser.ValidationCommandOptions
}

@Override
public void execute() {
public void execute() throws CellBaseException {
checkFilesExist();

VariantAnnotationCalculator variantAnnotationCalculator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@

package org.opencb.cellbase.lib.impl.core;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.opencb.cellbase.core.config.CellBaseConfiguration;
import org.opencb.cellbase.core.config.SpeciesConfiguration;
import org.opencb.cellbase.core.exception.CellBaseException;
import org.opencb.cellbase.core.models.DataRelease;
import org.opencb.cellbase.lib.db.MongoDBManager;
import org.opencb.cellbase.lib.managers.CellBaseManagerFactory;
import org.opencb.commons.datastore.mongodb.MongoDBCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
Expand All @@ -47,12 +55,14 @@ public final class DataReleaseSingleton {

private static DataReleaseSingleton instance;

private static final Logger LOGGER = LoggerFactory.getLogger(DataReleaseSingleton.class);

public static final String UNKOWN_DATABASE_MSG_PREFIX = "Unknown database ";
public static final String INVALID_RELEASE_MSG_PREFIX = "Invalid release ";
public static final String INVALID_DATA_MSG_PREFIX = "Invalid data ";

// Private constructor to prevent instantiation
private DataReleaseSingleton(CellBaseManagerFactory managerFactory) {
private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellBaseException {
this.managerFactory = managerFactory;

// Support multi species and assemblies
Expand All @@ -66,12 +76,29 @@ private DataReleaseSingleton(CellBaseManagerFactory managerFactory) {
assemblyMap.put(databaseName, assembly.getName());
rwLockMap.put(databaseName, new ReentrantReadWriteLock());
cachedData.put(databaseName, new HashMap<>());

MongoClient mongoClient = managerFactory.getDataReleaseManager(vertebrate.getId(), assembly.getName()).getMongoDatastore()
.getMongoClient();
MongoDatabase database = mongoClient.getDatabase(databaseName);
MongoCollection<Document> collection = database.getCollection(ReleaseMongoDBAdaptor.DATA_RELEASE_COLLECTION_NAME);
LOGGER.info("Setting listener for database {} and collection {}", database.getName(), collection.getNamespace()
.getCollectionName());
// Set up the change stream for the collection
new Thread(() -> {
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach(changeStreamDocument -> {
try {
handleDocumentChange(changeStreamDocument);
} catch (CellBaseException e) {
LOGGER.warn("Exception from handle document change function: {}", e.getStackTrace());
}
});
}).start();
}
}
}

// Initialization method to set up the instance with parameters
public static synchronized void initialize(CellBaseManagerFactory managerFactory) {
public static synchronized void initialize(CellBaseManagerFactory managerFactory) throws CellBaseException {
if (instance == null) {
instance = new DataReleaseSingleton(managerFactory);
}
Expand Down Expand Up @@ -149,4 +176,24 @@ public MongoDBCollection getMongoDBCollection(String dbname, String data, int re
checkDataRelease(dbname, release, data);
return cachedData.get(dbname).get(release).get(data);
}

/**
 * Handles a change-stream event by reloading the cached data of the database the event belongs to.
 * <p>
 * The database name is read from the event namespace. The reload is done through {@code loadData}
 * while holding the write lock registered for that database in {@code rwLockMap}. Events that carry
 * no namespace are logged and ignored, since the affected database cannot be determined.
 *
 * @param changeStreamDocument Change-stream event received from the watched collection
 * @throws CellBaseException If the database of the event is not present in the cache; presumably also
 *                           if reloading fails ({@code loadData} is defined outside this view)
 */
private void handleDocumentChange(ChangeStreamDocument<Document> changeStreamDocument) throws CellBaseException {
    // The driver returns a null namespace when the event has no complete 'ns' field
    if (changeStreamDocument.getNamespace() == null) {
        LOGGER.warn("Ignoring change stream event without namespace (operation type: {})",
                changeStreamDocument.getOperationType());
        return;
    }

    // Get database and collection names
    String dbname = changeStreamDocument.getNamespace().getDatabaseName();
    String collectionName = changeStreamDocument.getNamespace().getCollectionName();
    LOGGER.info("Collection {} of database {} has been updated", collectionName, dbname);

    // Only databases registered in the cache can be reloaded
    if (!cachedData.containsKey(dbname)) {
        throw new CellBaseException(UNKOWN_DATABASE_MSG_PREFIX + dbname);
    }

    // Reload under the write lock of that database; the lock is always released
    rwLockMap.get(dbname).writeLock().lock();
    try {
        loadData(dbname);
    } finally {
        rwLockMap.get(dbname).writeLock().unlock();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class ReleaseMongoDBAdaptor extends MongoDBAdaptor implements CellBaseCor

private MongoDBCollection mongoDBCollection;

private final String DATA_RELEASE_COLLECTION_NAME = "data_release";
public static final String DATA_RELEASE_COLLECTION_NAME = "data_release";

public ReleaseMongoDBAdaptor(MongoDataStore mongoDataStore) {
super(mongoDataStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,8 @@ protected List<Query> createQueries(Query query, String csvField, String queryKe
public void close() {
    // Closing is delegated entirely to the MongoDB manager held by this object
    this.mongoDBManager.close();
}

/**
 * Gives access to the MongoDB datastore held by this object.
 *
 * @return The value of the {@code mongoDatastore} field
 */
public MongoDataStore getMongoDatastore() {
    return this.mongoDatastore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class CellBaseManagerFactory {

private Logger logger;

public CellBaseManagerFactory(CellBaseConfiguration configuration) {
public CellBaseManagerFactory(CellBaseConfiguration configuration) throws CellBaseException {
this.configuration = configuration;
logger = LoggerFactory.getLogger(this.getClass());

Expand Down Expand Up @@ -326,7 +326,7 @@ public OntologyManager getOntologyManager(String species, String assembly) throw
return ontologyManagers.get(multiKey);
}

public DataReleaseManager getDataRelesaseManager(String species) throws CellBaseException {
public DataReleaseManager getDataReleaseManager(String species) throws CellBaseException {
if (species == null) {
throw new CellBaseException("Species is required.");
}
Expand Down

0 comments on commit f0d9599

Please sign in to comment.