From b96b9a5afa26d5f3f25829c7d1c587dcea4b6712 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 23 Sep 2022 18:08:05 -0600 Subject: [PATCH 01/23] Create new class to run cleanup until complete in separate thread, with periodic wakeup --- .../DefaultNotificationListener.java | 52 +++--- .../DefaultNotificationReceiver.java | 54 ++++-- .../DefaultNotificationSender.java | 2 +- .../distribution/HeartbeatListener.java | 3 +- .../NotificationIndexCleanup.java | 130 +++++++++++++ .../NotificationIndexCleanupTest.java | 175 ++++++++++++++++++ 6 files changed, 366 insertions(+), 50 deletions(-) create mode 100644 src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java create mode 100644 src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java index 1ddc08e7..aed7b1de 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java @@ -33,7 +33,7 @@ * @see gov.usgs.earthquake.product.AbstractListener */ public class DefaultNotificationListener extends AbstractListener implements - NotificationListener { + NotificationListener, NotificationIndexCleanup.Listener { /** Logging object. */ private static final Logger LOGGER = Logger @@ -73,6 +73,9 @@ public class DefaultNotificationListener extends AbstractListener implements /** Timer that schedules sender cleanup task. */ private Timer cleanupTimer = null; + /** Notification index cleanup. */ + private NotificationIndexCleanup notificationCleanup = null; + /** How many products to process at the same time. */ private int concurrentProducts = 1; @@ -266,43 +269,26 @@ protected void onAfterProcessNotification(final Notification notification) * @param notification to be removed * @throws Exception if error occurs */ - protected void onExpiredNotification(final Notification notification) + @Override + public void onExpiredNotification(final Notification notification) throws Exception { // nothing to do } - /** + /** * Periodic cleanup task. * * Called every cleanupInterval milliseconds. */ - public void cleanup() { + public void cleanup() throws Exception { LOGGER.finer("[" + getName() + "] running listener cleanup"); - try { - if (notificationIndex != null) { - Iterator iter = notificationIndex - .findExpiredNotifications().iterator(); - while (iter.hasNext()) { - Notification notification = iter.next(); - - // let subclasses remove other stuff first - onExpiredNotification(notification); - - // remove expired notification from index - notificationIndex.removeNotification(notification); - - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.finest("[" - + getName() - + "] removed expired notification from sender index " - + notification.getProductId().toString()); - } - } - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception during listener cleanup", e); + if (this.notificationCleanup == null) { + this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this); + this.notificationCleanup.startup(); + } else { + this.notificationCleanup.wakeUp(); } - }; + } @Override public void startup() throws Exception { @@ -320,7 +306,7 @@ public void run() { cleanup(); } catch (Exception e) { LOGGER.log(Level.WARNING, "[" + getName() - + "] exception during sender cleanup", e); + + "] exception during cleanup", e); } } }, 0, cleanupInterval); @@ -335,6 +321,14 @@ public void shutdown() throws Exception { } catch (Exception e) { // ignore } + if (this.notificationCleanup != null) { + try { + this.notificationCleanup.shutdown(); + } catch (Exception ignore) { + } finally { + this.notificationCleanup = null; + } + } try { this.cleanupTimer.cancel(); } catch (Exception e) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java index 743a8d4e..f119fca4 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java @@ -50,7 +50,7 @@ * allocated one thread to process notifications from this queue. */ public class DefaultNotificationReceiver extends DefaultConfigurable implements - NotificationReceiver { + NotificationReceiver, NotificationIndexCleanup.Listener { /** Logging object. */ private static final Logger LOGGER = Logger @@ -119,6 +119,9 @@ public class DefaultNotificationReceiver extends DefaultConfigurable implements /** Timer that schedules receiver cleanup task. */ private Timer receiverCleanupTimer = new Timer(); + /** Notification cleanup */ + private NotificationIndexCleanup notificationCleanup = null; + private int connectTimeout = Integer.parseInt(DEFAULT_CONNECT_TIMEOUT); private int readTimeout = Integer.parseInt(DEFAULT_READ_TIMEOUT); @@ -235,29 +238,36 @@ public String getListenerQueueStatus() { * removed. * * @throws Exception - * if productStorage or notificationIndex throw an Exception. + * if NotificationIndexCleanup throws an Exception. */ public void removeExpiredNotifications() throws Exception { LOGGER.fine("[" + getName() + "] running receiver cleanup"); - Iterator iter = notificationIndex - .findExpiredNotifications().iterator(); - while (iter.hasNext()) { - Notification notification = iter.next(); - if (!(notification instanceof URLNotification)) { - // if it isn't a url notification, it's also in storage - productStorage.removeProduct(notification.getProductId()); - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.finest("[" + getName() - + "] removed expired product from receiver cache " - + notification.getProductId().toString()); - } - } + // use NotificationIndexCleanup to manage cleanup in separate thread + if (this.notificationCleanup == null) { + this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this); + this.notificationCleanup.startup(); + } else { + this.notificationCleanup.wakeUp(); + } + } - // remove expired notification from index - notificationIndex.removeNotification(notification); + /** + * Callback from the NotificationIndexCleanup thread. + * + * Checks if Notification refers to a product in storage, + * which should also be removed. + * + * @param notification + * expired notification about to be removed. + * @throws Exception + */ + public void onExpiredNotification(final Notification notification) throws Exception { + if (!(notification instanceof URLNotification)) { + // if it isn't a url notification, it's also in storage + productStorage.removeProduct(notification.getProductId()); if (LOGGER.isLoggable(Level.FINEST)) { LOGGER.finest("[" + getName() - + "] removed expired notification from receiver index " + + "] removed expired product from receiver cache " + notification.getProductId().toString()); } } @@ -593,7 +603,13 @@ public void configure(Config config) throws Exception { public void shutdown() throws Exception { receiverCleanupTimer.cancel(); - + if (notificationCleanup != null) { + try { + notificationCleanup.shutdown(); + notificationCleanup = null; + } catch (Exception ignore) { + } + } try { notifier.shutdown(); } catch (Exception ignore) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java index 9af3a0a2..9c4f8505 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java @@ -178,7 +178,7 @@ protected void onAfterProcessNotification(final Notification notification) { * @throws Exception if something goes wrong */ @Override - protected void onExpiredNotification(final Notification notification) throws Exception{ + public void onExpiredNotification(final Notification notification) throws Exception{ List notifications = getNotificationIndex() .findNotifications(notification.getProductId()); if (notifications.size() <= 1) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java b/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java index 09061705..128a6a31 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java +++ b/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java @@ -200,7 +200,8 @@ public String formatHeartbeatOutput() { * purge heartbeat key/values older than storageTimeout, also purging empty * heartbeats */ - public void cleanup() { + @Override + public void cleanup() throws Exception { super.cleanup(); if (this.storageTimeout == 0) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java new file mode 100644 index 00000000..3f89f1c5 --- /dev/null +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java @@ -0,0 +1,130 @@ +package gov.usgs.earthquake.distribution; + +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * NotificationIndexCleanup manages cleaning up expired notifications. + * + * Uses background thread to remove expired notifications while they exist, + * then uses wait/notify to pause until shutdown() or wakeUp() methods are called. + * + * NOTE: this class does not schedule periodic cleanup, and the wakeUp() method + * must be called periodically. + */ +public class NotificationIndexCleanup implements Runnable { + + private static final Logger LOGGER = Logger.getLogger(NotificationIndexCleanup.class.getName()); + + public final NotificationIndex index; + + // listener that can take additional actions during cleanup + public final Listener listener; + // object used to wake up cleanup thread + public final Object syncObject = new Object(); + // thread where cleanup loop runs + public Thread cleanupThread = null; + + public NotificationIndexCleanup(final NotificationIndex index, final Listener listener) { + this.index = index; + this.listener = listener; + } + + /** + * Notification cleanup thread loop. + * + * This method blocks and should probably not be called by you. + */ + public void run() { + LOGGER.finer("NotificationIndexCleanup starting for " + this.index.getName()); + while (true) { + // check here in case no exception was raised + if (Thread.interrupted()) { + // interrupt = shutdown + break; + } + int removed = 0; + try { + for (final Notification expired : this.index.findExpiredNotifications()) { + try { + if (this.listener != null) { + this.listener.onExpiredNotification(expired); + } + this.index.removeNotification(expired); + removed++; + } catch (Exception e) { + LOGGER.log(Level.FINE, "Exception removing expired notification from " + this.index.getName(), e); + } + } + LOGGER.fine("Removed " + removed + " expired notifications from " + this.index.getName()); + } catch (InterruptedException e) { + // interrupt = shutdown + break; + } catch (Exception e) { + LOGGER.log(Level.FINE, "Exception finding expired notifications from " + this.index.getName(), e); + } + try { + if (removed > 0) { + // keep removing expired notifications, don't wait for next interval + // but pause to let other threads access notification index + Thread.sleep(1L); + continue; + } + // wait for next interval before checking again + synchronized (syncObject) { + syncObject.wait(); + } + } catch (InterruptedException e) { + // interrupt = shutdown + break; + } + } + LOGGER.finer("NotificationIndexCleanup exiting for " + this.index.getName()); + this.cleanupThread = null; + } + + /** + * Start cleanup process. + * + * @throws Exception + */ + public void startup() throws Exception { + if (this.cleanupThread != null) { + throw new IllegalStateException("Already started"); + } + // start thread + this.cleanupThread = new Thread(this); + this.cleanupThread.start(); + } + + /** + * Stop cleanup process. + * + * @throws Exception + */ + public void shutdown() throws Exception { + if (this.cleanupThread == null) { + throw new IllegalStateException("Already stopped"); + } + // stop thread + this.cleanupThread.interrupt(); + this.cleanupThread.join(); + } + + /** + * Wake up the background thread if it is waiting. + */ + public void wakeUp() { + synchronized(syncObject) { + syncObject.notify(); + } + } + + /** + * Interface for cleanup listeners to take additional steps before a + * notification is removed. + */ + public static interface Listener { + public void onExpiredNotification(final Notification expired) throws Exception; + } +} diff --git a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java new file mode 100644 index 00000000..8f724f1a --- /dev/null +++ b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java @@ -0,0 +1,175 @@ +package gov.usgs.earthquake.distribution; + +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import gov.usgs.earthquake.product.ProductId; + +public class NotificationIndexCleanupTest { + + /** + * Test thread continues removing expired notifications before waiting. + * + * @throws Exception + */ + @Test + public void testNotificationCleanup() throws Exception{ + final MockNotificationIndex index = new MockNotificationIndex(); + final NotificationIndexCleanup cleanup = new NotificationIndexCleanup(index, null); + // thread should start and wait + cleanup.startup(); + Assert.assertEquals(index.removedNotifications.size(), 0); + + // use this notification multiple times for testing + final Notification notification = new DefaultNotification( + new ProductId("source", "type", "code"), + new Date(), + null + ); + final List notifications = new LinkedList<>(); + notifications.add(notification); + notifications.add(notification); + + // run cleanup thread, should remove 2 notifications + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(1L); + Assert.assertEquals(index.removedNotifications.size(), 2); + + // run with multiple findExpiredNotifications returns, should remove all + index.removedNotifications.clear(); + index.expiredNotificationsReturns.add(notifications); + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(2L); + Assert.assertEquals(index.removedNotifications.size(), 4); + } + + /** + * Test listener is notified before notification is removed. + * + * @throws Exception + */ + @Test + public void testNotificationCleanupListener() throws Exception{ + final MockNotificationIndex index = new MockNotificationIndex(); + // capture notifications listener receives + final LinkedList listenerNotifications = new LinkedList<>(); + final NotificationIndexCleanup cleanup = new NotificationIndexCleanup(index, (Notification expired) -> { + listenerNotifications.add(expired); + // check size before return + Assert.assertEquals(listenerNotifications.size() - 1, index.removedNotifications.size()); + }); + // thread should start and wait + cleanup.startup(); + Assert.assertEquals(0, index.removedNotifications.size()); + Assert.assertEquals(0, listenerNotifications.size()); + // use this notification multiple times for testing + final Notification notification = new DefaultNotification( + new ProductId("source", "type", "code"), + new Date(), + null + ); + final List notifications = new LinkedList<>(); + notifications.add(notification); + notifications.add(notification); + + // run cleanup thread, should remove 2 notifications + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(2, index.removedNotifications.size()); + Assert.assertEquals(2, listenerNotifications.size()); + // run with multiple findExpiredNotifications returns, should remove all + index.removedNotifications.clear(); + listenerNotifications.clear(); + index.expiredNotificationsReturns.add(notifications); + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(4, index.removedNotifications.size()); + Assert.assertEquals(4, listenerNotifications.size()); + } + + /** + * Test notification is not removed when listener throws exception. + * + * @throws Exception + */ + @Test + public void testNotificationCleanupListenerException() throws Exception{ + final MockNotificationIndex index = new MockNotificationIndex(); + // capture notifications listener receives + final LinkedList listenerNotifications = new LinkedList<>(); + final LinkedList listenerThrow = new LinkedList<>(); + final NotificationIndexCleanup cleanup = new NotificationIndexCleanup(index, (Notification expired) -> { + if (listenerThrow.size() > 0) { + throw new Exception("listener error"); + } + listenerNotifications.add(expired); + }); + // thread should start and wait + cleanup.startup(); + Assert.assertEquals(0, index.removedNotifications.size()); + Assert.assertEquals(0, listenerNotifications.size()); + // use this notification multiple times for testing + final Notification notification = new DefaultNotification( + new ProductId("source", "type", "code"), + new Date(), + null + ); + final List notifications = new LinkedList<>(); + notifications.add(notification); + notifications.add(notification); + + // run with listener throwing exceptions + index.expiredNotificationsReturns.add(notifications); + listenerThrow.add(true); + cleanup.wakeUp(); + Thread.sleep(1L); + Assert.assertEquals(0, index.removedNotifications.size()); + Assert.assertEquals(0, listenerNotifications.size()); + // run with multiple findExpiredNotifications returns, should remove all + index.expiredNotificationsReturns.add(notifications); + listenerThrow.clear(); + cleanup.wakeUp(); + Thread.sleep(1L); + Assert.assertEquals(2, index.removedNotifications.size()); + Assert.assertEquals(2, listenerNotifications.size()); + } + + /** Class for testing NotificationIndexCleanup interaction with a NotificationIndex */ + public static class MockNotificationIndex extends JDBCNotificationIndex { + public List> expiredNotificationsReturns = new LinkedList<>(); + public List removedNotifications = new LinkedList<>(); + + public MockNotificationIndex() throws Exception { + super(); + } + + // not using actual database connection for this test + public void startup() throws Exception {} + public void shutdown() throws Exception {} + + /** + * Mock the find expired notification method to return lists from expiredNotificationsReturns. + */ + public List findExpiredNotifications() throws Exception { + if (expiredNotificationsReturns == null || expiredNotificationsReturns.size() == 0) { + return new ArrayList(); + } + return expiredNotificationsReturns.remove(0); + } + + @Override + public void removeNotification(final Notification notification) throws Exception { + removedNotifications.add(notification); + } + } + +} From ecfc3dfd66364d651409870376f4b56723d04097 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Fri, 23 Sep 2022 18:08:38 -0600 Subject: [PATCH 02/23] JsonNotificationIndex: only return non-expired notifications for getMissing --- .../gov/usgs/earthquake/aws/JsonNotificationIndex.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index 1ab76cea..bc63eefd 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -572,7 +572,9 @@ public synchronized List getMissingNotifications( + " '' as created, t.expires, t.source, t.type, t.code, t.updateTime" + ", '' as url, null as data" + " FROM " + this.table + " t" - + " WHERE NOT EXISTS (" + // only missing if not expired + + " WHERE t.expires >= ?" + + " AND NOT EXISTS (" + "SELECT * FROM " + otherTable + " WHERE source=t.source AND type=t.type" + " AND code=t.code AND updatetime=t.updateTime" @@ -582,6 +584,10 @@ public synchronized List getMissingNotifications( try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { statement.setQueryTimeout(1800); + + // set parameters + statement.setString(1, Instant.now().toString()); + // execute and commit if successful final List notifications = getNotifications(statement); commitTransaction(); From 901aac87e424c050ff4481bae94e736b19f4db63 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 26 Sep 2022 18:04:20 -0600 Subject: [PATCH 03/23] JsonNotificationIndex, update logging and optimize queries --- .../usgs/earthquake/aws/JsonNotificationIndex.java | 14 +++++++------- .../earthquake/aws/JsonNotificationIndexTest.java | 10 ++++------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index bc63eefd..7e613a4c 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -292,7 +292,9 @@ public synchronized void removeNotification(Notification notification) throws Ex final String sql = "DELETE FROM " + this.table + " WHERE created=? AND expires=? AND source=? AND type=? AND code=?" + " AND updatetime=? AND url=? AND data" - + (product == null ? " IS NULL" : "=?"); + // created is _very_ specific and is set when product is not null + // skip overhead of embedding product in query + + (product == null ? " IS NULL" : " IS NOT NULL"); beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { @@ -305,10 +307,6 @@ public synchronized void removeNotification(Notification notification) throws Ex statement.setString(5, id.getCode()); statement.setLong(6, id.getUpdateTime().getTime()); statement.setString(7, url != null ? url.toString() : ""); - if (product != null) { - statement.setString(8, - new JsonProduct().getJsonObject(product).toString()); - } // execute statement.executeUpdate(); commitTransaction(); @@ -440,7 +438,9 @@ public synchronized List findNotifications( sql = "SELECT DISTINCT" + " '' as created, expires, source, type, code, updateTime" + ", '' as url, null as data" - + " FROM " + this.table; + + " FROM " + this.table + + " WHERE expires > ?"; + values.add(Instant.now().toString()); } // prepare statement beginTransaction(); @@ -573,7 +573,7 @@ public synchronized List getMissingNotifications( + ", '' as url, null as data" + " FROM " + this.table + " t" // only missing if not expired - + " WHERE t.expires >= ?" + + " WHERE t.expires > ?" + " AND NOT EXISTS (" + "SELECT * FROM " + otherTable + " WHERE source=t.source AND type=t.type" diff --git a/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java b/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java index dd567112..d661b193 100644 --- a/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java +++ b/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java @@ -15,7 +15,6 @@ import java.sql.ResultSet; import java.util.ArrayList; import java.util.Date; -import java.util.Iterator; import java.util.List; import org.junit.After; @@ -355,19 +354,18 @@ public void testFindNotificationsByDataLists() throws Exception { // functionality. List found = null; - Iterator iter = null; List sources = null; List types = null; List codes = null; // Check the null null null case. found = index.findNotifications(sources, types, codes); - + final Date now = new Date(); List all = index .getNotifications(_query_allNotifications); - iter = all.iterator(); - while (iter.hasNext()) { - Assert.assertTrue(contains(found, iter.next())); + for (final Notification notification : all) { + Assert.assertTrue(contains(found, notification) + || notification.getExpirationDate().getTime() < now.getTime()); } // Check a search by sources only From 19757f65959c337c235fe226cd5f86db949a33e2 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 26 Sep 2022 18:06:29 -0600 Subject: [PATCH 04/23] ReliableIndexerListener, update logging and synchronized access --- .../indexer/ExtentIndexerListener.java | 10 ++++++---- .../indexer/ReliableIndexerListener.java | 17 +++++++++++------ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java b/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java index c9e2deba..16fd6c7f 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java +++ b/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java @@ -47,7 +47,9 @@ public void configure(Config config) throws Exception { */ @Override protected void onBeforeProcessThreadStart() throws Exception { - setLastIndexId(((ExtentIndex) productIndex).getLastExtentIndexId()); + long lastIndexId = ((ExtentIndex) productIndex).getLastExtentIndexId(); + LOGGER.fine("[" + getName() + "] last index id " + lastIndexId); + setLastIndexId(lastIndexId); } /** @@ -63,12 +65,12 @@ public void processProduct(ProductSummary product) throws Exception { ExtentSummary extent = new ExtentSummary(product); if (extent.isValid()) { - LOGGER.log(Level.FINE, "[" + getName() + "] adding product " + LOGGER.log(Level.FINEST, "[" + getName() + "] adding product " + product.getId().toString() + " to extent table"); ((ExtentIndex) productIndex).addExtentSummary(extent); } else { - LOGGER.log(Level.FINE, "[" + getName() + "] product " + LOGGER.log(Level.FINEST, "[" + getName() + "] product " + product.getId().toString() + " has no extent information; won't add to extent table"); } @@ -76,4 +78,4 @@ public void processProduct(ProductSummary product) throws Exception { setLastIndexId(product.getIndexId()); } -} \ No newline at end of file +} diff --git a/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java b/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java index 44d19699..dbace358 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java +++ b/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java @@ -37,7 +37,7 @@ public class ReliableIndexerListener extends DefaultIndexerListener implements R protected static final Logger LOGGER = Logger .getLogger(ReliableIndexerListener.class.getName()); - private static final int PRODUCTS_PER_QUERY = 10; + private static final int PRODUCTS_PER_QUERY = 1000; private boolean stopThread = false; private long lastIndexId = -1; @@ -123,8 +123,10 @@ public void run() { for(ProductSummary summary : productList) { LOGGER.log(Level.FINEST,"[" + getName() + "] preparing to process product " + summary.getIndexId()); //Check for shutdown every iteration so we don't hog shutdown time - if (stopThread) { - break; + synchronized(syncObject) { + if (stopThread) { + break; + } } try { //Process the product types we're told to in configuration @@ -161,7 +163,10 @@ public void run() { public void startup() throws Exception{ super.startup(); this.onBeforeProcessThreadStart(); - this.processThread = new Thread(this); + synchronized(syncObject) { + stopThread = false; + this.processThread = new Thread(this); + } this.processThread.start(); } @@ -174,9 +179,9 @@ public void startup() throws Exception{ public void shutdown() throws Exception { try { LOGGER.log(Level.FINEST,"[" + getName() + "] trying to shut down..."); - stopThread = true; //When the thread is ready, tell it to stop synchronized (syncObject) { + stopThread = true; this.processThread.interrupt(); } this.processThread.join(); @@ -270,7 +275,7 @@ public List getNextProducts() throws Exception{ */ public void processProduct(final ProductSummary product) throws Exception { //Do stuff - LOGGER.log(Level.FINE,"[" + getName() + "] processing product " + product.getId()); + LOGGER.log(Level.FINER,"[" + getName() + "] processing product " + product.getId()); } From d10d527818f0e5000107bf03c249de96ad447690 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 26 Sep 2022 18:34:53 -0600 Subject: [PATCH 05/23] Update NotificationIndexCleanup loop to be more like reliable indexer, update logging --- .../DefaultNotificationListener.java | 13 ++- .../NotificationIndexCleanup.java | 106 ++++++++++-------- 2 files changed, 66 insertions(+), 53 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java index aed7b1de..f192d48a 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java @@ -316,19 +316,20 @@ public void run() { @Override public void shutdown() throws Exception { super.shutdown(); - try { - this.notificationIndex.shutdown(); - } catch (Exception e) { - // ignore - } if (this.notificationCleanup != null) { try { this.notificationCleanup.shutdown(); - } catch (Exception ignore) { + } catch (Exception e) { + LOGGER.log(Level.INFO, "[" + getName() + "] exception stopping notification cleanup", e); } finally { this.notificationCleanup = null; } } + try { + this.notificationIndex.shutdown(); + } catch (Exception e) { + // ignore + } try { this.cleanupTimer.cancel(); } catch (Exception e) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java index 3f89f1c5..aa36e9b8 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java @@ -1,5 +1,6 @@ package gov.usgs.earthquake.distribution; +import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -7,7 +8,8 @@ * NotificationIndexCleanup manages cleaning up expired notifications. * * Uses background thread to remove expired notifications while they exist, - * then uses wait/notify to pause until shutdown() or wakeUp() methods are called. + * then uses wait/notify to pause until shutdown() or wakeUp() methods are + * called. * * NOTE: this class does not schedule periodic cleanup, and the wakeUp() method * must be called periodically. @@ -20,10 +22,13 @@ public class NotificationIndexCleanup implements Runnable { // listener that can take additional actions during cleanup public final Listener listener; - // object used to wake up cleanup thread + + // object used to synchronize state access between threads public final Object syncObject = new Object(); // thread where cleanup loop runs public Thread cleanupThread = null; + // whether thread should stop running + private boolean stopThread = false; public NotificationIndexCleanup(final NotificationIndex index, final Listener listener) { this.index = index; @@ -36,50 +41,51 @@ public NotificationIndexCleanup(final NotificationIndex index, final Listener li * This method blocks and should probably not be called by you. */ public void run() { - LOGGER.finer("NotificationIndexCleanup starting for " + this.index.getName()); - while (true) { - // check here in case no exception was raised - if (Thread.interrupted()) { - // interrupt = shutdown - break; - } - int removed = 0; - try { - for (final Notification expired : this.index.findExpiredNotifications()) { + final String indexName = this.index.getName(); + + LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup starting"); + // run until thread stopped + while (!stopThread) { + List expiredNotifications = null; + synchronized (syncObject) { + try { + expiredNotifications = this.index.findExpiredNotifications(); + } catch (Exception e) { + LOGGER.log(Level.INFO, e, () -> "[" + indexName + "] exception finding expired notifications"); + } + if (expiredNotifications == null || expiredNotifications.size() == 0) { + // Wait for expired notifications to process try { - if (this.listener != null) { - this.listener.onExpiredNotification(expired); - } - this.index.removeNotification(expired); - removed++; - } catch (Exception e) { - LOGGER.log(Level.FINE, "Exception removing expired notification from " + this.index.getName(), e); + syncObject.wait(); + } catch (InterruptedException ignore) { + // signal from another thread (stopThread checked above) + continue; } } - LOGGER.fine("Removed " + removed + " expired notifications from " + this.index.getName()); - } catch (InterruptedException e) { - // interrupt = shutdown - break; - } catch (Exception e) { - LOGGER.log(Level.FINE, "Exception finding expired notifications from " + this.index.getName(), e); } - try { - if (removed > 0) { - // keep removing expired notifications, don't wait for next interval - // but pause to let other threads access notification index - Thread.sleep(1L); - continue; - } - // wait for next interval before checking again + + // remove batch of expired notifications + int removed = 0; + for (final Notification expired : expiredNotifications) { synchronized (syncObject) { - syncObject.wait(); + if (stopThread) { + break; + } + } + try { + if (this.listener != null) { + this.listener.onExpiredNotification(expired); + } + this.index.removeNotification(expired); + removed++; + } catch (Exception e) { + LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notification"); } - } catch (InterruptedException e) { - // interrupt = shutdown - break; } + final int total = removed; + LOGGER.fine(() -> "[" + indexName + "] Removed " + total + " expired notifications"); } - LOGGER.finer("NotificationIndexCleanup exiting for " + this.index.getName()); + LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup exiting"); this.cleanupThread = null; } @@ -89,11 +95,14 @@ public void run() { * @throws Exception */ public void startup() throws Exception { - if (this.cleanupThread != null) { - throw new IllegalStateException("Already started"); + synchronized (syncObject) { + if (this.cleanupThread != null) { + throw new IllegalStateException("Already started"); + } + // start thread + stopThread = false; + this.cleanupThread = new Thread(this); } - // start thread - this.cleanupThread = new Thread(this); this.cleanupThread.start(); } @@ -103,11 +112,14 @@ public void startup() throws Exception { * @throws Exception */ public void shutdown() throws Exception { - if (this.cleanupThread == null) { - throw new IllegalStateException("Already stopped"); + synchronized (syncObject) { + if (this.cleanupThread == null) { + throw new IllegalStateException("Already stopped"); + } + // stop thread + stopThread = true; + this.cleanupThread.interrupt(); } - // stop thread - this.cleanupThread.interrupt(); this.cleanupThread.join(); } @@ -115,7 +127,7 @@ public void shutdown() throws Exception { * Wake up the background thread if it is waiting. */ public void wakeUp() { - synchronized(syncObject) { + synchronized (syncObject) { syncObject.notify(); } } From 49ccbda7aad59c6ba616d08ae52bb7784ec70831 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 26 Sep 2022 19:07:26 -0600 Subject: [PATCH 06/23] Update SimpleLogFormatter to use Formatter.formatMessage to format parameters --- .../java/gov/usgs/util/logging/SimpleLogFormatter.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java b/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java index 6e0558e2..9b1387a4 100644 --- a/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java +++ b/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java @@ -16,9 +16,9 @@ /** * Simple(r) log formatter for java.util.logging messages. - * + * * Outputs unique dates once, with all messages sharing that time tab indented below. - * + * * Example Format: *
  * Wed Sep 30 19:31:48 GMT 2009
@@ -31,7 +31,7 @@
  * INFO    [logging_client] received urn:earthquake-usgs-gov:shakemap-scraper:global:2009medd:1
  * INFO    [shakemap] received urn:earthquake-usgs-gov:shakemap-scraper:global:2009medd:1
  * 
- * + * */ public class SimpleLogFormatter extends Formatter { @@ -47,7 +47,7 @@ public SimpleLogFormatter() { /** * Format a LogRecord for output. - * + * * @param record * LogRecord to format. * @return formatted LogRecord as String. @@ -72,7 +72,7 @@ public final String format(final LogRecord record) { // add log message buf.append(record.getLevel().toString()); buf.append("\tthread=").append(record.getThreadID()); - buf.append("\t").append(record.getMessage()); + buf.append("\t").append(formatMessage(record)); buf.append("\n"); // output any associated exception From 4e4aa57cb8266594515b3e2499ed7f77d82cc423 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 26 Sep 2022 21:01:44 -0600 Subject: [PATCH 07/23] JDBCProductIndex, update how query is built and actually use order by and limit query props --- .../earthquake/indexer/JDBCProductIndex.java | 73 +++++++++---------- .../indexer/JDBCProductIndexTest.java | 24 ++++++ 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java index 08406551..66801ea2 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java +++ b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java @@ -376,7 +376,7 @@ public synchronized List getUnassociatedProducts( final List clauseList = buildProductClauses(query); // Add the unassociated quantifier to the clause list clauseList.add("eventId IS NULL"); - final String sql = buildProductQuery(clauseList); + final String sql = buildProductQuery(query, clauseList); try ( final PreparedStatement statement = getConnection().prepareStatement(sql); @@ -430,10 +430,10 @@ public synchronized List getProducts(ProductIndexQuery query) */ public synchronized List getProducts(ProductIndexQuery query, final boolean loadDetails) throws Exception { - final List clauseList = buildProductClauses(query); - final String sql = buildProductQuery(clauseList); + final String sql = buildProductQuery(query); final List products = new LinkedList(); + LOGGER.finer("Executing query " + sql); try ( final PreparedStatement statement = getConnection().prepareStatement(sql); ) { @@ -998,51 +998,50 @@ else if (query.getResultType() == ProductIndexQuery.RESULT_TYPE_SUPERSEDED) { } /** - * Create the full SELECT query for the products table using the clauseList - * as the WHERE clause + * Create the full SELECT query for the products table using the default clauseList. * - * @param clauseList - * List of Strings to be AND'd together in the WHERE clause - * @param orderby - * Complete ORDER BY clause to be added after the WHERE clause + * @param query + * Query to build. * @return String containing the full SELECT query + * @see #buildProductClauses(ProductIndexQuery) */ - protected String buildProductQuery(List clauseList, String orderby) { - // Join all the clauses into a WHERE clause - StringBuilder whereClause = new StringBuilder(); - String and = " AND "; - boolean first = true; - for (String clause : clauseList) { - if (!first) { - whereClause.append(and); - } else { - first = false; - } - whereClause.append(clause); - } + protected String buildProductQuery(final ProductIndexQuery query) { - String query_prefix = String - .format("SELECT * FROM %s p", SUMMARY_TABLE); - String query_suffix = ""; - if (whereClause.length() > 0) { - query_suffix = String.format(" WHERE %s", whereClause.toString()); - } - String query_text = query_prefix + query_suffix + " " + orderby; - - return query_text; + final List clauseList = buildProductClauses(query); + return buildProductQuery(query, clauseList); } /** - * Create the full SELECT query for the products table using the clauseList - * as the WHERE clause. This method is a wrapper for - * {@link #buildProductQuery(List, String)} with an empty - * orderby string + * Create the full SELECT query for the products table using a custom clauseList. * + * @param query + * Query to build. * @param clauseList List of clauses for WHERE * @return String containing the full SELECT query */ - protected String buildProductQuery(List clauseList) { - return buildProductQuery(clauseList, ""); + protected String buildProductQuery(final ProductIndexQuery query, final List clauseList) { + final StringBuffer sql = new StringBuffer(); + + sql.append("SELECT * FROM " + SUMMARY_TABLE + " p"); + + // optional where + if (clauseList.size() > 0) { + sql.append(" WHERE ").append(String.join(" AND ", clauseList)); + } + + // optional order by + String queryOrderBy = query.getOrderBy(); + if (queryOrderBy != null) { + sql.append(" ORDER BY ").append(queryOrderBy); + } + + // limit is after order by + Integer queryLimit = query.getLimit(); + if (queryLimit != null) { + sql.append(" LIMIT ").append(queryLimit); + } + + return sql.toString(); } /** diff --git a/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java b/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java index f5826e23..3330de36 100644 --- a/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java +++ b/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java @@ -258,6 +258,30 @@ public void getEventsTest() throws Exception { Assert.assertNotNull(event.getIndexId()); } + /** + * Query with limit includes limit clause. + * @throws Exception + */ + @Test + public void limitQueryTest() throws Exception { + query = new ProductIndexQuery(); + query.setLimit(123); + String sql = index.buildProductQuery(query); + Assert.assertTrue(sql.contains(" LIMIT 123")); + } + + /** + * Query with limit includes limit clause. + * @throws Exception + */ + @Test + public void orderByQueryTest() throws Exception { + query = new ProductIndexQuery(); + query.setOrderBy(JDBCProductIndex.SUMMARY_PRODUCT_INDEX_ID); + String sql = index.buildProductQuery(query); + Assert.assertTrue(sql.contains(" ORDER BY id")); + } + /** * Open a connection to a mysql database called productIndex on localhost * with the user: "test" and the password: "test". This test will From 0eaa14bbef2506c8017166b446d42947ae4455bd Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 26 Sep 2022 21:15:15 -0600 Subject: [PATCH 08/23] Update CONTRIBUTING, DISCLAIMER, and LICENSE --- CONTRIBUTING.md | 42 ++++++++++++++++++++++++++++ DISCLAIMER.md | 10 +++++++ LICENSE.md | 73 +++++++++++++++++++++++++++++++------------------ 3 files changed, 99 insertions(+), 26 deletions(-) create mode 100644 CONTRIBUTING.md create mode 100644 DISCLAIMER.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..8ab1e097 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,42 @@ +# Contributing + +Contributions are welcome from the community. + +## Bug Reports and Suggested Enhancements + +Bug Reports and Suggested Enhancements can be [submitted on the issues page][1]. + +Before creating a new issue, please take a moment to search and make sure a +similar issue does not already exist. If one does exist, you can comment +(most simply even with a :+1: (`:+1:`) to show your support for that issue. + +## Development + +This project uses [Gradle][4] to manage dependencies and build the project. + +- Clone project + + > Note: This project uses a [forking workflow][2]. + > Creating a fork is not required to get started, but is recommended. + +- Install dependencies + + - Java 8, 11, or 17 + +- Build project and run tests + + ``` + ./gradlew build + ``` + +### Merge Requests + +If you have direct contributions you would like considered for incorporation +into the project you can [fork this repository][2] and +[create a merge request][3] for review. + + +[1]: https://code.usgs.gov/ghsc/hazdev/pdl/issues +[2]: https://docs.gitlab.com/ee/user/project/repository/forking_workflow.html +[3]: https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html +[4]: https://gradle.org/ diff --git a/DISCLAIMER.md b/DISCLAIMER.md new file mode 100644 index 00000000..fb9a9721 --- /dev/null +++ b/DISCLAIMER.md @@ -0,0 +1,10 @@ +# Disclaimer + +This software is preliminary or provisional and is subject to revision. It is +being provided to meet the need for timely best science. The software has not +received final approval by the U.S. Geological Survey (USGS). No warranty, +expressed or implied, is made by the USGS or the U.S. Government as to the +functionality of the software and related material nor shall the fact of release +constitute any such warranty. The software is provided on the condition that +neither the USGS nor the U.S. Government shall be held liable for any damages +resulting from the authorized or unauthorized use of the software. diff --git a/LICENSE.md b/LICENSE.md index 8b34ba5f..06f4e3b5 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,35 +1,56 @@ -Unless otherwise noted, This software is in the public domain because it -contains materials that originally came from the United States Geological -Survey, an agency of the United States Department of Interior. For more -information, see the official USGS copyright policy at -http://www.usgs.gov/visual-id/credit_usgs.html#copyright +License +======= +Unless otherwise noted, This project is in the public domain in the United +States because it contains materials that originally came from the United +States Geological Survey, an agency of the United States Department of +Interior. For more information, see the official USGS copyright policy at +https://www.usgs.gov/information-policies-and-instructions/copyrights-and-credits -Dependent libraries found in the "lib" directory are distributed under the +Additionally, we waive copyright and related rights in the work +worldwide through the CC0 1.0 Universal public domain dedication. + + +CC0 1.0 Universal Summary +------------------------- + +This is a human-readable summary of the +[Legal Code (read the full text)][1]. + + +### No Copyright + +The person who associated a work with this deed has dedicated the work to +the public domain by waiving all of his or her rights to the work worldwide +under copyright law, including all related and neighboring rights, to the +extent allowed by law. + +You can copy, modify, distribute and perform the work, even for commercial +purposes, all without asking permission. + + +### Other Information + +In no way are the patent or trademark rights of any person affected by CC0, +nor are the rights that other persons may have in the work or in how the +work is used, such as publicity or privacy rights. + +Unless expressly stated otherwise, the person who associated a work with +this deed makes no warranties about the work, and disclaims liability for +all uses of the work, to the fullest extent permitted by applicable law. +When using or citing the work, you should not imply endorsement by the +author or the affirmer. + + +### Liraries + +Libraries found in the "lib" directory are distributed under the open source (or open source-like) licenses/agreements. Appropriate license agreements for each library can be found in the "lib" directory. Other dependencies can be found in `build.gradle`. - - eqmessageutils.jar (https://github.com/usgs/eqmessageutils) + - MessageUtils.jar (https://github.com/usgs/eqmessageutils) - QWFileOutClient.jar (http://www.isti2.com/QWIDS/current_dist/QWIDS_EIDS_Summary.html#Open) - - sqlitejdbc-v056.jar (https://github.com/xerial/sqlite-jdbc) - cap-library-r11.jar (https://github.com/google/cap-library) - - -Disclaimers ------------ -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. - -Information provided by this software may be preliminary or provisional and is -subject to revision. It is being provided to meet the need for timely best -science. The information has not received final approval by the U.S. Geological -Survey (USGS) and is provided on the condition that neither the USGS nor the -U.S. Government shall be held liable for any damages resulting from the -authorized or unauthorized use of the information. +[1]: https://creativecommons.org/publicdomain/zero/1.0/legalcode From 0ddb999e1da0ddd75157a6eabab62bfec123ed22 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Mon, 26 Sep 2022 21:16:13 -0600 Subject: [PATCH 09/23] Update code.json, and bump version to v2.8.0 --- code.json | 16 ++++++++-------- .../earthquake/distribution/ProductClient.java | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/code.json b/code.json index e455fd80..8d5b8ec5 100644 --- a/code.json +++ b/code.json @@ -3,31 +3,31 @@ "name": "Product Distribution Layer", "organization": "U.S. Geological Survey", "description": "Distribution system used for derived earthquake information", - "version": "v2.7.12", + "version": "v2.8.0", "status": "Production", "permissions": { "usageType": "openSource", "licenses": [ { "name": "Public Domain, CC0-1.0", - "URL": "https://github.com/usgs/pdl/blob/master/LICENSE.md" + "URL": "https://code.usgs.gov/ghsc/hazdev/pdl/-/raw/master/LICENSE.md" } ] }, "homepageURL": "https://usgs.github.io/pdl/", - "downloadURL": "https://github.com/usgs/pdl/archive/master.zip", - "disclaimerURL": "https://github.com/usgs/pdl/blob/master/LICENSE.md", - "repositoryURL": "https://github.com/usgs/pdl.git", + "downloadURL": "https://code.usgs.gov/ghsc/hazdev/pdl/-/archive/master/pdl-master.zip", + "disclaimerURL": "https://code.usgs.gov/ghsc/hazdev/pdl/-/raw/master/DISCLAIMER.md", + "repositoryURL": "https://code.usgs.gov/ghsc/hazdev/pdl.git", "vcs": "git", "laborHours": 0, "tags": ["earthquake", "distribution", "quakeml"], "languages": ["java"], "contact": { - "name": "Jeremy Fee", - "email": "jmfee@usgs.gov" + "name": "Hazards Development Team", + "email": "gs-haz_dev_team_group@usgs.gov" }, "date": { - "metadataLastUpdated": "2022-09-16" + "metadataLastUpdated": "2022-09-27" } } ] diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index e2763f09..baac2ab8 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.7.12 2022-09-16"; + public static final String RELEASE_VERSION = "Version 2.8.0 2022-09-27"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version"; From 2b1de8514e959dafc9bfc8dc19f13d141114cbb6 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:00:55 -0600 Subject: [PATCH 10/23] NotificationIndexCleanup, remove expired notifications in batches instead of one at a time --- .../earthquake/aws/JsonNotificationIndex.java | 72 +++++++++++-------- .../distribution/JDBCNotificationIndex.java | 18 +++++ .../distribution/NotificationIndex.java | 38 ++++++---- .../NotificationIndexCleanup.java | 36 ++++++---- .../NotificationIndexCleanupTest.java | 8 ++- 5 files changed, 114 insertions(+), 58 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index 7e613a4c..058fa7b4 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -273,42 +273,56 @@ public synchronized void addNotification(Notification notification) */ @Override public synchronized void removeNotification(Notification notification) throws Exception { - // all notifications - Instant expires = notification.getExpirationDate().toInstant(); - ProductId id = notification.getProductId(); - // json only - Instant created = null; - Product product = null; - // url only - URL url = null; - if (notification instanceof JsonNotification) { - JsonNotification jsonNotification = (JsonNotification) notification; - created = jsonNotification.created; - product = jsonNotification.product; - } else if (notification instanceof URLNotification) { - url = ((URLNotification) notification).getProductURL(); - } + final List notifications = new ArrayList<>(); + notifications.add(notification); + this.removeNotifications(notifications); + } + + /** + * Remove notifications from index. + * + * Tracker URLs are ignored. + * @param notification to be removed from index + * @throws Exception if error occurs + */ + @Override + public synchronized void removeNotifications(List notifications) throws Exception { // prepare statement final String sql = "DELETE FROM " + this.table + " WHERE created=? AND expires=? AND source=? AND type=? AND code=?" - + " AND updatetime=? AND url=? AND data" - // created is _very_ specific and is set when product is not null - // skip overhead of embedding product in query - + (product == null ? " IS NULL" : " IS NOT NULL"); + + " AND updatetime=? AND url=?"; beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { statement.setQueryTimeout(60); - // set parameters - statement.setString(1, created != null ? created.toString() : ""); - statement.setString(2, expires.toString()); - statement.setString(3, id.getSource()); - statement.setString(4, id.getType()); - statement.setString(5, id.getCode()); - statement.setLong(6, id.getUpdateTime().getTime()); - statement.setString(7, url != null ? url.toString() : ""); + + for (Notification notification : notifications) { + // all notifications + Instant expires = notification.getExpirationDate().toInstant(); + ProductId id = notification.getProductId(); + // json only + Instant created = null; + // url only + URL url = null; + if (notification instanceof JsonNotification) { + JsonNotification jsonNotification = (JsonNotification) notification; + created = jsonNotification.created; + } else if (notification instanceof URLNotification) { + url = ((URLNotification) notification).getProductURL(); + } + + // set parameters + statement.setString(1, created != null ? created.toString() : ""); + statement.setString(2, expires.toString()); + statement.setString(3, id.getSource()); + statement.setString(4, id.getType()); + statement.setString(5, id.getCode()); + statement.setLong(6, id.getUpdateTime().getTime()); + statement.setString(7, url != null ? url.toString() : ""); + statement.addBatch(); + } // execute - statement.executeUpdate(); + statement.executeBatch(); commitTransaction(); } catch (SQLException e) { LOGGER.log(Level.WARNING, "Exception removing notification", e); @@ -478,7 +492,7 @@ public synchronized List findNotifications( */ @Override public synchronized List findExpiredNotifications() throws Exception { - final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 1000"; + final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 5000"; // prepare statement beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java b/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java index 7c780389..b7c797a1 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java @@ -655,6 +655,24 @@ public synchronized void removeNotification(Notification notification) } } + /** + * Remove notifications from the index. + * + * All matching notifications should be removed from the index. + * + * @param notifications + * the notifications to remove. + * @throws Exception + * if an error occurs while removing the notifications. + * @see gov.usgs.earthquake.distribution.NotificationIndex + */ + public synchronized void removeNotifications(List notifications) + throws Exception { + for (Notification notification : notifications) { + this.removeNotification(notification); + } + } + /** * Search the index for notifications matching id. * diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java index 6a642daa..b89c40e8 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java @@ -10,7 +10,7 @@ /** * Stores and retrieves Notifications. - * + * * This is typically used by a NotificationReceiver to track its Notifications, * but may also be used by NotificationListeners. Each object should maintain a * separate NotificationIndex. @@ -19,10 +19,10 @@ public interface NotificationIndex extends Configurable { /** * Add a notification to the index. - * + * * If an identical notification is already in the index, the implementation * may choose whether or not to store the duplicate information. - * + * * @param notification * the notification to add. * @throws Exception @@ -33,9 +33,9 @@ public void addNotification(final Notification notification) /** * Remove a notification from the index. - * + * * All matching notifications should be removed from the index. - * + * * @param notification * the notification to remove. * @throws Exception @@ -44,11 +44,21 @@ public void addNotification(final Notification notification) public void removeNotification(final Notification notification) throws Exception; + /** + * Remove notifications from index. + * + * Tracker URLs are ignored. + * @param notification to be removed from index + * @throws Exception if error occurs + */ + public void removeNotifications(List notifications) + throws Exception; + /** * Search the index for notifications matching id. - * + * * If more than one notification matches, all should be returned. - * + * * @param id * the ProductId to find. * @return a list of matching notifications. @@ -61,13 +71,13 @@ public List findNotifications(final ProductId id) /** * Search the index for notifications matching the sources, types, and * codes. - * + * * Only one notification for each unique ProductId * (source+type+code+updateTime) should be returned. If sources, types, * and/or codes are null, that parameter should be considered a wildcard. If * sources, types, and codes are all null, a notification for each unique * ProductId in the index should be returned. - * + * * @param source * sources to include, or all if null. * @param type @@ -84,13 +94,13 @@ public List findNotifications(final String source, /** * Search the index for notifications matching the sources, types, and * codes. - * + * * Only one notification for each unique ProductId * (source+type+code+updateTime) should be returned. If sources, types, * and/or codes are null, that parameter should be considered a wildcard. If * sources, types, and codes are all null, a notification for each unique * ProductId in the index should be returned. - * + * * @param sources * sources to include, or all if null. * @param types @@ -103,12 +113,12 @@ public List findNotifications(final String source, */ public List findNotifications(List sources, List types, List codes) throws Exception; - + /** * Search the index for expired notifications. - * + * * All expired notifications, even if duplicate, should be returned. - * + * * @return a list of expired notifications. * @throws Exception * if an error occurs while searching the index. diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java index aa36e9b8..df2649f3 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java @@ -1,5 +1,6 @@ package gov.usgs.earthquake.distribution; +import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -65,25 +66,32 @@ public void run() { } // remove batch of expired notifications - int removed = 0; - for (final Notification expired : expiredNotifications) { - synchronized (syncObject) { - if (stopThread) { - break; + final List removed = new ArrayList<>(expiredNotifications.size()); + if (this.listener == null) { + removed.addAll(expiredNotifications); + } else { + // notify listener, remove only those successfully processed by listener + for (final Notification expired : expiredNotifications) { + synchronized (syncObject) { + if (stopThread) { + break; + } } - } - try { - if (this.listener != null) { + try { this.listener.onExpiredNotification(expired); + removed.add(expired); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Listener exception processing expired notification"); } - this.index.removeNotification(expired); - removed++; - } catch (Exception e) { - LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notification"); } } - final int total = removed; - LOGGER.fine(() -> "[" + indexName + "] Removed " + total + " expired notifications"); + try { + // remove in batch + this.index.removeNotifications(removed); + LOGGER.fine(() -> "[" + indexName + "] Removed " + removed.size() + " expired notifications"); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notifications"); + } } LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup exiting"); this.cleanupThread = null; diff --git a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java index 8f724f1a..6c092a83 100644 --- a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java +++ b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java @@ -63,7 +63,7 @@ public void testNotificationCleanupListener() throws Exception{ final NotificationIndexCleanup cleanup = new NotificationIndexCleanup(index, (Notification expired) -> { listenerNotifications.add(expired); // check size before return - Assert.assertEquals(listenerNotifications.size() - 1, index.removedNotifications.size()); + Assert.assertTrue(listenerNotifications.size() > index.removedNotifications.size()); }); // thread should start and wait cleanup.startup(); @@ -170,6 +170,12 @@ public List findExpiredNotifications() throws Exception { public void removeNotification(final Notification notification) throws Exception { removedNotifications.add(notification); } + + @Override + public void removeNotifications(final List notifications) throws Exception { + removedNotifications.addAll(notifications); + } + } } From 3c7d420f98cad9c22d05e63dfa0934a849175f0d Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:02:27 -0600 Subject: [PATCH 11/23] Fix javadoc warnings --- .../java/gov/usgs/earthquake/aws/JsonNotificationIndex.java | 3 ++- .../gov/usgs/earthquake/distribution/NotificationIndex.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index 058fa7b4..bfca41a6 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -282,7 +282,8 @@ public synchronized void removeNotification(Notification notification) throws Ex * Remove notifications from index. * * Tracker URLs are ignored. - * @param notification to be removed from index + * @param notifications + * notifications to be removed from index * @throws Exception if error occurs */ @Override diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java index b89c40e8..c9adb044 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java @@ -48,7 +48,8 @@ public void removeNotification(final Notification notification) * Remove notifications from index. * * Tracker URLs are ignored. - * @param notification to be removed from index + * @param notifications + * notifications to be removed from index * @throws Exception if error occurs */ public void removeNotifications(List notifications) From c5232220bf9a18e08c77149b94b7766aa6c0ea39 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:30:49 -0600 Subject: [PATCH 12/23] Update jacoco version, pipeline images --- .gitlab-ci.yml | 14 +++++++------- build.gradle | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 33ba8186..bf722528 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,4 +1,4 @@ -image: ${DEVOPS_REGISTRY}usgs/centos:8 +image: ${CI_REGISTRY}/devops/images/usgs/java:11-jdk stages: - build @@ -22,7 +22,7 @@ stages: paths: - .gradle/caches - .gradle/wrapper - image: ${DEVOPS_REGISTRY}usgs/java:11-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:11-jdk script: # run gradle and javadoc - export GRADLE_USER_HOME="${CI_PROJECT_DIR}/.gradle" @@ -38,20 +38,20 @@ stages: Java 8: extends: - .check_code - image: ${DEVOPS_REGISTRY}usgs/java:8-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:8-jdk Java 11: extends: - .check_code - image: ${DEVOPS_REGISTRY}usgs/java:11-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:11-jdk -Java Latest: +Java 17: extends: - .check_code - image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:17-jdk Javadoc: - image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:17-jdk script: - ./gradlew javadoc stage: javadoc diff --git a/build.gradle b/build.gradle index 8ce487a1..4e3647f6 100644 --- a/build.gradle +++ b/build.gradle @@ -72,7 +72,7 @@ tasks.withType(JavaCompile) { // coverage reports jacoco { - toolVersion "0.8.7" + toolVersion "0.8.8" } jacocoTestReport { reports { From 4d0119a785bfa3003e5269ff71ac9534c24f11e2 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:34:24 -0600 Subject: [PATCH 13/23] Update pipeline images --- .gitlab-ci.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index bf722528..1a820d5c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,4 +1,4 @@ -image: ${CI_REGISTRY}/devops/images/usgs/java:11-jdk +image: ${CI_REGISTRY}/devops/images/usgs/java:11 stages: - build @@ -22,7 +22,7 @@ stages: paths: - .gradle/caches - .gradle/wrapper - image: ${CI_REGISTRY}/devops/images/usgs/java:11-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:11 script: # run gradle and javadoc - export GRADLE_USER_HOME="${CI_PROJECT_DIR}/.gradle" @@ -38,20 +38,20 @@ stages: Java 8: extends: - .check_code - image: ${CI_REGISTRY}/devops/images/usgs/java:8-jdk + image: ${DEVOPS_REGISTRY}usgs/java:8-jdk Java 11: extends: - .check_code - image: ${CI_REGISTRY}/devops/images/usgs/java:11-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:11 Java 17: extends: - .check_code - image: ${CI_REGISTRY}/devops/images/usgs/java:17-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:17 Javadoc: - image: ${CI_REGISTRY}/devops/images/usgs/java:17-jdk + image: ${CI_REGISTRY}/devops/images/usgs/java:17 script: - ./gradlew javadoc stage: javadoc From 358d24a5ab11b1dce3311c1c7dcda8d1cb0bdd1b Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:39:23 -0600 Subject: [PATCH 14/23] Increase test sleep time --- .../earthquake/distribution/NotificationIndexCleanupTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java index 6c092a83..834c4e2f 100644 --- a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java +++ b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java @@ -131,14 +131,14 @@ public void testNotificationCleanupListenerException() throws Exception{ index.expiredNotificationsReturns.add(notifications); listenerThrow.add(true); cleanup.wakeUp(); - Thread.sleep(1L); + Thread.sleep(5L); Assert.assertEquals(0, index.removedNotifications.size()); Assert.assertEquals(0, listenerNotifications.size()); // run with multiple findExpiredNotifications returns, should remove all index.expiredNotificationsReturns.add(notifications); listenerThrow.clear(); cleanup.wakeUp(); - Thread.sleep(1L); + Thread.sleep(5L); Assert.assertEquals(2, index.removedNotifications.size()); Assert.assertEquals(2, listenerNotifications.size()); } From 3b3ac4aba9f3a712746b8d72aefe783d7ad50464 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:43:50 -0600 Subject: [PATCH 15/23] Increase test sleep time --- .../earthquake/distribution/NotificationIndexCleanupTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java index 834c4e2f..3602224d 100644 --- a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java +++ b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java @@ -38,7 +38,7 @@ public void testNotificationCleanup() throws Exception{ // run cleanup thread, should remove 2 notifications index.expiredNotificationsReturns.add(notifications); cleanup.wakeUp(); - Thread.sleep(1L); + Thread.sleep(5L); Assert.assertEquals(index.removedNotifications.size(), 2); // run with multiple findExpiredNotifications returns, should remove all @@ -46,7 +46,7 @@ public void testNotificationCleanup() throws Exception{ index.expiredNotificationsReturns.add(notifications); index.expiredNotificationsReturns.add(notifications); cleanup.wakeUp(); - Thread.sleep(2L); + Thread.sleep(5L); Assert.assertEquals(index.removedNotifications.size(), 4); } From 423ef2523cd3dc81c5b9c34da87d495660bea9bf Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:54:13 -0600 Subject: [PATCH 16/23] Use java 11 for pipeline javadoc --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 1a820d5c..b17805ce 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -51,7 +51,7 @@ Java 17: image: ${CI_REGISTRY}/devops/images/usgs/java:17 Javadoc: - image: ${CI_REGISTRY}/devops/images/usgs/java:17 + image: ${CI_REGISTRY}/devops/images/usgs/java:11 script: - ./gradlew javadoc stage: javadoc From 64fea990416f0f220bce2b6ec7e03a7810931d69 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:56:50 -0600 Subject: [PATCH 17/23] Revert to pipeline images that run as root --- .gitlab-ci.yml | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b17805ce..36d3b195 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,8 +1,6 @@ -image: ${CI_REGISTRY}/devops/images/usgs/java:11 stages: - build - - javadoc .check_code: after_script: @@ -22,7 +20,7 @@ stages: paths: - .gradle/caches - .gradle/wrapper - image: ${CI_REGISTRY}/devops/images/usgs/java:11 + image: ${DEVOPS_REGISTRY}usgs/java:11-jdk script: # run gradle and javadoc - export GRADLE_USER_HOME="${CI_PROJECT_DIR}/.gradle" @@ -43,15 +41,15 @@ Java 8: Java 11: extends: - .check_code - image: ${CI_REGISTRY}/devops/images/usgs/java:11 + image: ${DEVOPS_REGISTRY}usgs/java:11-jdk Java 17: extends: - .check_code - image: ${CI_REGISTRY}/devops/images/usgs/java:17 + image: ${DEVOPS_REGISTRY}usgs/java:17-jdk Javadoc: - image: ${CI_REGISTRY}/devops/images/usgs/java:11 + image: ${DEVOPS_REGISTRY}usgs/java:11-jdk script: - ./gradlew javadoc - stage: javadoc + stage: build From ff0bf25bc1d1619c310c222fa098b4fcdad9c681 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 10:57:51 -0600 Subject: [PATCH 18/23] Use java latest image instead of 17 --- .gitlab-ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 36d3b195..5cd81e40 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -43,10 +43,10 @@ Java 11: - .check_code image: ${DEVOPS_REGISTRY}usgs/java:11-jdk -Java 17: +Java Latest: extends: - .check_code - image: ${DEVOPS_REGISTRY}usgs/java:17-jdk + image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk Javadoc: image: ${DEVOPS_REGISTRY}usgs/java:11-jdk From 3a21661588b6ae35958de0f98e285559e6a186e4 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 11:24:33 -0600 Subject: [PATCH 19/23] Update sqlite version to support ARM, skip JsonNotificationIndex check when sqlite --- build.gradle | 2 +- .../earthquake/distribution/ExecutorListenerNotifier.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index 4e3647f6..6e53bbeb 100644 --- a/build.gradle +++ b/build.gradle @@ -40,7 +40,7 @@ dependencies { implementation "org.glassfish.tyrus.bundles:tyrus-standalone-client:1.17" // database drivers runtimeOnly "mysql:mysql-connector-java:5.1.47" - runtimeOnly "org.xerial:sqlite-jdbc:3.23.1" + runtimeOnly "org.xerial:sqlite-jdbc:3.39.2.1" // ssh keys implementation "ch.ethz.ganymed:ganymed-ssh2:262" // nats diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java index 37487376..d13ba496 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java @@ -236,8 +236,10 @@ public void startup() throws Exception { NotificationIndex listenerIndex = ((DefaultNotificationListener) gracefulListeners.get(0)) .getNotificationIndex(); - if (listenerIndex instanceof JsonNotificationIndex) { - // get intersection + if (listenerIndex instanceof JsonNotificationIndex + && !((JsonNotificationIndex) listenerIndex).getDriver().contains("sqlite") + ) { + // get intersection when potentially sharing database try { allNotifications = ((JsonNotificationIndex) index).getMissingNotifications( From 0566543ac7a164c2b572b8db6bf1177249385469 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 27 Sep 2022 12:32:51 -0600 Subject: [PATCH 20/23] Add JsonNotificationIndex, JsonProductIndex, and TrackingIndex to configuration.html --- docs/userguide/configuration.html | 159 +++++++++++++++++++++++++++++- 1 file changed, 157 insertions(+), 2 deletions(-) diff --git a/docs/userguide/configuration.html b/docs/userguide/configuration.html index fee84505..638dbc34 100644 --- a/docs/userguide/configuration.html +++ b/docs/userguide/configuration.html @@ -11,6 +11,7 @@

Configuration

« Back to User Guide

+

Navigation

  • Indexer Components @@ -38,6 +40,7 @@

    Navigation

  • Reliable Listener Configuration
  • +

    Format

    The configuration file uses an INI style syntax for properties. Any lines that begin with a semi-colon are treated as comments and @@ -232,6 +235,7 @@

    Global Properties

    Distribution Components

    +

    NotificationIndex

    A notification index is used by Hubs and Clients to track available products.

    @@ -243,7 +247,56 @@

    NotificationIndex

    The SQLite database file. Default is "pd_index.db".
    + +
    gov.usgs.earthquake.aws.JsonNotificationIndex
    +
    +
    +
    driver
    +
    + JDBC Driver class for connection. + (Optional, default org.sqlite.JDBC) +
    + +
    table
    +
    + Database table name. + (Optional, default notification) +
    + +
    url
    +
    + JDBC Connection URL. + (Optional, default jdbc:sqlite:json_notification_index.db) +
    +
    +
    + +

    + The JsonNotificationIndex schema can be created automatically when using Sqlite. + For mysql or other databases, it is recommended to create the schema + separately and run with a user with only SELECT,INSERT,UPDATE, and DELETE + permissions. +

    + +
    +CREATE TABLE IF NOT EXISTS notification
    + (id INTEGER PRIMARY KEY AUTO_INCREMENT
    + , created VARCHAR(255)
    + , expires VARCHAR(255)
    + , source VARCHAR(255)
    + , type VARCHAR(255)
    + , code VARCHAR(255)
    + , updatetime BIGINT
    + , url TEXT
    + , data LONGTEXT
    + , KEY source_index (source)
    + , KEY type_index (type)
    + , KEY code_index (code)
    + , KEY expires_index (expires)
    + ) ENGINE=innodb CHARSET=utf8;
    +		
    +

    NotificationListener

    Listeners are used by Hubs and Clients

    @@ -512,6 +565,7 @@

    Listener Types

    +

    NotificationReceiver

    Receivers are used by Hubs and Clients.

    @@ -599,7 +653,10 @@

    Receiver Types

    url
    -
    Base URL for hub.
    +
    Base URL for hub. + + e.g. wss://earthquake.usgs.gov/pdl/west/subscribe +
    trackingFileName
    File where current position in hub notifications is tracked.
    @@ -618,7 +675,7 @@

    Receiver Types

    initialCatchUpAge
    - (Optional, default 7.0 days) how far back in time to start when + (Optional, default 7.0 days, max 30.0) how far back in time to start when connecting to server. subsequent attempts start at current position, which is tracked.
    @@ -854,7 +911,56 @@

    ProductStorage

    + +
    gov.usgs.earthquake.aws.JsonProductStorage
    +
    +

    + A JsonProductStorage stores Json product metadata in a database, + instead of the filesystem, and works best with an AwsProductReceiver. +

    + +
    +
    driver
    +
    + JDBC Driver class for connection. + (Optional, default org.sqlite.JDBC) +
    + +
    table
    +
    + Database table name. + (Optional, default product) +
    + +
    url
    +
    + JDBC Connection URL. + (Optional, default jdbc:sqlite:json_product_index.db) +
    +
    + +

    + The JsonProductStorage schema can be created automatically when using Sqlite. + For mysql or other databases, it is recommended to create the schema + separately and run with a user with only SELECT,INSERT,UPDATE, and DELETE + permissions. +

    + +
    +CREATE TABLE IF NOT EXISTS product
    + (id INTEGER PRIMARY KEY AUTO_INCREMENT
    + , source VARCHAR(255)
    + , type VARCHAR(255)
    + , code VARCHAR(255)
    + , updatetime BIGINT
    + , data LONGTEXT
    + , UNIQUE KEY product_index (source, type, code, updatetime)
    + ) ENGINE=innodb CHARSET=utf8;
    +			
    +
    + +

    StorageListener

    Listeners are notified when products are added-to, or removed from the ProductStorage.

    @@ -897,6 +1003,7 @@

    StorageListener

    +

    ProductKey

    Keys are used to verify signed products, and more specifically by FileProductStorage.

    @@ -919,6 +1026,54 @@

    ProductKey

    +

    TrackingIndex

    +

    The TrackingIndex is used to store component state in a database.

    + +
    +
    gov.usgs.earthquake.aws.TrackingIndex
    +
    +

    + Used by an AwsProductReceiver. +

    + +
    +
    driver
    +
    + JDBC Driver class for connection. + (Optional, default org.sqlite.JDBC) +
    + +
    table
    +
    + Database table name. + (Optional, default tracking) +
    + +
    url
    +
    + JDBC Connection URL. + (Optional, default jdbc:sqlite:json_product_index.db) +
    +
    + +

    + The TrackingIndex schema can be created automatically when using Sqlite. + For mysql or other databases, it is recommended to create the schema + separately and run PDL with a user with only SELECT,INSERT,UPDATE, and DELETE + permissions. +

    + +
    +CREATE TABLE IF NOT EXISTS tracking
    +	(id INTEGER PRIMARY KEY AUTO_INCREMENT
    +	, created VARCHAR(255)
    +	, name VARCHAR(255)
    +	, data LONGTEXT
    +	, UNIQUE KEY name_index (name)
    +	) ENGINE=innodb CHARSET=utf8;
    +			
    +
    +

    Indexer Components

    From bf07a39bc06b03753c48b51ce28c248a4f60f33e Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 11 Oct 2022 08:44:33 -0600 Subject: [PATCH 21/23] Update release date for 2.8.0 to 2022-10-11 --- code.json | 2 +- .../java/gov/usgs/earthquake/distribution/ProductClient.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/code.json b/code.json index 8d5b8ec5..dce29d81 100644 --- a/code.json +++ b/code.json @@ -27,7 +27,7 @@ "email": "gs-haz_dev_team_group@usgs.gov" }, "date": { - "metadataLastUpdated": "2022-09-27" + "metadataLastUpdated": "2022-10-11" } } ] diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index baac2ab8..85918809 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.8.0 2022-09-27"; + public static final String RELEASE_VERSION = "Version 2.8.0 2022-10-11"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version"; From a48a2d33d934b46964761b0820db466eacbb98fb Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 11 Oct 2022 09:06:02 -0600 Subject: [PATCH 22/23] Switch Java Latest to Java 17 --- .gitlab-ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 5cd81e40..e43d043e 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -43,10 +43,10 @@ Java 11: - .check_code image: ${DEVOPS_REGISTRY}usgs/java:11-jdk -Java Latest: +Java 17: extends: - .check_code - image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk + image: code.usgs.gov:5001/devops/images/usgs/java:17 Javadoc: image: ${DEVOPS_REGISTRY}usgs/java:11-jdk From 783a14e5fc8040628f0d1345f39dbc83d6da8af6 Mon Sep 17 00:00:00 2001 From: Jeremy Fee Date: Tue, 11 Oct 2022 09:24:51 -0600 Subject: [PATCH 23/23] Remove Java Latest job for now, centos 7 based image no longer supports java>11 --- .gitlab-ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index e43d043e..d99fa343 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -43,10 +43,10 @@ Java 11: - .check_code image: ${DEVOPS_REGISTRY}usgs/java:11-jdk -Java 17: - extends: - - .check_code - image: code.usgs.gov:5001/devops/images/usgs/java:17 +# Java Latest: +# extends: +# - .check_code +# image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk Javadoc: image: ${DEVOPS_REGISTRY}usgs/java:11-jdk