Skip to content

Commit

Permalink
lib: improve code and exception management in DataReleaseSingleton, #…
Browse files Browse the repository at this point in the history
…TASK-6565
  • Loading branch information
jtarraga committed Aug 8, 2024
1 parent f0d9599 commit 8665fa5
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.opencb.cellbase.lib.impl.core;

import org.opencb.cellbase.core.exception.CellBaseException;
import org.opencb.cellbase.lib.impl.core.singleton.DataReleaseSingleton;
import org.opencb.commons.datastore.mongodb.MongoDBCollection;
import org.opencb.commons.datastore.mongodb.MongoDataStore;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -715,4 +715,8 @@ protected List<IntervalFeatureFrequency> getIntervalFeatureFrequencies(Region re
// public void setAssembly(String assembly) {
// this.assembly = assembly;
// }

/**
 * Exposes the {@code mongoDataStore} field of this object.
 *
 * @return the current value of {@code mongoDataStore}
 */
public MongoDataStore getMongoDataStore() {
    return this.mongoDataStore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.opencb.cellbase.lib.impl.core;
package org.opencb.cellbase.lib.impl.core.singleton;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
Expand All @@ -29,27 +29,22 @@
import org.opencb.cellbase.core.exception.CellBaseException;
import org.opencb.cellbase.core.models.DataRelease;
import org.opencb.cellbase.lib.db.MongoDBManager;
import org.opencb.cellbase.lib.impl.core.ReleaseMongoDBAdaptor;
import org.opencb.cellbase.lib.managers.CellBaseManagerFactory;
import org.opencb.commons.datastore.mongodb.MongoDBCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public final class DataReleaseSingleton {

// {key = dbname, value = species}
private Map<String, String> speciesMap = new HashMap<>();
// {key = dbname, value = assembly}
private Map<String, String> assemblyMap = new HashMap<>();
// {key = dbname, value = lock}
private Map<String, ReentrantReadWriteLock> rwLockMap = new HashMap<>();

// {key = dbname, value = { key = release, value = { key = data, value = collection } } }
private Map<String, Map<Integer, Map<String, MongoDBCollection>>> cachedData = new HashMap<>();
// Map where the key is dbname and the value is the DatabaseInfo
private Map<String, DatabaseInfo> dbInfoMap = new HashMap<>();

private CellBaseManagerFactory managerFactory;

Expand All @@ -72,10 +67,9 @@ private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellB
String databaseName = MongoDBManager.getDatabaseName(vertebrate.getId(), assembly.getName(), configuration.getVersion());
// This is necessary, before creating the database name the assembly is "cleaned", and we need to get the data release
// manager from the species and the assembly
speciesMap.put(databaseName, vertebrate.getId());
assemblyMap.put(databaseName, assembly.getName());
rwLockMap.put(databaseName, new ReentrantReadWriteLock());
cachedData.put(databaseName, new HashMap<>());
DatabaseInfo dbInfo = new DatabaseInfo(databaseName, vertebrate.getId(), assembly.getName(), new ReentrantReadWriteLock(),
new HashMap<>());
dbInfoMap.put(databaseName, dbInfo);

MongoClient mongoClient = managerFactory.getDataReleaseManager(vertebrate.getId(), assembly.getName()).getMongoDatastore()
.getMongoClient();
Expand All @@ -85,13 +79,31 @@ private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellB
.getCollectionName());
// Set up the change stream for the collection
new Thread(() -> {
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach(changeStreamDocument -> {
while (true) {
try {
handleDocumentChange(changeStreamDocument);
} catch (CellBaseException e) {
LOGGER.warn("Exception from handle document change function: {}", e.getStackTrace());
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach(changeStreamDocument -> {
try {
handleDocumentChange(changeStreamDocument);
} catch (CellBaseException e) {
LOGGER.warn("Exception from handle document change function (database = {}, collection = {}): {}",
collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(),
Arrays.toString(e.getStackTrace()));
}
});
} catch (Exception e) {
LOGGER.error("Failed to watch collection (database = {}, collection = {}): {}",
collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(),
Arrays.toString(e.getStackTrace()));
try {
// Sleep 5 sec before retrying to avoid tight loop
Thread.sleep(5000);
} catch (InterruptedException ie) {
// Restore interrupt status
Thread.currentThread().interrupt();
break; // Exit loop if interrupted
}
}
});
}
}).start();
}
}
Expand All @@ -114,18 +126,19 @@ public static DataReleaseSingleton getInstance() {

// Method to load data from MongoDB and cache it
private void loadData(String dbname) throws CellBaseException {
String species = speciesMap.get(dbname);
String assembly = assemblyMap.get(dbname);
DatabaseInfo dbInfo = getDatabaseInfo(dbname);

String species = dbInfo.getSpecies();
String assembly = dbInfo.getAssembly();
ReleaseMongoDBAdaptor releaseMongoDBAdaptor = managerFactory.getDataReleaseManager(species, assembly).getReleaseDBAdaptor();
List<DataRelease> dataReleases = releaseMongoDBAdaptor.getAll().getResults();
if (CollectionUtils.isNotEmpty(dataReleases)) {
cachedData.put(dbname, new HashMap<>());
for (DataRelease dataRelease : dataReleases) {
Map<String, MongoDBCollection> collectionMap = new HashMap<>();
for (Map.Entry<String, String> entry : dataRelease.getCollections().entrySet()) {
collectionMap.put(entry.getKey(), releaseMongoDBAdaptor.mongoDataStore.getCollection(entry.getValue()));
collectionMap.put(entry.getKey(), releaseMongoDBAdaptor.getMongoDataStore().getCollection(entry.getValue()));
}
cachedData.get(dbname).put(dataRelease.getRelease(), collectionMap);
dbInfo.getCacheData().put(dataRelease.getRelease(), collectionMap);
}
}
}
Expand All @@ -139,42 +152,41 @@ public void checkDataRelease(String dbname, int release) throws CellBaseExceptio
}

public void checkDataRelease(String dbname, int release, String data) throws CellBaseException {
DatabaseInfo dbInfo = getDatabaseInfo(dbname);

// Lock and load data if necessary
if (!cachedData.containsKey(dbname)) {
// If the data release is invalid, throw an exception
String msg = UNKOWN_DATABASE_MSG_PREFIX + dbname;
throw new CellBaseException(msg);
}
rwLockMap.get(dbname).writeLock().lock();
dbInfo.getRwLock().writeLock().lock();
try {
if (!cachedData.get(dbname).containsKey(release)
|| (StringUtils.isNotEmpty(data) && !cachedData.get(dbname).get(release).containsKey(data))) {
if (!dbInfo.getCacheData().containsKey(release)
|| (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data))) {
// Load the data releases from the MongoDB collection for that database name
loadData(dbname);

// Check after loading
if (!cachedData.get(dbname).containsKey(release)) {
if (!dbInfo.getCacheData().containsKey(release)) {
// If the release is invalid, throw an exception
String msg = INVALID_RELEASE_MSG_PREFIX + release + ". The available data releases are: "
+ cachedData.get(dbname).keySet();
+ dbInfo.getCacheData().keySet();
throw new CellBaseException(msg);
}
if (StringUtils.isNotEmpty(data) && !cachedData.get(dbname).get(release).containsKey(data)) {
if (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data)) {
// If the data is invalid, throw an exception
String msg = INVALID_DATA_MSG_PREFIX + " '" + data + "', it's not present in release " + release
+ ". The available data are: " + cachedData.get(dbname).get(release).keySet();
+ ". The available data are: " + dbInfo.getCacheData().get(release).keySet();
throw new CellBaseException(msg);
}
}
} finally {
rwLockMap.get(dbname).writeLock().unlock();
dbInfo.getRwLock().writeLock().unlock();
}
}

// Method to get the MongoDB collection for the given data and release
public MongoDBCollection getMongoDBCollection(String dbname, String data, int release) throws CellBaseException {
checkDataRelease(dbname, release, data);
return cachedData.get(dbname).get(release).get(data);

DatabaseInfo dbInfo = getDatabaseInfo(dbname);
return dbInfo.getCacheData().get(release).get(data);
}

private void handleDocumentChange(ChangeStreamDocument<Document> changeStreamDocument) throws CellBaseException {
Expand All @@ -183,17 +195,24 @@ private void handleDocumentChange(ChangeStreamDocument<Document> changeStreamDoc
String collectionName = changeStreamDocument.getNamespace().getCollectionName();
LOGGER.info("Collection {} of database {} has been updated", collectionName, dbname);

DatabaseInfo dbInfo = getDatabaseInfo(dbname);

// Handle the change event
if (!cachedData.containsKey(dbname)) {
// If the data release is invalid, throw an exception
String msg = UNKOWN_DATABASE_MSG_PREFIX + dbname;
throw new CellBaseException(msg);
}
rwLockMap.get(dbname).writeLock().lock();
dbInfo.getRwLock().writeLock().lock();
try {
loadData(dbname);
} finally {
rwLockMap.get(dbname).writeLock().unlock();
dbInfo.getRwLock().writeLock().unlock();
}
}

/**
 * Looks up the {@link DatabaseInfo} stored in {@code dbInfoMap} for the given database name.
 *
 * @param dbname database name used as the key in {@code dbInfoMap}
 * @return the stored database info, never {@code null}
 * @throws CellBaseException if {@code dbInfoMap} holds no entry for {@code dbname}
 */
private DatabaseInfo getDatabaseInfo(String dbname) throws CellBaseException {
    // Single lookup instead of containsKey() followed by get(); a missing (or null) entry is reported as unknown
    DatabaseInfo dbInfo = dbInfoMap.get(dbname);
    if (dbInfo == null) {
        // Unknown database
        throw new CellBaseException(UNKOWN_DATABASE_MSG_PREFIX + dbname);
    }

    return dbInfo;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2015-2020 OpenCB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.opencb.cellbase.lib.impl.core.singleton;

import org.opencb.commons.datastore.mongodb.MongoDBCollection;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DatabaseInfo {
private String dbName;
private String species;
private String assembly;
private ReentrantReadWriteLock rwLock;
private Map<Integer, Map<String, MongoDBCollection>> cacheData;

public DatabaseInfo() {
this.rwLock = new ReentrantReadWriteLock();
this.cacheData = new HashMap<>();
}

public DatabaseInfo(String dbName, String species, String assembly, ReentrantReadWriteLock rwLock, Map<Integer,
Map<String, MongoDBCollection>> cacheData) {
this.dbName = dbName;
this.species = species;
this.assembly = assembly;
this.rwLock = rwLock;
this.cacheData = cacheData;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("DatabaseInfo{");
sb.append("dbName='").append(dbName).append('\'');
sb.append(", species='").append(species).append('\'');
sb.append(", assembly='").append(assembly).append('\'');
sb.append(", rwLock=").append(rwLock);
sb.append(", cacheData=").append(cacheData);
sb.append('}');
return sb.toString();
}

public String getDbName() {
return dbName;
}

public DatabaseInfo setDbName(String dbName) {
this.dbName = dbName;
return this;
}

public String getSpecies() {
return species;
}

public DatabaseInfo setSpecies(String species) {
this.species = species;
return this;
}

public String getAssembly() {
return assembly;
}

public DatabaseInfo setAssembly(String assembly) {
this.assembly = assembly;
return this;
}

public ReentrantReadWriteLock getRwLock() {
return rwLock;
}

public DatabaseInfo setRwLock(ReentrantReadWriteLock rwLock) {
this.rwLock = rwLock;
return this;
}

public Map<Integer, Map<String, MongoDBCollection>> getCacheData() {
return cacheData;
}

public DatabaseInfo setCacheData(Map<Integer, Map<String, MongoDBCollection>> cacheData) {
this.cacheData = cacheData;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.opencb.cellbase.core.config.SpeciesConfiguration;
import org.opencb.cellbase.core.exception.CellBaseException;
import org.opencb.cellbase.core.utils.SpeciesUtils;
import org.opencb.cellbase.lib.impl.core.DataReleaseSingleton;
import org.opencb.cellbase.lib.impl.core.singleton.DataReleaseSingleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opencb.cellbase.core.result.CellBaseDataResult;
import org.opencb.cellbase.lib.GenericMongoDBAdaptorTest;
import org.opencb.cellbase.lib.db.MongoDBManager;
import org.opencb.cellbase.lib.impl.core.singleton.DataReleaseSingleton;
import org.opencb.cellbase.lib.managers.GeneManager;
import org.opencb.commons.datastore.mongodb.MongoDBCollection;
import org.opencb.commons.datastore.mongodb.MongoDataStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.opencb.cellbase.core.result.CellBaseDataResponse;
import org.opencb.cellbase.core.result.CellBaseDataResult;
import org.opencb.cellbase.core.utils.SpeciesUtils;
import org.opencb.cellbase.lib.impl.core.DataReleaseSingleton;
import org.opencb.cellbase.lib.impl.core.singleton.DataReleaseSingleton;
import org.opencb.cellbase.lib.managers.CellBaseManagerFactory;
import org.opencb.cellbase.lib.managers.DataReleaseManager;
import org.opencb.cellbase.lib.managers.MetaManager;
Expand Down

0 comments on commit 8665fa5

Please sign in to comment.