diff --git a/code.json b/code.json index 6cfe0366..8a8e3133 100644 --- a/code.json +++ b/code.json @@ -3,7 +3,7 @@ "name": "Product Distribution Layer", "organization": "U.S. Geological Survey", "description": "Distribution system used for derived earthquake information", - "version": "v2.7.4", + "version": "v2.7.5", "status": "Production", "permissions": { "usageType": "openSource", @@ -27,7 +27,7 @@ "email": "jmfee@usgs.gov" }, "date": { - "metadataLastUpdated": "2020-12-23" + "metadataLastUpdated": "2021-01-04" } } ] diff --git a/docs/userguide/configuration.html b/docs/userguide/configuration.html index cff04088..fee84505 100644 --- a/docs/userguide/configuration.html +++ b/docs/userguide/configuration.html @@ -604,11 +604,24 @@

Receiver Types

trackingFileName
File where current position in hub notifications is tracked.
+
trackingIndex
+
+ Custom tracking index. + (by default created is a sqlite database at trackingFileName) +
+
connectAttempts
(Optional, default 5) Number to attempts to connect before pausing
connectTimeout
(Optional, default 1000ms) timeout for each connect attempt.
+ +
initialCatchUpAge
+
+ (Optional, default 7.0 days) how far back in time to start when + connecting to server. subsequent attempts start at current position, + which is tracked. +
diff --git a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java index f19158ce..106fa91c 100644 --- a/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java +++ b/src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java @@ -40,10 +40,12 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements R public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName"; public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts"; public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout"; + public static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge"; public static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json"; public static final String DEFAULT_CONNECT_ATTEMPTS = "5"; public static final String DEFAULT_CONNECT_TIMEOUT = "1000"; + public static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0"; private URI uri; private String trackingFileName; @@ -58,6 +60,10 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements R /* µs timestamp of last message that has been processed */ protected Instant createdAfter = null; + + /** How far back to check when first connecting. */ + protected double initialCatchUpAge = Double.valueOf(DEFAULT_INITIAL_CATCHUP_AGE); + /* last broadcast message that has been processed (used for catch up) */ protected JsonNotification lastBroadcast = null; protected Long lastBroadcastId = null; @@ -80,8 +86,12 @@ public void configure(Config config) throws Exception { super.configure(config); uri = new URI(config.getProperty(URI_PROPERTY)); - attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS)); - timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT)); + attempts = Integer.parseInt( + config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS)); + timeout = Long.parseLong( + config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT)); + initialCatchUpAge = Double.valueOf( + config.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE)); final String trackingIndexName = config.getProperty(TRACKING_INDEX_PROPERTY); if (trackingIndexName != null) { @@ -338,7 +348,8 @@ public void run() { protected void sendProductsCreatedAfter() throws IOException { // set default for created after if (this.createdAfter == null) { - this.createdAfter = Instant.now().minusSeconds(7 * 86400); + this.createdAfter = Instant.now().minusSeconds( + Math.round(initialCatchUpAge * 86400)); } String request = Json.createObjectBuilder() .add("action", "products_created_after") @@ -393,6 +404,9 @@ protected void stopCatchUp() { * Stop background thread for catch up process. */ protected void stopCatchUpThread() { + if (catchUpThread == null) { + return; + } // stop catch up thread try { synchronized (catchUpSync) { diff --git a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java index 6eef3bb0..2d6668c3 100644 --- a/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java +++ b/src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java @@ -345,7 +345,7 @@ public synchronized List findNotifications( beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { - statement.setQueryTimeout(1000); + statement.setQueryTimeout(1800); // set parameters for (int i = 0, len=values.size(); i < len; i++) { @@ -429,7 +429,7 @@ public synchronized List findNotifications( beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { - statement.setQueryTimeout(1000); + statement.setQueryTimeout(1800); // set parameters for (int i = 0, len=values.size(); i < len; i++) { @@ -465,7 +465,7 @@ public synchronized List findExpiredNotifications() throws Excepti beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { - statement.setQueryTimeout(1000); + statement.setQueryTimeout(1800); // set parameters statement.setString(1, Instant.now().toString()); @@ -562,7 +562,7 @@ public synchronized List getMissingNotifications( beginTransaction(); try (final PreparedStatement statement = getConnection().prepareStatement(sql)) { try { - statement.setQueryTimeout(1000); + statement.setQueryTimeout(1800); // execute and commit if successful final List notifications = getNotifications(statement); commitTransaction(); diff --git a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java index ade3f17a..a1df371f 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ProductClient.java @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements ProductClientMBean, Bootstrappable { /** The "release" version number. */ - public static final String RELEASE_VERSION = "Version 2.7.4 2020-12-23"; + public static final String RELEASE_VERSION = "Version 2.7.5 2021-01-04"; /** Property name used on products for current RELEASE_VERSION. */ public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version"; diff --git a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java index 3d4d4736..3c4befb9 100644 --- a/src/main/java/gov/usgs/earthquake/indexer/Indexer.java +++ b/src/main/java/gov/usgs/earthquake/indexer/Indexer.java @@ -420,11 +420,28 @@ protected boolean hasProductBeenIndexed(final ProductId id) { if (readProductIndex == productIndex) { // synchronize on this if read and product index are same synchronized (indexProductSync) { - return readProductIndex.hasProduct(id); + readProductIndex.beginTransaction(); + try { + boolean hasProduct = readProductIndex.hasProduct(id); + readProductIndex.commitTransaction(); + return hasProduct; + } catch (Exception e) { + readProductIndex.rollbackTransaction(); + } } } else { - // otherwise readProductIndex provides own synchronization - return readProductIndex.hasProduct(id); + // otherwise synchronize on readProductIndex + // transaction reconnects if needed + synchronized (readProductIndex) { + readProductIndex.beginTransaction(); + try { + boolean hasProduct = readProductIndex.hasProduct(id); + readProductIndex.commitTransaction(); + return hasProduct; + } catch (Exception e) { + readProductIndex.rollbackTransaction(); + } + } } } catch (Exception wtf) { LOGGER.log(Level.WARNING, "[" + getName() diff --git a/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java b/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java index 782f4f3e..20d3676f 100644 --- a/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java +++ b/src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java @@ -161,10 +161,26 @@ public JsonObjectBuilder getGeometryJson(final Product product) throws Exception final BigDecimal latitude = product.getLatitude(); final BigDecimal longitude = product.getLongitude(); final BigDecimal depth = product.getDepth(); - if (latitude != null || longitude != null) { + if (latitude != null || longitude != null || depth != null) { + final JsonArrayBuilder coordinates = Json.createArrayBuilder(); + if (latitude != null) { + coordinates.add(latitude); + } else { + coordinates.addNull(); + } + if (longitude != null) { + coordinates.add(longitude); + } else { + coordinates.addNull(); + } + if (depth != null) { + coordinates.add(depth); + } else { + coordinates.addNull(); + } return Json.createObjectBuilder() .add("type", "Point") - .add("coordinates", Json.createArrayBuilder().add(longitude).add(latitude).add(depth)); + .add("coordinates", coordinates); } return null; }