Skip to content

Commit

Permalink
Merge branch '2.7.5-rc' into 'master'
Browse files Browse the repository at this point in the history
2.7.5 rc

See merge request ghsc/hazdev/pdl!135
  • Loading branch information
jmfee-usgs committed Jan 4, 2021
2 parents 713c5e5 + 366d8c1 commit 933be54
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 15 deletions.
4 changes: 2 additions & 2 deletions code.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "Product Distribution Layer",
"organization": "U.S. Geological Survey",
"description": "Distribution system used for derived earthquake information",
"version": "v2.7.4",
"version": "v2.7.5",
"status": "Production",
"permissions": {
"usageType": "openSource",
Expand All @@ -27,7 +27,7 @@
"email": "jmfee@usgs.gov"
},
"date": {
"metadataLastUpdated": "2020-12-23"
"metadataLastUpdated": "2021-01-04"
}
}
]
13 changes: 13 additions & 0 deletions docs/userguide/configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,24 @@ <h4>Receiver Types</h4>
<dt>trackingFileName</dt>
<dd>File where current position in hub notifications is tracked.</dd>

<dt>trackingIndex</dt>
<dd>
Custom tracking index.
(by default created is a sqlite database at trackingFileName)
</dd>

<dt>connectAttempts</dt>
<dd>(Optional, default 5) Number to attempts to connect before pausing</dd>

<dt>connectTimeout</dt>
<dd>(Optional, default 1000ms) timeout for each connect attempt.</dd>

<dt>initialCatchUpAge</dt>
<dd>
(Optional, default 7.0 days) how far back in time to start when
connecting to server. subsequent attempts start at current position,
which is tracked.
</dd>
</dl>
</dd>

Expand Down
20 changes: 17 additions & 3 deletions src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements R
public static final String TRACKING_FILE_NAME_PROPERTY = "trackingFileName";
public static final String CONNECT_ATTEMPTS_PROPERTY = "connectAttempts";
public static final String CONNECT_TIMEOUT_PROPERTY = "connectTimeout";
public static final String INITIAL_CATCHUP_AGE_PROPERTY = "initialCatchUpAge";

public static final String DEFAULT_TRACKING_FILE_NAME = "data/AwsReceiver.json";
public static final String DEFAULT_CONNECT_ATTEMPTS = "5";
public static final String DEFAULT_CONNECT_TIMEOUT = "1000";
public static final String DEFAULT_INITIAL_CATCHUP_AGE = "7.0";

private URI uri;
private String trackingFileName;
Expand All @@ -58,6 +60,10 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements R

/* µs timestamp of last message that has been processed */
protected Instant createdAfter = null;

/** How far back to check when first connecting. */
protected double initialCatchUpAge = Double.valueOf(DEFAULT_INITIAL_CATCHUP_AGE);

/* last broadcast message that has been processed (used for catch up) */
protected JsonNotification lastBroadcast = null;
protected Long lastBroadcastId = null;
Expand All @@ -80,8 +86,12 @@ public void configure(Config config) throws Exception {
super.configure(config);

uri = new URI(config.getProperty(URI_PROPERTY));
attempts = Integer.parseInt(config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
timeout = Long.parseLong(config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
attempts = Integer.parseInt(
config.getProperty(CONNECT_ATTEMPTS_PROPERTY, DEFAULT_CONNECT_ATTEMPTS));
timeout = Long.parseLong(
config.getProperty(CONNECT_TIMEOUT_PROPERTY, DEFAULT_CONNECT_TIMEOUT));
initialCatchUpAge = Double.valueOf(
config.getProperty(INITIAL_CATCHUP_AGE_PROPERTY, DEFAULT_INITIAL_CATCHUP_AGE));

final String trackingIndexName = config.getProperty(TRACKING_INDEX_PROPERTY);
if (trackingIndexName != null) {
Expand Down Expand Up @@ -338,7 +348,8 @@ public void run() {
protected void sendProductsCreatedAfter() throws IOException {
// set default for created after
if (this.createdAfter == null) {
this.createdAfter = Instant.now().minusSeconds(7 * 86400);
this.createdAfter = Instant.now().minusSeconds(
Math.round(initialCatchUpAge * 86400));
}
String request = Json.createObjectBuilder()
.add("action", "products_created_after")
Expand Down Expand Up @@ -393,6 +404,9 @@ protected void stopCatchUp() {
* Stop background thread for catch up process.
*/
protected void stopCatchUpThread() {
if (catchUpThread == null) {
return;
}
// stop catch up thread
try {
synchronized (catchUpSync) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public synchronized List<Notification> findNotifications(
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);
statement.setQueryTimeout(1800);

// set parameters
for (int i = 0, len=values.size(); i < len; i++) {
Expand Down Expand Up @@ -429,7 +429,7 @@ public synchronized List<Notification> findNotifications(
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);
statement.setQueryTimeout(1800);

// set parameters
for (int i = 0, len=values.size(); i < len; i++) {
Expand Down Expand Up @@ -465,7 +465,7 @@ public synchronized List<Notification> findExpiredNotifications() throws Excepti
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);
statement.setQueryTimeout(1800);

// set parameters
statement.setString(1, Instant.now().toString());
Expand Down Expand Up @@ -562,7 +562,7 @@ public synchronized List<Notification> getMissingNotifications(
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);
statement.setQueryTimeout(1800);
// execute and commit if successful
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements
ProductClientMBean, Bootstrappable {

/** The "release" version number. */
public static final String RELEASE_VERSION = "Version 2.7.4 2020-12-23";
public static final String RELEASE_VERSION = "Version 2.7.5 2021-01-04";

/** Property name used on products for current RELEASE_VERSION. */
public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version";
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/gov/usgs/earthquake/indexer/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,28 @@ protected boolean hasProductBeenIndexed(final ProductId id) {
if (readProductIndex == productIndex) {
// synchronize on this if read and product index are same
synchronized (indexProductSync) {
return readProductIndex.hasProduct(id);
readProductIndex.beginTransaction();
try {
boolean hasProduct = readProductIndex.hasProduct(id);
readProductIndex.commitTransaction();
return hasProduct;
} catch (Exception e) {
readProductIndex.rollbackTransaction();
}
}
} else {
// otherwise readProductIndex provides own synchronization
return readProductIndex.hasProduct(id);
// otherwise synchronize on readProductIndex
// transaction reconnects if needed
synchronized (readProductIndex) {
readProductIndex.beginTransaction();
try {
boolean hasProduct = readProductIndex.hasProduct(id);
readProductIndex.commitTransaction();
return hasProduct;
} catch (Exception e) {
readProductIndex.rollbackTransaction();
}
}
}
} catch (Exception wtf) {
LOGGER.log(Level.WARNING, "[" + getName()
Expand Down
20 changes: 18 additions & 2 deletions src/main/java/gov/usgs/earthquake/product/io/JsonProduct.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,26 @@ public JsonObjectBuilder getGeometryJson(final Product product) throws Exception
final BigDecimal latitude = product.getLatitude();
final BigDecimal longitude = product.getLongitude();
final BigDecimal depth = product.getDepth();
if (latitude != null || longitude != null) {
if (latitude != null || longitude != null || depth != null) {
final JsonArrayBuilder coordinates = Json.createArrayBuilder();
if (latitude != null) {
coordinates.add(latitude);
} else {
coordinates.addNull();
}
if (longitude != null) {
coordinates.add(longitude);
} else {
coordinates.addNull();
}
if (depth != null) {
coordinates.add(depth);
} else {
coordinates.addNull();
}
return Json.createObjectBuilder()
.add("type", "Point")
.add("coordinates", Json.createArrayBuilder().add(longitude).add(latitude).add(depth));
.add("coordinates", coordinates);
}
return null;
}
Expand Down

0 comments on commit 933be54

Please sign in to comment.