Skip to content

Commit

Permalink
lib: improve thread management in data release singleton, #TASK-6565
Browse files Browse the repository at this point in the history
  • Loading branch information
jtarraga committed Aug 13, 2024
1 parent 68a60f1 commit 62dd902
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public final class DataReleaseSingleton {

// Map where the key is dbname and the value is the DatabaseInfo
private Map<String, DatabaseInfo> dbInfoMap = new HashMap<>();

private ExecutorService executorService;
private CellBaseManagerFactory managerFactory;

private static DataReleaseSingleton instance;
Expand All @@ -58,6 +62,21 @@ public final class DataReleaseSingleton {

// Private constructor to prevent instantiation
private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellBaseException {
// Create the executor service with a 4-thread pool and add shutdown hook to terminate thread pool
this.executorService = Executors.newFixedThreadPool(4);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("Shutting down thread pool...");
executorService.shutdownNow();
try {
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
LOGGER.info("Thread pool did not terminate in the specified time.");
}
} catch (InterruptedException e) {
LOGGER.info("Thread pool termination interrupted.");
Thread.currentThread().interrupt();
}
}));

this.managerFactory = managerFactory;

// Support multi species and assemblies
Expand All @@ -67,7 +86,7 @@ private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellB
String databaseName = MongoDBManager.getDatabaseName(vertebrate.getId(), assembly.getName(), configuration.getVersion());
// This is necessary, before creating the database name the assembly is "cleaned", and we need to get the data release
// manager from the species and the assembly
DatabaseInfo dbInfo = new DatabaseInfo(databaseName, vertebrate.getId(), assembly.getName(), new ReentrantReadWriteLock(),
DatabaseInfo dbInfo = new DatabaseInfo(databaseName, vertebrate.getId(), assembly.getName(), new ReentrantLock(),
new HashMap<>());
dbInfoMap.put(databaseName, dbInfo);

Expand All @@ -78,33 +97,31 @@ private DataReleaseSingleton(CellBaseManagerFactory managerFactory) throws CellB
LOGGER.info("Setting listener for database {} and collection {}", database.getName(), collection.getNamespace()
.getCollectionName());
// Set up the change stream for the collection
new Thread(() -> {
while (true) {
executorService.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).forEach(changeStreamDocument -> {
try {
handleDocumentChange(changeStreamDocument);
} catch (CellBaseException e) {
LOGGER.warn("Exception from handle document change function (database = {}, collection = {}): {}",
collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(),
Arrays.toString(e.getStackTrace()));
collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(), e);
}
});
} catch (Exception e) {
LOGGER.error("Failed to watch collection (database = {}, collection = {}): {}",
collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(),
Arrays.toString(e.getStackTrace()));
collection.getNamespace().getDatabaseName(), collection.getNamespace().getCollectionName(), e);
try {
// Sleep 5 sec before retrying to avoid tight loop
Thread.sleep(5000);
} catch (InterruptedException ie) {
// Restore interrupt status
// Restore interrupt status, and exit loop if interrupted
Thread.currentThread().interrupt();
break; // Exit loop if interrupted
break;
}
}
}
}).start();
});
}
}
}
Expand Down Expand Up @@ -154,30 +171,33 @@ public void checkDataRelease(String dbname, int release) throws CellBaseExceptio
public void checkDataRelease(String dbname, int release, String data) throws CellBaseException {
DatabaseInfo dbInfo = getDatabaseInfo(dbname);

// Lock and load data if necessary
dbInfo.getRwLock().writeLock().lock();
try {
if (!dbInfo.getCacheData().containsKey(release)
|| (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data))) {
// Load the data releases from the MongoDB collection for that database name
loadData(dbname);

// Check after loading
if (!dbInfo.getCacheData().containsKey(release)) {
// If the release is invalid, throw an exception
String msg = INVALID_RELEASE_MSG_PREFIX + release + ". The available data releases are: "
+ dbInfo.getCacheData().keySet();
throw new CellBaseException(msg);
}
if (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data)) {
// If the data is invalid, throw an exception
String msg = INVALID_DATA_MSG_PREFIX + " '" + data + "', it's not present in release " + release
+ ". The available data are: " + dbInfo.getCacheData().get(release).keySet();
throw new CellBaseException(msg);
if (!dbInfo.getCacheData().containsKey(release)
|| (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data))) {
try {
// Lock and load data if necessary
dbInfo.getLock().lock();
if (!dbInfo.getCacheData().containsKey(release)
|| (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data))) {
// Load the data releases from the MongoDB collection for that database name
loadData(dbname);
}
} finally {
dbInfo.getLock().unlock();
}

// Check after loading
if (!dbInfo.getCacheData().containsKey(release)) {
// If the release is invalid, throw an exception
String msg = INVALID_RELEASE_MSG_PREFIX + release + ". The available data releases are: "
+ dbInfo.getCacheData().keySet();
throw new CellBaseException(msg);
}
if (StringUtils.isNotEmpty(data) && !dbInfo.getCacheData().get(release).containsKey(data)) {
// If the data is invalid, throw an exception
String msg = INVALID_DATA_MSG_PREFIX + " '" + data + "', it's not present in release " + release
+ ". The available data are: " + dbInfo.getCacheData().get(release).keySet();
throw new CellBaseException(msg);
}
} finally {
dbInfo.getRwLock().writeLock().unlock();
}
}

Expand All @@ -198,11 +218,11 @@ private void handleDocumentChange(ChangeStreamDocument<Document> changeStreamDoc
DatabaseInfo dbInfo = getDatabaseInfo(dbname);

// Handle the change event
dbInfo.getRwLock().writeLock().lock();
dbInfo.getLock().lock();
try {
loadData(dbname);
} finally {
dbInfo.getRwLock().writeLock().unlock();
dbInfo.getLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,26 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;

public class DatabaseInfo {
private String dbName;
private String species;
private String assembly;
private ReentrantReadWriteLock rwLock;
private ReentrantLock lock;
private Map<Integer, Map<String, MongoDBCollection>> cacheData;

public DatabaseInfo() {
this.rwLock = new ReentrantReadWriteLock();
this.lock = new ReentrantLock();
this.cacheData = new HashMap<>();
}

public DatabaseInfo(String dbName, String species, String assembly, ReentrantReadWriteLock rwLock, Map<Integer,
Map<String, MongoDBCollection>> cacheData) {
public DatabaseInfo(String dbName, String species, String assembly, ReentrantLock lock,
Map<Integer, Map<String, MongoDBCollection>> cacheData) {
this.dbName = dbName;
this.species = species;
this.assembly = assembly;
this.rwLock = rwLock;
this.lock = lock;
this.cacheData = cacheData;
}

Expand All @@ -49,7 +49,7 @@ public String toString() {
sb.append("dbName='").append(dbName).append('\'');
sb.append(", species='").append(species).append('\'');
sb.append(", assembly='").append(assembly).append('\'');
sb.append(", rwLock=").append(rwLock);
sb.append(", lock=").append(lock);
sb.append(", cacheData=").append(cacheData);
sb.append('}');
return sb.toString();
Expand Down Expand Up @@ -82,12 +82,12 @@ public DatabaseInfo setAssembly(String assembly) {
return this;
}

public ReentrantReadWriteLock getRwLock() {
return rwLock;
public ReentrantLock getLock() {
return lock;
}

public DatabaseInfo setRwLock(ReentrantReadWriteLock rwLock) {
this.rwLock = rwLock;
public DatabaseInfo setRwLock(ReentrantLock lock) {
this.lock = lock;
return this;
}

Expand Down

0 comments on commit 62dd902

Please sign in to comment.