Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set Status.PRESENT after synchronous preload #950

Open
wants to merge 5 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public NetworkPreloader(TransportNetworkCache transportNetworkCache) {
this.transportNetworkCache = transportNetworkCache;
}

public LoaderState<TransportNetwork> preloadData (AnalysisWorkerTask task) {
public LoaderState<TransportNetwork> preload (AnalysisWorkerTask task) {
if (task.scenario != null) {
transportNetworkCache.rememberScenario(task.scenario);
}
Expand All @@ -94,10 +94,11 @@ public LoaderState<TransportNetwork> preloadData (AnalysisWorkerTask task) {
* similar tasks will make interleaved calls to setProgress (with superficial map synchronization). Other than
* causing a value to briefly revert from PRESENT to BUILDING this doesn't seem deeply problematic.
* This is provided specifically for regional tasks, to ensure that they remain in preloading mode while all this
* data is prepared.
* data is prepared.
* Any exceptions that occur while building the network will escape this method, leaving the status as BUILDING.
*/
public TransportNetwork synchronousPreload (AnalysisWorkerTask task) {
return buildValue(Key.forTask(task));
public TransportNetwork preloadBlocking (AnalysisWorkerTask task) {
return getBlocking(Key.forTask(task));
}

@Override
Expand Down Expand Up @@ -140,7 +141,7 @@ protected TransportNetwork buildValue(Key key) {
linkedPointSet.getEgressCostTable(progressListener);
}
}
// Finished building all needed inputs for analysis, return the completed network to the AsyncLoader code.
abyrd marked this conversation as resolved.
Show resolved Hide resolved
// Finished building all needed inputs for analysis, return the completed network
return scenarioNetwork;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public static void sleepSeconds (int seconds) {
protected byte[] handleAndSerializeOneSinglePointTask (TravelTimeSurfaceTask task) throws IOException {
LOG.debug("Handling single-point task {}", task.toString());
// Get all the data needed to run one analysis task, or at least begin preparing it.
final AsyncLoader.LoaderState<TransportNetwork> networkLoaderState = networkPreloader.preloadData(task);
final AsyncLoader.LoaderState<TransportNetwork> networkLoaderState = networkPreloader.preload(task);

// If loading is not complete, bail out of this function.
// Ideally we'd stall briefly using something like Future.get(timeout) in case loading finishes quickly.
Expand Down Expand Up @@ -462,7 +462,7 @@ protected void handleOneRegionalTask (RegionalTask task) throws Throwable {
// Note we're completely bypassing the async loader here and relying on the older nested LoadingCaches.
// If those are ever removed, the async loader will need a synchronous mode with per-path blocking (kind of
// reinventing the wheel of LoadingCache) or we'll need to make preparation for regional tasks async.
TransportNetwork transportNetwork = networkPreloader.synchronousPreload(task);
TransportNetwork transportNetwork = networkPreloader.preloadBlocking(task);

// If we are generating a static site, there must be a single metadata file for an entire batch of results.
// Arbitrarily we create this metadata as part of the first task in the job.
Expand Down
45 changes: 25 additions & 20 deletions src/main/java/com/conveyal/r5/util/AsyncLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* "value is present in map".
*
* Potential problem: if we try to return immediately saying whether the needed data are available,
* there are some cases where preparing the reqeusted object might take only a few hundred milliseconds or less.
* there are some cases where preparing the requested object might take only a few hundred milliseconds or less.
* In that case then we don't want the caller to have to re-poll. In this case a Future.get() with timeout is good.
*
* Created by abyrd on 2018-09-14
Expand Down Expand Up @@ -97,10 +97,23 @@ public String toString() {
}
}

/**
* This has been factored out of the executor runnables so subclasses can force a blocking (non-async) load.
* Any exceptions that occur while building the value will escape this method, leaving the status as BUILDING.
*/
protected V getBlocking (K key) {
V value = buildValue(key);
synchronized (map) {
map.put(key, new LoaderState(Status.PRESENT, "Loaded", 100, value));
}
return value;
}

/**
* Attempt to fetch the value for the supplied key.
* If the value is not yet present, and not yet being computed / fetched, enqueue a task to do so.
* Return a response that reports status, and may or may not contain the value.
* Any exception that occurs while building the value is caught and associated with the key with a status of ERROR.
*/
public LoaderState<V> get (K key) {
LoaderState<V> state = null;
Expand All @@ -109,7 +122,7 @@ public LoaderState<V> get (K key) {
state = map.get(key);
if (state == null) {
// Only enqueue a task to load the value for this key if another call hasn't already done it.
state = new LoaderState<V>(Status.WAITING, "Enqueued task...", 0, null);
state = new LoaderState<>(Status.WAITING, "Enqueued task...", 0, null);
map.put(key, state);
enqueueLoadTask = true;
}
Expand All @@ -120,16 +133,16 @@ public LoaderState<V> get (K key) {
// Enqueue task outside the above block (synchronizing the fewest lines possible).
if (enqueueLoadTask) {
executor.execute(() -> {
setProgress(key, 0, "Starting...");
try {
V value = buildValue(key);
synchronized (map) {
map.put(key, new LoaderState(Status.PRESENT, null, 100, value));
}
setProgress(key, 0, "Starting...");
getBlocking(key);
} catch (Throwable t) {
// It's essential to trap Throwable rather than just Exception. Otherwise the executor
// threads can be killed by any Error that happens, stalling the executor.
setError(key, t);
// threads can be killed by any Error that happens, stalling the executor. The below permanently
// associates an error with the key. No further attempt will ever be made to create the value.
synchronized (map) {
map.put(key, new LoaderState(t));
}
LOG.error("Async load failed: " + ExceptionUtils.stackTraceString(t));
}
});
Expand All @@ -139,12 +152,13 @@ public LoaderState<V> get (K key) {

/**
* Override this method in concrete subclasses to specify the logic to build/calculate/fetch a value.
* Implementations may call setProgress to report progress on long operations.
* Implementations may call setProgress to report progress on long operations; if they do so, any callers of this
* method are responsible for also calling setComplete() to ensure loaded objects are marked as PRESENT.
* Throw an exception to indicate an error has occurred and the building process cannot complete.
* It's not entirely clear this should return a value - might be better to call setValue within the overridden
* method, just as we call setProgress or setError.
*/
protected abstract V buildValue(K key) throws Exception;
protected abstract V buildValue(K key);

/**
* Call this method inside the buildValue method to indicate progress.
Expand All @@ -155,13 +169,4 @@ public void setProgress(K key, int percentComplete, String message) {
}
}

/**
* Call this method inside the buildValue method to indicate that an unrecoverable error has happened.
* FIXME this will permanently associate an error with the key. No further attempt will ever be made to create the value.
*/
protected void setError (K key, Throwable throwable) {
synchronized (map) {
map.put(key, new LoaderState(throwable));
}
}
}
Loading