diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 33ba8186..d99fa343 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,8 +1,6 @@ -image: ${DEVOPS_REGISTRY}usgs/centos:8 stages: - build - - javadoc .check_code: after_script: @@ -45,13 +43,13 @@ Java 11: - .check_code image: ${DEVOPS_REGISTRY}usgs/java:11-jdk -Java Latest: - extends: - - .check_code - image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk +# Java Latest: +# extends: +# - .check_code +# image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk Javadoc: - image: ${DEVOPS_REGISTRY}usgs/java:latest-jdk + image: ${DEVOPS_REGISTRY}usgs/java:11-jdk script: - ./gradlew javadoc - stage: javadoc + stage: build diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..8ab1e097 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,42 @@ +# Contributing + +Contributions are welcome from the community. + +## Bug Reports and Suggested Enhancements + +Bug Reports and Suggested Enhancements can be [submitted on the issues page][1]. + +Before creating a new issue, please take a moment to search and make sure a +similar issue does not already exist. If one does exist, you can comment +(most simply even with a :+1: (`:+1:`) to show your support for that issue. + +## Development + +This project uses [Gradle][4] to manage dependencies and build the project. + +- Clone project + + > Note: This project uses a [forking workflow][2]. + > Creating a fork is not required to get started, but is recommended. + +- Install dependencies + + - Java 8, 11, or 17 + +- Build project and run tests + + ``` + ./gradlew build + ``` + +### Merge Requests + +If you have direct contributions you would like considered for incorporation +into the project you can [fork this repository][2] and +[create a merge request][3] for review. + + +[1]: https://code.usgs.gov/ghsc/hazdev/pdl/issues +[2]: https://docs.gitlab.com/ee/user/project/repository/forking_workflow.html +[3]: https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html +[4]: https://gradle.org/ diff --git a/DISCLAIMER.md b/DISCLAIMER.md new file mode 100644 index 00000000..fb9a9721 --- /dev/null +++ b/DISCLAIMER.md @@ -0,0 +1,10 @@ +# Disclaimer + +This software is preliminary or provisional and is subject to revision. It is +being provided to meet the need for timely best science. The software has not +received final approval by the U.S. Geological Survey (USGS). No warranty, +expressed or implied, is made by the USGS or the U.S. Government as to the +functionality of the software and related material nor shall the fact of release +constitute any such warranty. The software is provided on the condition that +neither the USGS nor the U.S. Government shall be held liable for any damages +resulting from the authorized or unauthorized use of the software. diff --git a/LICENSE.md b/LICENSE.md index 8b34ba5f..06f4e3b5 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -1,35 +1,56 @@ -Unless otherwise noted, This software is in the public domain because it -contains materials that originally came from the United States Geological -Survey, an agency of the United States Department of Interior. For more -information, see the official USGS copyright policy at -http://www.usgs.gov/visual-id/credit_usgs.html#copyright +License +======= +Unless otherwise noted, This project is in the public domain in the United +States because it contains materials that originally came from the United +States Geological Survey, an agency of the United States Department of +Interior. For more information, see the official USGS copyright policy at +https://www.usgs.gov/information-policies-and-instructions/copyrights-and-credits -Dependent libraries found in the "lib" directory are distributed under the +Additionally, we waive copyright and related rights in the work +worldwide through the CC0 1.0 Universal public domain dedication. + + +CC0 1.0 Universal Summary +------------------------- + +This is a human-readable summary of the +[Legal Code (read the full text)][1]. + + +### No Copyright + +The person who associated a work with this deed has dedicated the work to +the public domain by waiving all of his or her rights to the work worldwide +under copyright law, including all related and neighboring rights, to the +extent allowed by law. + +You can copy, modify, distribute and perform the work, even for commercial +purposes, all without asking permission. + + +### Other Information + +In no way are the patent or trademark rights of any person affected by CC0, +nor are the rights that other persons may have in the work or in how the +work is used, such as publicity or privacy rights. + +Unless expressly stated otherwise, the person who associated a work with +this deed makes no warranties about the work, and disclaims liability for +all uses of the work, to the fullest extent permitted by applicable law. +When using or citing the work, you should not imply endorsement by the +author or the affirmer. + + +### Liraries + +Libraries found in the "lib" directory are distributed under the open source (or open source-like) licenses/agreements. Appropriate license agreements for each library can be found in the "lib" directory. Other dependencies can be found in `build.gradle`. - - eqmessageutils.jar (https://github.com/usgs/eqmessageutils) + - MessageUtils.jar (https://github.com/usgs/eqmessageutils) - QWFileOutClient.jar (http://www.isti2.com/QWIDS/current_dist/QWIDS_EIDS_Summary.html#Open) - - sqlitejdbc-v056.jar (https://github.com/xerial/sqlite-jdbc) - cap-library-r11.jar (https://github.com/google/cap-library) - - -Disclaimers ------------ -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. - -Information provided by this software may be preliminary or provisional and is -subject to revision. It is being provided to meet the need for timely best -science. The information has not received final approval by the U.S. Geological -Survey (USGS) and is provided on the condition that neither the USGS nor the -U.S. Government shall be held liable for any damages resulting from the -authorized or unauthorized use of the information. +[1]: https://creativecommons.org/publicdomain/zero/1.0/legalcode diff --git a/build.gradle b/build.gradle index 8ce487a1..6e53bbeb 100644 --- a/build.gradle +++ b/build.gradle @@ -40,7 +40,7 @@ dependencies { implementation "org.glassfish.tyrus.bundles:tyrus-standalone-client:1.17" // database drivers runtimeOnly "mysql:mysql-connector-java:5.1.47" - runtimeOnly "org.xerial:sqlite-jdbc:3.23.1" + runtimeOnly "org.xerial:sqlite-jdbc:3.39.2.1" // ssh keys implementation "ch.ethz.ganymed:ganymed-ssh2:262" // nats @@ -72,7 +72,7 @@ tasks.withType(JavaCompile) { // coverage reports jacoco { - toolVersion "0.8.7" + toolVersion "0.8.8" } jacocoTestReport { reports { diff --git a/code.json b/code.json index e455fd80..dce29d81 100644 --- a/code.json +++ b/code.json @@ -3,31 +3,31 @@ "name": "Product Distribution Layer", "organization": "U.S. Geological Survey", "description": "Distribution system used for derived earthquake information", - "version": "v2.7.12", + "version": "v2.8.0", "status": "Production", "permissions": { "usageType": "openSource", "licenses": [ { "name": "Public Domain, CC0-1.0", - "URL": "https://github.com/usgs/pdl/blob/master/LICENSE.md" + "URL": "https://code.usgs.gov/ghsc/hazdev/pdl/-/raw/master/LICENSE.md" } ] }, "homepageURL": "https://usgs.github.io/pdl/", - "downloadURL": "https://github.com/usgs/pdl/archive/master.zip", - "disclaimerURL": "https://github.com/usgs/pdl/blob/master/LICENSE.md", - "repositoryURL": "https://github.com/usgs/pdl.git", + "downloadURL": "https://code.usgs.gov/ghsc/hazdev/pdl/-/archive/master/pdl-master.zip", + "disclaimerURL": "https://code.usgs.gov/ghsc/hazdev/pdl/-/raw/master/DISCLAIMER.md", + "repositoryURL": "https://code.usgs.gov/ghsc/hazdev/pdl.git", "vcs": "git", "laborHours": 0, "tags": ["earthquake", "distribution", "quakeml"], "languages": ["java"], "contact": { - "name": "Jeremy Fee", - "email": "jmfee@usgs.gov" + "name": "Hazards Development Team", + "email": "gs-haz_dev_team_group@usgs.gov" }, "date": { - "metadataLastUpdated": "2022-09-16" + "metadataLastUpdated": "2022-10-11" } } ] diff --git a/docs/userguide/configuration.html b/docs/userguide/configuration.html index fee84505..638dbc34 100644 --- a/docs/userguide/configuration.html +++ b/docs/userguide/configuration.html @@ -11,6 +11,7 @@

Configuration

« Back to User Guide

+

Navigation

  • Indexer Components @@ -38,6 +40,7 @@

    Navigation

  • Reliable Listener Configuration
  • +

    Format

    The configuration file uses an INI style syntax for properties. Any lines that begin with a semi-colon are treated as comments and @@ -232,6 +235,7 @@

    Global Properties

    Distribution Components

    +

    NotificationIndex

    A notification index is used by Hubs and Clients to track available products.

    @@ -243,7 +247,56 @@

    NotificationIndex

    The SQLite database file. Default is "pd_index.db".
    + +
    gov.usgs.earthquake.aws.JsonNotificationIndex
    +
    +
    +
    driver
    +
    + JDBC Driver class for connection. + (Optional, default org.sqlite.JDBC) +
    + +
    table
    +
    + Database table name. + (Optional, default notification) +
    + +
    url
    +
    + JDBC Connection URL. + (Optional, default jdbc:sqlite:json_notification_index.db) +
    +
    +
    + +

    + The JsonNotificationIndex schema can be created automatically when using Sqlite. + For mysql or other databases, it is recommended to create the schema + separately and run with a user with only SELECT,INSERT,UPDATE, and DELETE + permissions. +

    + +
    +CREATE TABLE IF NOT EXISTS notification
    + (id INTEGER PRIMARY KEY AUTO_INCREMENT
    + , created VARCHAR(255)
    + , expires VARCHAR(255)
    + , source VARCHAR(255)
    + , type VARCHAR(255)
    + , code VARCHAR(255)
    + , updatetime BIGINT
    + , url TEXT
    + , data LONGTEXT
    + , KEY source_index (source)
    + , KEY type_index (type)
    + , KEY code_index (code)
    + , KEY expires_index (expires)
    + ) ENGINE=innodb CHARSET=utf8;
    +		
    +

    NotificationListener

    Listeners are used by Hubs and Clients

    @@ -512,6 +565,7 @@

    Listener Types

    +

    NotificationReceiver

    Receivers are used by Hubs and Clients.

    @@ -599,7 +653,10 @@

    Receiver Types

    url
    -
    Base URL for hub.
    +
    Base URL for hub. + + e.g. wss://earthquake.usgs.gov/pdl/west/subscribe +
    trackingFileName
    File where current position in hub notifications is tracked.
    @@ -618,7 +675,7 @@

    Receiver Types

    initialCatchUpAge
    - (Optional, default 7.0 days) how far back in time to start when + (Optional, default 7.0 days, max 30.0) how far back in time to start when connecting to server. subsequent attempts start at current position, which is tracked.
    @@ -854,7 +911,56 @@

    ProductStorage

    + +
    gov.usgs.earthquake.aws.JsonProductStorage
    +
    +

    + A JsonProductStorage stores Json product metadata in a database, + instead of the filesystem, and works best with an AwsProductReceiver. +

    + +
    +
    driver
    +
    + JDBC Driver class for connection. + (Optional, default org.sqlite.JDBC) +
    + +
    table
    +
    + Database table name. + (Optional, default product) +
    + +
    url
    +
    + JDBC Connection URL. + (Optional, default jdbc:sqlite:json_product_index.db) +
    +
    + +

    + The JsonProductStorage schema can be created automatically when using Sqlite. + For mysql or other databases, it is recommended to create the schema + separately and run with a user with only SELECT,INSERT,UPDATE, and DELETE + permissions. +

    + +
    +CREATE TABLE IF NOT EXISTS product
    + (id INTEGER PRIMARY KEY AUTO_INCREMENT
    + , source VARCHAR(255)
    + , type VARCHAR(255)
    + , code VARCHAR(255)
    + , updatetime BIGINT
    + , data LONGTEXT
    + , UNIQUE KEY product_index (source, type, code, updatetime)
    + ) ENGINE=innodb CHARSET=utf8;
    +			
    +
    + +

    StorageListener

    Listeners are notified when products are added-to, or removed from the ProductStorage.

    @@ -897,6 +1003,7 @@

    StorageListener

    +

    ProductKey

    Keys are used to verify signed products, and more specifically by FileProductStorage.

    @@ -919,6 +1026,54 @@

    ProductKey

    +

    TrackingIndex

    +

    The TrackingIndex is used to store component state in a database.

    + +
    +
    gov.usgs.earthquake.aws.TrackingIndex
    +
    +

    + Used by an AwsProductReceiver. +

    + +
    +
    driver
    +
    + JDBC Driver class for connection. + (Optional, default org.sqlite.JDBC) +
    + +
    table
    +
    + Database table name. + (Optional, default tracking) +
    + +
    url
    +
    + JDBC Connection URL. + (Optional, default jdbc:sqlite:json_product_index.db) +
    +
    + +

    + The TrackingIndex schema can be created automatically when using Sqlite. + For mysql or other databases, it is recommended to create the schema + separately and run PDL with a user with only SELECT,INSERT,UPDATE, and DELETE + permissions. +

    + +
    +CREATE TABLE IF NOT EXISTS tracking
    +	(id INTEGER PRIMARY KEY AUTO_INCREMENT
    +	, created VARCHAR(255)
    +	, name VARCHAR(255)
    +	, data LONGTEXT
    +	, UNIQUE KEY name_index (name)
    +	) ENGINE=innodb CHARSET=utf8;
    +			
    +
    +

    Indexer Components

    diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index 1ab76cea..bfca41a6 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -273,44 +273,57 @@ public synchronized void addNotification(Notification notification) */ @Override public synchronized void removeNotification(Notification notification) throws Exception { - // all notifications - Instant expires = notification.getExpirationDate().toInstant(); - ProductId id = notification.getProductId(); - // json only - Instant created = null; - Product product = null; - // url only - URL url = null; - if (notification instanceof JsonNotification) { - JsonNotification jsonNotification = (JsonNotification) notification; - created = jsonNotification.created; - product = jsonNotification.product; - } else if (notification instanceof URLNotification) { - url = ((URLNotification) notification).getProductURL(); - } + final List notifications = new ArrayList<>(); + notifications.add(notification); + this.removeNotifications(notifications); + } + + /** + * Remove notifications from index. + * + * Tracker URLs are ignored. + * @param notifications + * notifications to be removed from index + * @throws Exception if error occurs + */ + @Override + public synchronized void removeNotifications(List notifications) throws Exception { // prepare statement final String sql = "DELETE FROM " + this.table + " WHERE created=? AND expires=? AND source=? AND type=? AND code=?" - + " AND updatetime=? AND url=? AND data" - + (product == null ? " IS NULL" : "=?"); + + " AND updatetime=? AND url=?"; beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { statement.setQueryTimeout(60); - // set parameters - statement.setString(1, created != null ? created.toString() : ""); - statement.setString(2, expires.toString()); - statement.setString(3, id.getSource()); - statement.setString(4, id.getType()); - statement.setString(5, id.getCode()); - statement.setLong(6, id.getUpdateTime().getTime()); - statement.setString(7, url != null ? url.toString() : ""); - if (product != null) { - statement.setString(8, - new JsonProduct().getJsonObject(product).toString()); + + for (Notification notification : notifications) { + // all notifications + Instant expires = notification.getExpirationDate().toInstant(); + ProductId id = notification.getProductId(); + // json only + Instant created = null; + // url only + URL url = null; + if (notification instanceof JsonNotification) { + JsonNotification jsonNotification = (JsonNotification) notification; + created = jsonNotification.created; + } else if (notification instanceof URLNotification) { + url = ((URLNotification) notification).getProductURL(); + } + + // set parameters + statement.setString(1, created != null ? created.toString() : ""); + statement.setString(2, expires.toString()); + statement.setString(3, id.getSource()); + statement.setString(4, id.getType()); + statement.setString(5, id.getCode()); + statement.setLong(6, id.getUpdateTime().getTime()); + statement.setString(7, url != null ? url.toString() : ""); + statement.addBatch(); } // execute - statement.executeUpdate(); + statement.executeBatch(); commitTransaction(); } catch (SQLException e) { LOGGER.log(Level.WARNING, "Exception removing notification", e); @@ -440,7 +453,9 @@ public synchronized List findNotifications( sql = "SELECT DISTINCT" + " '' as created, expires, source, type, code, updateTime" + ", '' as url, null as data" - + " FROM " + this.table; + + " FROM " + this.table + + " WHERE expires > ?"; + values.add(Instant.now().toString()); } // prepare statement beginTransaction(); @@ -478,7 +493,7 @@ public synchronized List findNotifications( */ @Override public synchronized List findExpiredNotifications() throws Exception { - final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 1000"; + final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 5000"; // prepare statement beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { @@ -572,7 +587,9 @@ public synchronized List getMissingNotifications( + " '' as created, t.expires, t.source, t.type, t.code, t.updateTime" + ", '' as url, null as data" + " FROM " + this.table + " t" - + " WHERE NOT EXISTS (" + // only missing if not expired + + " WHERE t.expires > ?" + + " AND NOT EXISTS (" + "SELECT * FROM " + otherTable + " WHERE source=t.source AND type=t.type" + " AND code=t.code AND updatetime=t.updateTime" @@ -582,6 +599,10 @@ public synchronized List getMissingNotifications( try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { statement.setQueryTimeout(1800); + + // set parameters + statement.setString(1, Instant.now().toString()); + // execute and commit if successful final List notifications = getNotifications(statement); commitTransaction(); diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java index 1ddc08e7..f192d48a 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationListener.java @@ -33,7 +33,7 @@ * @see gov.usgs.earthquake.product.AbstractListener */ public class DefaultNotificationListener extends AbstractListener implements - NotificationListener { + NotificationListener, NotificationIndexCleanup.Listener { /** Logging object. */ private static final Logger LOGGER = Logger @@ -73,6 +73,9 @@ public class DefaultNotificationListener extends AbstractListener implements /** Timer that schedules sender cleanup task. */ private Timer cleanupTimer = null; + /** Notification index cleanup. */ + private NotificationIndexCleanup notificationCleanup = null; + /** How many products to process at the same time. */ private int concurrentProducts = 1; @@ -266,43 +269,26 @@ protected void onAfterProcessNotification(final Notification notification) * @param notification to be removed * @throws Exception if error occurs */ - protected void onExpiredNotification(final Notification notification) + @Override + public void onExpiredNotification(final Notification notification) throws Exception { // nothing to do } - /** + /** * Periodic cleanup task. * * Called every cleanupInterval milliseconds. */ - public void cleanup() { + public void cleanup() throws Exception { LOGGER.finer("[" + getName() + "] running listener cleanup"); - try { - if (notificationIndex != null) { - Iterator iter = notificationIndex - .findExpiredNotifications().iterator(); - while (iter.hasNext()) { - Notification notification = iter.next(); - - // let subclasses remove other stuff first - onExpiredNotification(notification); - - // remove expired notification from index - notificationIndex.removeNotification(notification); - - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.finest("[" - + getName() - + "] removed expired notification from sender index " - + notification.getProductId().toString()); - } - } - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Exception during listener cleanup", e); + if (this.notificationCleanup == null) { + this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this); + this.notificationCleanup.startup(); + } else { + this.notificationCleanup.wakeUp(); } - }; + } @Override public void startup() throws Exception { @@ -320,7 +306,7 @@ public void run() { cleanup(); } catch (Exception e) { LOGGER.log(Level.WARNING, "[" + getName() - + "] exception during sender cleanup", e); + + "] exception during cleanup", e); } } }, 0, cleanupInterval); @@ -330,6 +316,15 @@ public void run() { @Override public void shutdown() throws Exception { super.shutdown(); + if (this.notificationCleanup != null) { + try { + this.notificationCleanup.shutdown(); + } catch (Exception e) { + LOGGER.log(Level.INFO, "[" + getName() + "] exception stopping notification cleanup", e); + } finally { + this.notificationCleanup = null; + } + } try { this.notificationIndex.shutdown(); } catch (Exception e) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java index 743a8d4e..f119fca4 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java @@ -50,7 +50,7 @@ * allocated one thread to process notifications from this queue. */ public class DefaultNotificationReceiver extends DefaultConfigurable implements - NotificationReceiver { + NotificationReceiver, NotificationIndexCleanup.Listener { /** Logging object. */ private static final Logger LOGGER = Logger @@ -119,6 +119,9 @@ public class DefaultNotificationReceiver extends DefaultConfigurable implements /** Timer that schedules receiver cleanup task. */ private Timer receiverCleanupTimer = new Timer(); + /** Notification cleanup */ + private NotificationIndexCleanup notificationCleanup = null; + private int connectTimeout = Integer.parseInt(DEFAULT_CONNECT_TIMEOUT); private int readTimeout = Integer.parseInt(DEFAULT_READ_TIMEOUT); @@ -235,29 +238,36 @@ public String getListenerQueueStatus() { * removed. * * @throws Exception - * if productStorage or notificationIndex throw an Exception. + * if NotificationIndexCleanup throws an Exception. */ public void removeExpiredNotifications() throws Exception { LOGGER.fine("[" + getName() + "] running receiver cleanup"); - Iterator iter = notificationIndex - .findExpiredNotifications().iterator(); - while (iter.hasNext()) { - Notification notification = iter.next(); - if (!(notification instanceof URLNotification)) { - // if it isn't a url notification, it's also in storage - productStorage.removeProduct(notification.getProductId()); - if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.finest("[" + getName() - + "] removed expired product from receiver cache " - + notification.getProductId().toString()); - } - } + // use NotificationIndexCleanup to manage cleanup in separate thread + if (this.notificationCleanup == null) { + this.notificationCleanup = new NotificationIndexCleanup(this.notificationIndex, this); + this.notificationCleanup.startup(); + } else { + this.notificationCleanup.wakeUp(); + } + } - // remove expired notification from index - notificationIndex.removeNotification(notification); + /** + * Callback from the NotificationIndexCleanup thread. + * + * Checks if Notification refers to a product in storage, + * which should also be removed. + * + * @param notification + * expired notification about to be removed. + * @throws Exception + */ + public void onExpiredNotification(final Notification notification) throws Exception { + if (!(notification instanceof URLNotification)) { + // if it isn't a url notification, it's also in storage + productStorage.removeProduct(notification.getProductId()); if (LOGGER.isLoggable(Level.FINEST)) { LOGGER.finest("[" + getName() - + "] removed expired notification from receiver index " + + "] removed expired product from receiver cache " + notification.getProductId().toString()); } } @@ -593,7 +603,13 @@ public void configure(Config config) throws Exception { public void shutdown() throws Exception { receiverCleanupTimer.cancel(); - + if (notificationCleanup != null) { + try { + notificationCleanup.shutdown(); + notificationCleanup = null; + } catch (Exception ignore) { + } + } try { notifier.shutdown(); } catch (Exception ignore) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java index 9af3a0a2..9c4f8505 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationSender.java @@ -178,7 +178,7 @@ protected void onAfterProcessNotification(final Notification notification) { * @throws Exception if something goes wrong */ @Override - protected void onExpiredNotification(final Notification notification) throws Exception{ + public void onExpiredNotification(final Notification notification) throws Exception{ List notifications = getNotificationIndex() .findNotifications(notification.getProductId()); if (notifications.size() <= 1) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java index 37487376..d13ba496 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java @@ -236,8 +236,10 @@ public void startup() throws Exception { NotificationIndex listenerIndex = ((DefaultNotificationListener) gracefulListeners.get(0)) .getNotificationIndex(); - if (listenerIndex instanceof JsonNotificationIndex) { - // get intersection + if (listenerIndex instanceof JsonNotificationIndex + && !((JsonNotificationIndex) listenerIndex).getDriver().contains("sqlite") + ) { + // get intersection when potentially sharing database try { allNotifications = ((JsonNotificationIndex) index).getMissingNotifications( diff --git a/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java b/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java index 09061705..128a6a31 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java +++ b/src/main/java/gov/usgs/earthquake/distribution/HeartbeatListener.java @@ -200,7 +200,8 @@ public String formatHeartbeatOutput() { * purge heartbeat key/values older than storageTimeout, also purging empty * heartbeats */ - public void cleanup() { + @Override + public void cleanup() throws Exception { super.cleanup(); if (this.storageTimeout == 0) { diff --git a/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java b/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java index 7c780389..b7c797a1 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/distribution/JDBCNotificationIndex.java @@ -655,6 +655,24 @@ public synchronized void removeNotification(Notification notification) } } + /** + * Remove notifications from the index. + * + * All matching notifications should be removed from the index. + * + * @param notifications + * the notifications to remove. + * @throws Exception + * if an error occurs while removing the notifications. + * @see gov.usgs.earthquake.distribution.NotificationIndex + */ + public synchronized void removeNotifications(List notifications) + throws Exception { + for (Notification notification : notifications) { + this.removeNotification(notification); + } + } + /** * Search the index for notifications matching id. * diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java index 6a642daa..c9adb044 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndex.java @@ -10,7 +10,7 @@ /** * Stores and retrieves Notifications. - * + * * This is typically used by a NotificationReceiver to track its Notifications, * but may also be used by NotificationListeners. Each object should maintain a * separate NotificationIndex. @@ -19,10 +19,10 @@ public interface NotificationIndex extends Configurable { /** * Add a notification to the index. - * + * * If an identical notification is already in the index, the implementation * may choose whether or not to store the duplicate information. - * + * * @param notification * the notification to add. * @throws Exception @@ -33,9 +33,9 @@ public void addNotification(final Notification notification) /** * Remove a notification from the index. - * + * * All matching notifications should be removed from the index. - * + * * @param notification * the notification to remove. * @throws Exception @@ -44,11 +44,22 @@ public void addNotification(final Notification notification) public void removeNotification(final Notification notification) throws Exception; + /** + * Remove notifications from index. + * + * Tracker URLs are ignored. + * @param notifications + * notifications to be removed from index + * @throws Exception if error occurs + */ + public void removeNotifications(List notifications) + throws Exception; + /** * Search the index for notifications matching id. - * + * * If more than one notification matches, all should be returned. - * + * * @param id * the ProductId to find. * @return a list of matching notifications. @@ -61,13 +72,13 @@ public List findNotifications(final ProductId id) /** * Search the index for notifications matching the sources, types, and * codes. - * + * * Only one notification for each unique ProductId * (source+type+code+updateTime) should be returned. If sources, types, * and/or codes are null, that parameter should be considered a wildcard. If * sources, types, and codes are all null, a notification for each unique * ProductId in the index should be returned. - * + * * @param source * sources to include, or all if null. * @param type @@ -84,13 +95,13 @@ public List findNotifications(final String source, /** * Search the index for notifications matching the sources, types, and * codes. - * + * * Only one notification for each unique ProductId * (source+type+code+updateTime) should be returned. If sources, types, * and/or codes are null, that parameter should be considered a wildcard. If * sources, types, and codes are all null, a notification for each unique * ProductId in the index should be returned. - * + * * @param sources * sources to include, or all if null. * @param types @@ -103,12 +114,12 @@ public List findNotifications(final String source, */ public List findNotifications(List sources, List types, List codes) throws Exception; - + /** * Search the index for expired notifications. - * + * * All expired notifications, even if duplicate, should be returned. - * + * * @return a list of expired notifications. * @throws Exception * if an error occurs while searching the index. diff --git a/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java new file mode 100644 index 00000000..df2649f3 --- /dev/null +++ b/src/main/java/gov/usgs/earthquake/distribution/NotificationIndexCleanup.java @@ -0,0 +1,150 @@ +package gov.usgs.earthquake.distribution; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * NotificationIndexCleanup manages cleaning up expired notifications. + * + * Uses background thread to remove expired notifications while they exist, + * then uses wait/notify to pause until shutdown() or wakeUp() methods are + * called. + * + * NOTE: this class does not schedule periodic cleanup, and the wakeUp() method + * must be called periodically. + */ +public class NotificationIndexCleanup implements Runnable { + + private static final Logger LOGGER = Logger.getLogger(NotificationIndexCleanup.class.getName()); + + public final NotificationIndex index; + + // listener that can take additional actions during cleanup + public final Listener listener; + + // object used to synchronize state access between threads + public final Object syncObject = new Object(); + // thread where cleanup loop runs + public Thread cleanupThread = null; + // whether thread should stop running + private boolean stopThread = false; + + public NotificationIndexCleanup(final NotificationIndex index, final Listener listener) { + this.index = index; + this.listener = listener; + } + + /** + * Notification cleanup thread loop. + * + * This method blocks and should probably not be called by you. + */ + public void run() { + final String indexName = this.index.getName(); + + LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup starting"); + // run until thread stopped + while (!stopThread) { + List expiredNotifications = null; + synchronized (syncObject) { + try { + expiredNotifications = this.index.findExpiredNotifications(); + } catch (Exception e) { + LOGGER.log(Level.INFO, e, () -> "[" + indexName + "] exception finding expired notifications"); + } + if (expiredNotifications == null || expiredNotifications.size() == 0) { + // Wait for expired notifications to process + try { + syncObject.wait(); + } catch (InterruptedException ignore) { + // signal from another thread (stopThread checked above) + continue; + } + } + } + + // remove batch of expired notifications + final List removed = new ArrayList<>(expiredNotifications.size()); + if (this.listener == null) { + removed.addAll(expiredNotifications); + } else { + // notify listener, remove only those successfully processed by listener + for (final Notification expired : expiredNotifications) { + synchronized (syncObject) { + if (stopThread) { + break; + } + } + try { + this.listener.onExpiredNotification(expired); + removed.add(expired); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Listener exception processing expired notification"); + } + } + } + try { + // remove in batch + this.index.removeNotifications(removed); + LOGGER.fine(() -> "[" + indexName + "] Removed " + removed.size() + " expired notifications"); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e, () -> "[" + indexName + "] Exception removing expired notifications"); + } + } + LOGGER.finer(() -> "[" + indexName + "] NotificationIndexCleanup exiting"); + this.cleanupThread = null; + } + + /** + * Start cleanup process. + * + * @throws Exception + */ + public void startup() throws Exception { + synchronized (syncObject) { + if (this.cleanupThread != null) { + throw new IllegalStateException("Already started"); + } + // start thread + stopThread = false; + this.cleanupThread = new Thread(this); + } + this.cleanupThread.start(); + } + + /** + * Stop cleanup process. + * + * @throws Exception + */ + public void shutdown() throws Exception { + synchronized (syncObject) { + if (this.cleanupThread == null) { + throw new IllegalStateException("Already stopped"); + } + // stop thread + stopThread = true; + this.cleanupThread.interrupt(); + } + this.cleanupThread.join(); + } + + /** + * Wake up the background thread if it is waiting. + */ + public void wakeUp() { + synchronized (syncObject) { + syncObject.notify(); + } + } + + /** + * Interface for cleanup listeners to take additional steps before a + * notification is removed. + */ + public static interface Listener { + public void onExpiredNotification(final Notification expired) throws Exception; + } +} diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index e2763f09..85918809 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.7.12 2022-09-16"; + public static final String RELEASE_VERSION = "Version 2.8.0 2022-10-11"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version"; diff --git a/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java b/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java index c9e2deba..16fd6c7f 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java +++ b/src/main/java/gov/usgs/earthquake/indexer/ExtentIndexerListener.java @@ -47,7 +47,9 @@ public void configure(Config config) throws Exception { */ @Override protected void onBeforeProcessThreadStart() throws Exception { - setLastIndexId(((ExtentIndex) productIndex).getLastExtentIndexId()); + long lastIndexId = ((ExtentIndex) productIndex).getLastExtentIndexId(); + LOGGER.fine("[" + getName() + "] last index id " + lastIndexId); + setLastIndexId(lastIndexId); } /** @@ -63,12 +65,12 @@ public void processProduct(ProductSummary product) throws Exception { ExtentSummary extent = new ExtentSummary(product); if (extent.isValid()) { - LOGGER.log(Level.FINE, "[" + getName() + "] adding product " + LOGGER.log(Level.FINEST, "[" + getName() + "] adding product " + product.getId().toString() + " to extent table"); ((ExtentIndex) productIndex).addExtentSummary(extent); } else { - LOGGER.log(Level.FINE, "[" + getName() + "] product " + LOGGER.log(Level.FINEST, "[" + getName() + "] product " + product.getId().toString() + " has no extent information; won't add to extent table"); } @@ -76,4 +78,4 @@ public void processProduct(ProductSummary product) throws Exception { setLastIndexId(product.getIndexId()); } -} \ No newline at end of file +} diff --git a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java index 08406551..66801ea2 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java +++ b/src/main/java/gov/usgs/earthquake/indexer/JDBCProductIndex.java @@ -376,7 +376,7 @@ public synchronized List getUnassociatedProducts( final List clauseList = buildProductClauses(query); // Add the unassociated quantifier to the clause list clauseList.add("eventId IS NULL"); - final String sql = buildProductQuery(clauseList); + final String sql = buildProductQuery(query, clauseList); try ( final PreparedStatement statement = getConnection().prepareStatement(sql); @@ -430,10 +430,10 @@ public synchronized List getProducts(ProductIndexQuery query) */ public synchronized List getProducts(ProductIndexQuery query, final boolean loadDetails) throws Exception { - final List clauseList = buildProductClauses(query); - final String sql = buildProductQuery(clauseList); + final String sql = buildProductQuery(query); final List products = new LinkedList(); + LOGGER.finer("Executing query " + sql); try ( final PreparedStatement statement = getConnection().prepareStatement(sql); ) { @@ -998,51 +998,50 @@ else if (query.getResultType() == ProductIndexQuery.RESULT_TYPE_SUPERSEDED) { } /** - * Create the full SELECT query for the products table using the clauseList - * as the WHERE clause + * Create the full SELECT query for the products table using the default clauseList. * - * @param clauseList - * List of Strings to be AND'd together in the WHERE clause - * @param orderby - * Complete ORDER BY clause to be added after the WHERE clause + * @param query + * Query to build. * @return String containing the full SELECT query + * @see #buildProductClauses(ProductIndexQuery) */ - protected String buildProductQuery(List clauseList, String orderby) { - // Join all the clauses into a WHERE clause - StringBuilder whereClause = new StringBuilder(); - String and = " AND "; - boolean first = true; - for (String clause : clauseList) { - if (!first) { - whereClause.append(and); - } else { - first = false; - } - whereClause.append(clause); - } + protected String buildProductQuery(final ProductIndexQuery query) { - String query_prefix = String - .format("SELECT * FROM %s p", SUMMARY_TABLE); - String query_suffix = ""; - if (whereClause.length() > 0) { - query_suffix = String.format(" WHERE %s", whereClause.toString()); - } - String query_text = query_prefix + query_suffix + " " + orderby; - - return query_text; + final List clauseList = buildProductClauses(query); + return buildProductQuery(query, clauseList); } /** - * Create the full SELECT query for the products table using the clauseList - * as the WHERE clause. This method is a wrapper for - * {@link #buildProductQuery(List, String)} with an empty - * orderby string + * Create the full SELECT query for the products table using a custom clauseList. * + * @param query + * Query to build. * @param clauseList List of clauses for WHERE * @return String containing the full SELECT query */ - protected String buildProductQuery(List clauseList) { - return buildProductQuery(clauseList, ""); + protected String buildProductQuery(final ProductIndexQuery query, final List clauseList) { + final StringBuffer sql = new StringBuffer(); + + sql.append("SELECT * FROM " + SUMMARY_TABLE + " p"); + + // optional where + if (clauseList.size() > 0) { + sql.append(" WHERE ").append(String.join(" AND ", clauseList)); + } + + // optional order by + String queryOrderBy = query.getOrderBy(); + if (queryOrderBy != null) { + sql.append(" ORDER BY ").append(queryOrderBy); + } + + // limit is after order by + Integer queryLimit = query.getLimit(); + if (queryLimit != null) { + sql.append(" LIMIT ").append(queryLimit); + } + + return sql.toString(); } /** diff --git a/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java b/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java index 44d19699..dbace358 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java +++ b/src/main/java/gov/usgs/earthquake/indexer/ReliableIndexerListener.java @@ -37,7 +37,7 @@ public class ReliableIndexerListener extends DefaultIndexerListener implements R protected static final Logger LOGGER = Logger .getLogger(ReliableIndexerListener.class.getName()); - private static final int PRODUCTS_PER_QUERY = 10; + private static final int PRODUCTS_PER_QUERY = 1000; private boolean stopThread = false; private long lastIndexId = -1; @@ -123,8 +123,10 @@ public void run() { for(ProductSummary summary : productList) { LOGGER.log(Level.FINEST,"[" + getName() + "] preparing to process product " + summary.getIndexId()); //Check for shutdown every iteration so we don't hog shutdown time - if (stopThread) { - break; + synchronized(syncObject) { + if (stopThread) { + break; + } } try { //Process the product types we're told to in configuration @@ -161,7 +163,10 @@ public void run() { public void startup() throws Exception{ super.startup(); this.onBeforeProcessThreadStart(); - this.processThread = new Thread(this); + synchronized(syncObject) { + stopThread = false; + this.processThread = new Thread(this); + } this.processThread.start(); } @@ -174,9 +179,9 @@ public void startup() throws Exception{ public void shutdown() throws Exception { try { LOGGER.log(Level.FINEST,"[" + getName() + "] trying to shut down..."); - stopThread = true; //When the thread is ready, tell it to stop synchronized (syncObject) { + stopThread = true; this.processThread.interrupt(); } this.processThread.join(); @@ -270,7 +275,7 @@ public List getNextProducts() throws Exception{ */ public void processProduct(final ProductSummary product) throws Exception { //Do stuff - LOGGER.log(Level.FINE,"[" + getName() + "] processing product " + product.getId()); + LOGGER.log(Level.FINER,"[" + getName() + "] processing product " + product.getId()); } diff --git a/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java b/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java index 6e0558e2..9b1387a4 100644 --- a/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java +++ b/src/main/java/gov/usgs/util/logging/SimpleLogFormatter.java @@ -16,9 +16,9 @@ /** * Simple(r) log formatter for java.util.logging messages. - * + * * Outputs unique dates once, with all messages sharing that time tab indented below. - * + * * Example Format: *
      * Wed Sep 30 19:31:48 GMT 2009
    @@ -31,7 +31,7 @@
      * INFO    [logging_client] received urn:earthquake-usgs-gov:shakemap-scraper:global:2009medd:1
      * INFO    [shakemap] received urn:earthquake-usgs-gov:shakemap-scraper:global:2009medd:1
      * 
    - * + * */ public class SimpleLogFormatter extends Formatter { @@ -47,7 +47,7 @@ public SimpleLogFormatter() { /** * Format a LogRecord for output. - * + * * @param record * LogRecord to format. * @return formatted LogRecord as String. @@ -72,7 +72,7 @@ public final String format(final LogRecord record) { // add log message buf.append(record.getLevel().toString()); buf.append("\tthread=").append(record.getThreadID()); - buf.append("\t").append(record.getMessage()); + buf.append("\t").append(formatMessage(record)); buf.append("\n"); // output any associated exception diff --git a/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java b/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java index dd567112..d661b193 100644 --- a/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java +++ b/src/test/java/gov/usgs/earthquake/aws/JsonNotificationIndexTest.java @@ -15,7 +15,6 @@ import java.sql.ResultSet; import java.util.ArrayList; import java.util.Date; -import java.util.Iterator; import java.util.List; import org.junit.After; @@ -355,19 +354,18 @@ public void testFindNotificationsByDataLists() throws Exception { // functionality. List found = null; - Iterator iter = null; List sources = null; List types = null; List codes = null; // Check the null null null case. found = index.findNotifications(sources, types, codes); - + final Date now = new Date(); List all = index .getNotifications(_query_allNotifications); - iter = all.iterator(); - while (iter.hasNext()) { - Assert.assertTrue(contains(found, iter.next())); + for (final Notification notification : all) { + Assert.assertTrue(contains(found, notification) + || notification.getExpirationDate().getTime() < now.getTime()); } // Check a search by sources only diff --git a/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java new file mode 100644 index 00000000..3602224d --- /dev/null +++ b/src/test/java/gov/usgs/earthquake/distribution/NotificationIndexCleanupTest.java @@ -0,0 +1,181 @@ +package gov.usgs.earthquake.distribution; + +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import gov.usgs.earthquake.product.ProductId; + +public class NotificationIndexCleanupTest { + + /** + * Test thread continues removing expired notifications before waiting. + * + * @throws Exception + */ + @Test + public void testNotificationCleanup() throws Exception{ + final MockNotificationIndex index = new MockNotificationIndex(); + final NotificationIndexCleanup cleanup = new NotificationIndexCleanup(index, null); + // thread should start and wait + cleanup.startup(); + Assert.assertEquals(index.removedNotifications.size(), 0); + + // use this notification multiple times for testing + final Notification notification = new DefaultNotification( + new ProductId("source", "type", "code"), + new Date(), + null + ); + final List notifications = new LinkedList<>(); + notifications.add(notification); + notifications.add(notification); + + // run cleanup thread, should remove 2 notifications + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(index.removedNotifications.size(), 2); + + // run with multiple findExpiredNotifications returns, should remove all + index.removedNotifications.clear(); + index.expiredNotificationsReturns.add(notifications); + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(index.removedNotifications.size(), 4); + } + + /** + * Test listener is notified before notification is removed. + * + * @throws Exception + */ + @Test + public void testNotificationCleanupListener() throws Exception{ + final MockNotificationIndex index = new MockNotificationIndex(); + // capture notifications listener receives + final LinkedList listenerNotifications = new LinkedList<>(); + final NotificationIndexCleanup cleanup = new NotificationIndexCleanup(index, (Notification expired) -> { + listenerNotifications.add(expired); + // check size before return + Assert.assertTrue(listenerNotifications.size() > index.removedNotifications.size()); + }); + // thread should start and wait + cleanup.startup(); + Assert.assertEquals(0, index.removedNotifications.size()); + Assert.assertEquals(0, listenerNotifications.size()); + // use this notification multiple times for testing + final Notification notification = new DefaultNotification( + new ProductId("source", "type", "code"), + new Date(), + null + ); + final List notifications = new LinkedList<>(); + notifications.add(notification); + notifications.add(notification); + + // run cleanup thread, should remove 2 notifications + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(2, index.removedNotifications.size()); + Assert.assertEquals(2, listenerNotifications.size()); + // run with multiple findExpiredNotifications returns, should remove all + index.removedNotifications.clear(); + listenerNotifications.clear(); + index.expiredNotificationsReturns.add(notifications); + index.expiredNotificationsReturns.add(notifications); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(4, index.removedNotifications.size()); + Assert.assertEquals(4, listenerNotifications.size()); + } + + /** + * Test notification is not removed when listener throws exception. + * + * @throws Exception + */ + @Test + public void testNotificationCleanupListenerException() throws Exception{ + final MockNotificationIndex index = new MockNotificationIndex(); + // capture notifications listener receives + final LinkedList listenerNotifications = new LinkedList<>(); + final LinkedList listenerThrow = new LinkedList<>(); + final NotificationIndexCleanup cleanup = new NotificationIndexCleanup(index, (Notification expired) -> { + if (listenerThrow.size() > 0) { + throw new Exception("listener error"); + } + listenerNotifications.add(expired); + }); + // thread should start and wait + cleanup.startup(); + Assert.assertEquals(0, index.removedNotifications.size()); + Assert.assertEquals(0, listenerNotifications.size()); + // use this notification multiple times for testing + final Notification notification = new DefaultNotification( + new ProductId("source", "type", "code"), + new Date(), + null + ); + final List notifications = new LinkedList<>(); + notifications.add(notification); + notifications.add(notification); + + // run with listener throwing exceptions + index.expiredNotificationsReturns.add(notifications); + listenerThrow.add(true); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(0, index.removedNotifications.size()); + Assert.assertEquals(0, listenerNotifications.size()); + // run with multiple findExpiredNotifications returns, should remove all + index.expiredNotificationsReturns.add(notifications); + listenerThrow.clear(); + cleanup.wakeUp(); + Thread.sleep(5L); + Assert.assertEquals(2, index.removedNotifications.size()); + Assert.assertEquals(2, listenerNotifications.size()); + } + + /** Class for testing NotificationIndexCleanup interaction with a NotificationIndex */ + public static class MockNotificationIndex extends JDBCNotificationIndex { + public List> expiredNotificationsReturns = new LinkedList<>(); + public List removedNotifications = new LinkedList<>(); + + public MockNotificationIndex() throws Exception { + super(); + } + + // not using actual database connection for this test + public void startup() throws Exception {} + public void shutdown() throws Exception {} + + /** + * Mock the find expired notification method to return lists from expiredNotificationsReturns. + */ + public List findExpiredNotifications() throws Exception { + if (expiredNotificationsReturns == null || expiredNotificationsReturns.size() == 0) { + return new ArrayList(); + } + return expiredNotificationsReturns.remove(0); + } + + @Override + public void removeNotification(final Notification notification) throws Exception { + removedNotifications.add(notification); + } + + @Override + public void removeNotifications(final List notifications) throws Exception { + removedNotifications.addAll(notifications); + } + + } + +} diff --git a/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java b/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java index f5826e23..3330de36 100644 --- a/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java +++ b/src/test/java/gov/usgs/earthquake/indexer/JDBCProductIndexTest.java @@ -258,6 +258,30 @@ public void getEventsTest() throws Exception { Assert.assertNotNull(event.getIndexId()); } + /** + * Query with limit includes limit clause. + * @throws Exception + */ + @Test + public void limitQueryTest() throws Exception { + query = new ProductIndexQuery(); + query.setLimit(123); + String sql = index.buildProductQuery(query); + Assert.assertTrue(sql.contains(" LIMIT 123")); + } + + /** + * Query with limit includes limit clause. + * @throws Exception + */ + @Test + public void orderByQueryTest() throws Exception { + query = new ProductIndexQuery(); + query.setOrderBy(JDBCProductIndex.SUMMARY_PRODUCT_INDEX_ID); + String sql = index.buildProductQuery(query); + Assert.assertTrue(sql.contains(" ORDER BY id")); + } + /** * Open a connection to a mysql database called productIndex on localhost * with the user: "test" and the password: "test". This test will