diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java index 7e613de43..8ad66b0a5 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java @@ -6,17 +6,12 @@ import com.conveyal.analysis.components.eventbus.EventBus; import com.conveyal.analysis.components.eventbus.RegionalAnalysisEvent; import com.conveyal.analysis.components.eventbus.WorkerEvent; -import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.results.MultiOriginAssembler; -import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageKey; -import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.WorkerCategory; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import com.conveyal.r5.analyst.cluster.WorkerStatus; -import com.conveyal.r5.analyst.scenario.Scenario; import com.conveyal.r5.util.ExceptionUtils; import com.google.common.collect.ListMultimap; import com.google.common.collect.MultimapBuilder; @@ -27,8 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -42,7 +35,6 @@ import static com.conveyal.analysis.components.eventbus.WorkerEvent.Action.REQUESTED; import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.REGIONAL; import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.SINGLE_POINT; -import static com.conveyal.file.FileCategory.BUNDLES; import static com.google.common.base.Preconditions.checkNotNull; /** @@ -98,6 +90,7 @@ public interface Config { private final Config config; // Component Dependencies + // Result assemblers return files that need to be permanently stored. private final FileStorage fileStorage; private final EventBus eventBus; private final WorkerLauncher workerLauncher; @@ -151,7 +144,7 @@ public interface Config { public TObjectLongMap recentlyRequestedWorkers = TCollections.synchronizedMap(new TObjectLongHashMap<>()); - public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) { + public Broker(Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) { this.config = config; this.fileStorage = fileStorage; this.eventBus = eventBus; @@ -162,27 +155,14 @@ public Broker (Config config, FileStorage fileStorage, EventBus eventBus, Worker * Enqueue a set of tasks for a regional analysis. * Only a single task is passed in, which the broker will expand into all the individual tasks for a regional job. */ - public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAnalysis) { - - // Make a copy of the regional task inside the RegionalAnalysis, replacing the scenario with a scenario ID. 
- RegionalTask templateTask = templateTaskFromRegionalAnalysis(regionalAnalysis); - - LOG.info("Enqueuing tasks for job {} using template task.", templateTask.jobId); - if (findJob(templateTask.jobId) != null) { - LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId); - throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId); + public synchronized void enqueueTasksForRegionalJob (Job job, MultiOriginAssembler assembler) { + LOG.info("Enqueuing tasks for job {} using template task.", job.jobId); + if (findJob(job.jobId) != null) { + LOG.error("Someone tried to enqueue job {} but it already exists.", job.jobId); + throw new RuntimeException("Enqueued duplicate job " + job.jobId); } - // Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of - // active jobs until we're sure the result assembler was constructed without any errors. Always add and remove - // the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887). - WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis); - Job job = new Job(templateTask, workerTags); - // Register the regional job so results received from multiple workers can be assembled into one file. - // If any parameters fail checks here, an exception may cause this method to exit early. - // TODO encapsulate MultiOriginAssemblers in a new Component - MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage); - resultAssemblers.put(templateTask.jobId, assembler); + resultAssemblers.put(job.jobId, assembler); // A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job. jobs.put(job.workerCategory, job); @@ -193,56 +173,12 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn } if (workerCatalog.noWorkersAvailable(job.workerCategory, config.offline())) { - createOnDemandWorkerInCategory(job.workerCategory, workerTags); + createOnDemandWorkerInCategory(job.workerCategory, job.workerTags); } else { // Workers exist in this category, clear out any record that we're waiting for one to start up. recentlyRequestedWorkers.remove(job.workerCategory); } - eventBus.send(new RegionalAnalysisEvent(templateTask.jobId, STARTED).forUser(workerTags.user, workerTags.group)); - } - - /** - * The single RegionalTask object represents a lot of individual accessibility tasks at many different origin - * points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to - * workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID - * to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task, - * as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on - * its ID only. We protectively clone this task because we're going to null out its scenario field, and don't - * want to affect the original object which contains all the scenario details. - * TODO Why is all this detail added after the Persistence call? - * We don't want to store all the details added below in Mongo? - */ - private RegionalTask templateTaskFromRegionalAnalysis (RegionalAnalysis regionalAnalysis) { - RegionalTask templateTask = regionalAnalysis.request.clone(); - // First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers. 
- Scenario scenario = templateTask.scenario; - templateTask.scenarioId = scenario.id; - // Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON. - templateTask.scenario = null; - String fileName = String.format("%s_%s.json", regionalAnalysis.bundleId, scenario.id); - FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName); - try { - File localScenario = FileUtils.createScratchFile("json"); - JsonUtil.objectMapper.writeValue(localScenario, scenario); - // FIXME this is using a network service in a method called from a synchronized broker method. - // Move file into storage before entering the synchronized block. - fileStorage.moveIntoStorage(fileStorageKey, localScenario); - } catch (IOException e) { - LOG.error("Error storing scenario for retrieval by workers.", e); - } - // Fill in all the fields in the template task that will remain the same across all tasks in a job. - // I am not sure why we are re-setting all these fields, it seems like they are already set when the task is - // initialized by AnalysisRequest.populateTask. But we'd want to thoroughly check that assumption before - // eliminating or moving these lines. - templateTask.jobId = regionalAnalysis._id; - templateTask.graphId = regionalAnalysis.bundleId; - templateTask.workerVersion = regionalAnalysis.workerVersion; - templateTask.height = regionalAnalysis.height; - templateTask.width = regionalAnalysis.width; - templateTask.north = regionalAnalysis.north; - templateTask.west = regionalAnalysis.west; - templateTask.zoom = regionalAnalysis.zoom; - return templateTask; + eventBus.send(new RegionalAnalysisEvent(job.jobId, STARTED).forUser(job.workerTags.user, job.workerTags.group)); } /** @@ -409,9 +345,7 @@ public synchronized Collection getAllJobStatuses () { TObjectIntMap workersPerJob = workerCatalog.activeWorkersPerJob(); Collection jobStatuses = new ArrayList<>(); for (Job job : jobs.values()) { - JobStatus jobStatus = new JobStatus(job); - jobStatus.activeWorkers = workersPerJob.get(job.jobId); - jobStatuses.add(jobStatus); + jobStatuses.add(new JobStatus(job, workersPerJob.get(job.jobId))); } return jobStatuses; } @@ -515,11 +449,20 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) { // results from spurious redeliveries, before the assembler is busy finalizing and uploading results. markTaskCompleted(job, workResult.taskId); } + // Unlike everything above, result assembly (like starting workers below) does not synchronize on the broker. // It contains some slow nested operations to move completed results into storage. Really we should not do // these things synchronously in an HTTP handler called by the worker. We should probably synchronize this // entire method, then somehow enqueue slower async completion and cleanup tasks in the caller. assembler.handleMessage(workResult); + + if (assembler.isComplete()) { + var resultFiles = assembler.finish(); + // Store all result files permanently. 
+ for (var resultFile : resultFiles) { + fileStorage.moveIntoStorage(resultFile.key(), resultFile.file()); + } + } } catch (Throwable t) { recordJobError(job, ExceptionUtils.stackTraceString(t)); eventBus.send(new ErrorEvent(t)); diff --git a/src/main/java/com/conveyal/analysis/components/broker/Job.java b/src/main/java/com/conveyal/analysis/components/broker/Job.java index 28829c33c..a37227093 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/Job.java +++ b/src/main/java/com/conveyal/analysis/components/broker/Job.java @@ -13,7 +13,6 @@ import java.util.Set; import static com.conveyal.r5.common.Util.notNullOrEmpty; -import static com.google.common.base.Preconditions.checkNotNull; /** * A Job is a collection of tasks that represent all the origins in a regional analysis. All the @@ -61,13 +60,13 @@ public class Job { * The number of remaining tasks can be derived from the deliveredTasks BitSet, but as an * optimization we keep a separate counter to avoid constantly scanning over that whole bitset. */ - protected int nTasksCompleted; + protected int nTasksCompleted = 0; /** * The total number of task deliveries that have occurred. A task may be counted more than * once if it is redelivered. */ - protected int nTasksDelivered; + protected int nTasksDelivered = 0; /** Every task in this job will be based on this template task, but have its origin coordinates changed. */ public final RegionalTask templateTask; @@ -131,23 +130,31 @@ private RegionalTask makeOneTask (int taskNumber) { */ public final Set errors = new HashSet(); - public Job (RegionalTask templateTask, WorkerTags workerTags) { - this.jobId = templateTask.jobId; - this.templateTask = templateTask; - this.workerCategory = new WorkerCategory(templateTask.graphId, templateTask.workerVersion); - this.nTasksCompleted = 0; - this.nextTaskToDeliver = 0; - - if (templateTask.originPointSetKey != null) { - checkNotNull(templateTask.originPointSet); - this.nTasksTotal = templateTask.originPointSet.featureCount(); - } else { - this.nTasksTotal = templateTask.width * templateTask.height; - } - - this.completedTasks = new BitSet(nTasksTotal); + public Job (RegionalTask task, WorkerTags workerTags) { + templateTask = templateTaskFromRegionalTask(task); + jobId = task.jobId; + workerCategory = new WorkerCategory(task.graphId, task.workerVersion); + nTasksTotal = task.getTasksTotal(); + completedTasks = new BitSet(nTasksTotal); this.workerTags = workerTags; + } + /** + * The single RegionalTask object represents a lot of individual accessibility tasks at many different origin + * points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to + * workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID + * to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task, + * as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on + * its ID only. We protectively clone this task because we're going to null out its scenario field, and don't + * want to affect the original object which contains all the scenario details. + */ + private static RegionalTask templateTaskFromRegionalTask(RegionalTask task) { + RegionalTask templateTask = task.clone(); + // First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers. 
+ templateTask.scenarioId = templateTask.scenario.id; + // Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON. + templateTask.scenario = null; + return templateTask; } public boolean markTaskCompleted(int taskId) { diff --git a/src/main/java/com/conveyal/analysis/components/broker/JobStatus.java b/src/main/java/com/conveyal/analysis/components/broker/JobStatus.java index f94a2a6a3..9915f6efd 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/JobStatus.java +++ b/src/main/java/com/conveyal/analysis/components/broker/JobStatus.java @@ -46,7 +46,7 @@ public class JobStatus { public JobStatus () { /* do nothing */ } /** Summarize the given job to return its status over the REST API. */ - public JobStatus (Job job) { + public JobStatus(Job job, int activeWorkers) { this.jobId = job.jobId; this.graphId = job.workerCategory.graphId; this.workerCommit = job.workerCategory.workerVersion; @@ -56,5 +56,6 @@ public JobStatus (Job job) { this.deliveries = job.nTasksDelivered; this.deliveryPass = job.deliveryPass; this.errors = job.errors; + this.activeWorkers = activeWorkers; } } diff --git a/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java b/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java index 3e6b53326..ddbd4760a 100644 --- a/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java +++ b/src/main/java/com/conveyal/analysis/components/broker/RedeliveryTest.java @@ -3,11 +3,13 @@ import com.conveyal.analysis.components.BackendComponents; import com.conveyal.analysis.components.LocalBackendComponents; import com.conveyal.analysis.models.RegionalAnalysis; +import com.conveyal.analysis.results.MultiOriginAssembler; import com.conveyal.r5.analyst.cluster.RegionalTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.UUID; /** @@ -66,7 +68,9 @@ private static void sendFakeJob(Broker broker) { templateTask.scenarioId = "FAKE"; RegionalAnalysis regionalAnalysis = new RegionalAnalysis(); regionalAnalysis.request = templateTask; - broker.enqueueTasksForRegionalJob(regionalAnalysis); + var job = new Job(templateTask, WorkerTags.fromRegionalAnalysis(regionalAnalysis)); + var assembler = new MultiOriginAssembler(job, new ArrayList<>()); + broker.enqueueTasksForRegionalJob(job, assembler); } public static String compactUUID() { diff --git a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java index 9deb1a882..384be559d 100644 --- a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java +++ b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java @@ -14,7 +14,6 @@ import com.conveyal.analysis.persistence.Persistence; import com.conveyal.analysis.util.HttpStatus; import com.conveyal.analysis.util.JsonUtil; -import com.conveyal.gtfs.util.Util; import com.conveyal.r5.analyst.WorkerCategory; import com.conveyal.r5.analyst.cluster.AnalysisWorker; import com.conveyal.r5.analyst.cluster.RegionalTask; @@ -27,7 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.mongodb.QueryBuilder; -import org.apache.commons.math3.analysis.function.Exp; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -46,7 +44,6 @@ import java.net.NoRouteToHostException; import 
java.net.SocketTimeoutException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -54,7 +51,6 @@ import java.util.Map; import java.util.UUID; -import static com.conveyal.r5.common.Util.human; import static com.conveyal.r5.common.Util.notNullOrEmpty; import static com.google.common.base.Preconditions.checkNotNull; @@ -351,12 +347,10 @@ private String getAllWorkers(Request request, Response response) { * modifies the contents of the task queue. */ private Object workerPoll (Request request, Response response) { - WorkerStatus workerStatus = objectFromRequestBody(request, WorkerStatus.class); - List perOriginResults = workerStatus.results; // Record any regional analysis results that were supplied by the worker and mark them completed. - for (RegionalWorkResult workResult : perOriginResults) { + for (RegionalWorkResult workResult : workerStatus.results) { broker.handleRegionalWorkResult(workResult); } // Clear out the results field so it's not visible in the worker list API endpoint. @@ -368,7 +362,7 @@ private Object workerPoll (Request request, Response response) { // See if any appropriate tasks exist for this worker. List tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested); // If there is no work for the worker, signal this clearly with a "no content" code, - // so the worker can sleep a while before the next polling attempt. + // so the worker can sleep for a while before the next polling attempt. if (tasks.isEmpty()) { return jsonResponse(response, HttpStatus.NO_CONTENT_204, tasks); } else { diff --git a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java index a60eccd2b..24285df67 100644 --- a/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java +++ b/src/main/java/com/conveyal/analysis/controllers/RegionalAnalysisController.java @@ -4,13 +4,22 @@ import com.conveyal.analysis.SelectingGridReducer; import com.conveyal.analysis.UserPermissions; import com.conveyal.analysis.components.broker.Broker; +import com.conveyal.analysis.components.broker.Job; import com.conveyal.analysis.components.broker.JobStatus; +import com.conveyal.analysis.components.broker.WorkerTags; import com.conveyal.analysis.models.AnalysisRequest; import com.conveyal.analysis.models.Model; import com.conveyal.analysis.models.OpportunityDataset; import com.conveyal.analysis.models.RegionalAnalysis; import com.conveyal.analysis.persistence.Persistence; +import com.conveyal.analysis.results.AccessCsvResultWriter; import com.conveyal.analysis.results.CsvResultType; +import com.conveyal.analysis.results.GridResultWriter; +import com.conveyal.analysis.results.MultiOriginAssembler; +import com.conveyal.analysis.results.PathCsvResultWriter; +import com.conveyal.analysis.results.RegionalResultWriter; +import com.conveyal.analysis.results.TemporalDensityCsvResultWriter; +import com.conveyal.analysis.results.TimeCsvResultWriter; import com.conveyal.analysis.util.JsonUtil; import com.conveyal.file.FileStorage; import com.conveyal.file.FileStorageFormat; @@ -54,6 +63,7 @@ import static com.conveyal.file.FileCategory.BUNDLES; import static com.conveyal.file.FileCategory.RESULTS; import static com.conveyal.file.UrlWithHumanName.filenameCleanString; +import static com.conveyal.r5.common.Util.notNullOrEmpty; import static com.conveyal.r5.transit.TransportNetworkCache.getScenarioFilename; import 
static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -553,6 +563,7 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro task.destinationPointSets = new PointSet[] { PointSetCache.readFreeFormFromFileStore(task.destinationPointSetKeys[0]) }; + task.checkIsUnderOdPairLimit(task.destinationPointSets[0]); } if (task.recordTimes) { checkArgument( @@ -569,7 +580,7 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro // In fact, there are three separate classes all containing almost the same info: // AnalysisRequest (from UI to backend), RegionalTask (template sent to worker), RegionalAnalysis (in Mongo). // And for regional analyses, two instances of the worker task: the one with the scenario, and the templateTask. - RegionalAnalysis regionalAnalysis = new RegionalAnalysis(); + final RegionalAnalysis regionalAnalysis = new RegionalAnalysis(); regionalAnalysis.request = task; regionalAnalysis.height = task.height; regionalAnalysis.north = task.north; @@ -618,19 +629,96 @@ private RegionalAnalysis createRegionalAnalysis (Request req, Response res) thro // Persist this newly created RegionalAnalysis to Mongo. // This assigns it creation/update time stamps and an ID, which is needed to name any output CSV files. - regionalAnalysis = Persistence.regionalAnalyses.create(regionalAnalysis); + Persistence.regionalAnalyses.create(regionalAnalysis); + + // Set the `jobId` from the newly created regional analysis. + task.jobId = regionalAnalysis._id; + + // Create the regional job + var regionalJob = new Job(task, WorkerTags.fromRegionalAnalysis(regionalAnalysis)); + + // Create the result writers + var writers = createResultWriters(regionalAnalysis, task); + + // Create the multi-origin assembler with the writers. + var assembler = new MultiOriginAssembler(regionalJob, writers); + + // Stored scenario is needed by workers. Must be done ahead of enqueueing the job. + storeRegionalAnalysisScenarioJson(task); // Register the regional job with the broker, which will distribute individual tasks to workers and track progress. - broker.enqueueTasksForRegionalJob(regionalAnalysis); + broker.enqueueTasksForRegionalJob(regionalJob, assembler); // Flush to the database any information added to the RegionalAnalysis object when it was enqueued. - // This includes the paths of any CSV files that will be produced by this analysis. - // TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock() or put(). + // This includes the paths of any CSV files that will be produced by this analysis. The regional analysis was + // created in this method and therefore we can bypass the nonce / permission checking. Persistence.regionalAnalyses.modifiyWithoutUpdatingLock(regionalAnalysis); return regionalAnalysis; } + /** + * Create results writers for this regional analysis and a task. Stores the result paths that are created by the + * writers. + */ + private static List createResultWriters(RegionalAnalysis analysis, RegionalTask task) { + // Create the result writers. Store their result file paths in the database. 
+ var resultWriters = new ArrayList(); + if (!task.makeTauiSite) { + if (task.recordAccessibility) { + if (task.originPointSet != null) { + // Freeform origins - create CSV regional analysis results + var accessWriter = new AccessCsvResultWriter(task); + resultWriters.add(accessWriter); + analysis.resultStorage.put(accessWriter.resultType(), accessWriter.getFileName()); + } else { + // Gridded origins - create gridded regional analysis results + resultWriters.addAll(GridResultWriter.createWritersFromTask(analysis.destinationPointSetIds, task)); + } + } + + if (task.recordTimes) { + var timesWriter = new TimeCsvResultWriter(task); + resultWriters.add(timesWriter); + analysis.resultStorage.put(timesWriter.resultType(), timesWriter.getFileName()); + } + + if (task.includePathResults) { + var pathsWriter = new PathCsvResultWriter(task); + resultWriters.add(pathsWriter); + analysis.resultStorage.put(pathsWriter.resultType(), pathsWriter.getFileName()); + } + + if (task.includeTemporalDensity) { + if (task.originPointSet == null) { + // Gridded origins. The full temporal density information is probably too voluminous to be useful. + // We might want to record a grid of dual accessibility values, but this will require some serious + // refactoring of the GridResultWriter. + // if (job.templateTask.dualAccessibilityThreshold > 0) { ... } + throw new RuntimeException("Temporal density of opportunities cannot be recorded for gridded origin points."); + } else { + var tDensityWriter = new TemporalDensityCsvResultWriter(task); + resultWriters.add(tDensityWriter); + analysis.resultStorage.put(tDensityWriter.resultType(), tDensityWriter.getFileName()); + } + } + + checkArgument(notNullOrEmpty(resultWriters), "A regional analysis should always create at least one grid or CSV file."); + } + return resultWriters; + } + + /** + * Store the regional analysis scenario as JSON for retrieval by the workers. 
+ */ + private void storeRegionalAnalysisScenarioJson(RegionalTask task) throws IOException { + String fileName = getScenarioFilename(task.graphId, task.scenario.id); + FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName); + File localScenario = FileUtils.createScratchFile("json"); + JsonUtil.objectMapper.writeValue(localScenario, task.scenario); + fileStorage.moveIntoStorage(fileStorageKey, localScenario); + } + private RegionalAnalysis updateRegionalAnalysis (Request request, Response response) throws IOException { RegionalAnalysis regionalAnalysis = JsonUtil.objectMapper.readValue(request.body(), RegionalAnalysis.class); return Persistence.regionalAnalyses.updateByUserIfPermitted(regionalAnalysis, UserPermissions.from(request)); diff --git a/src/main/java/com/conveyal/analysis/models/RegionalAnalysis.java b/src/main/java/com/conveyal/analysis/models/RegionalAnalysis.java index 673913a7b..57fb4c06b 100644 --- a/src/main/java/com/conveyal/analysis/models/RegionalAnalysis.java +++ b/src/main/java/com/conveyal/analysis/models/RegionalAnalysis.java @@ -1,6 +1,5 @@ package com.conveyal.analysis.models; -import com.conveyal.analysis.AnalysisServerException; import com.conveyal.analysis.results.CsvResultType; import com.conveyal.r5.analyst.cluster.RegionalTask; import org.locationtech.jts.geom.Geometry; @@ -106,7 +105,7 @@ public RegionalAnalysis clone () { try { return (RegionalAnalysis) super.clone(); } catch (CloneNotSupportedException e) { - throw AnalysisServerException.unknown(e); + throw new RuntimeException(e); } } } diff --git a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java index e208ac827..78ad694f2 100644 --- a/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/AccessCsvResultWriter.java @@ -1,17 +1,14 @@ package com.conveyal.analysis.results; -import com.conveyal.file.FileStorage; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; import java.util.ArrayList; import java.util.List; public class AccessCsvResultWriter extends CsvResultWriter { - - public AccessCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { - super(task, fileStorage); + public AccessCsvResultWriter(RegionalTask task) { + super(task); } @Override @@ -49,13 +46,14 @@ public Iterable rowValues (RegionalWorkResult workResult) { for (int p = 0; p < task.percentiles.length; p++) { int[] cutoffsForPercentile = percentilesForDestPointset[p]; for (int c = 0; c < task.cutoffsMinutes.length; c++) { + int accessibilityValue = cutoffsForPercentile[c]; // Ideally we'd output the pointset IDs (rather than keys) which we have in the RegionalAnalysis rows.add(new String[] { originId, task.destinationPointSetKeys[d], Integer.toString(task.percentiles[p]), Integer.toString(task.cutoffsMinutes[c]), - Integer.toString(workResult.accessibilityValues[d][p][c]) + Integer.toString(accessibilityValue) }); } } diff --git a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java b/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java deleted file mode 100644 index 8bcf94d26..000000000 --- a/src/main/java/com/conveyal/analysis/results/BaseResultWriter.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.conveyal.analysis.results; - -import com.conveyal.file.FileCategory; -import com.conveyal.file.FileStorage; -import 
com.conveyal.file.FileStorageKey; -import com.conveyal.file.FileUtils; -import com.google.common.io.ByteStreams; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.zip.GZIPOutputStream; - -import static com.conveyal.file.FileCategory.RESULTS; -import static com.conveyal.r5.common.Util.human; - -/** - * This is an abstract base class for writing regional analysis results into a file for long term - * storage. It provides reuseable logic for creating local buffer files and uploading them to long - * term cloud storage once the regional analysis is complete. Concrete subclasses handle writing CSV - * or proprietary binary grid files, depending on the type of regional analysis. - */ -public abstract class BaseResultWriter { - - private static final Logger LOG = LoggerFactory.getLogger(BaseResultWriter.class); - - private final FileStorage fileStorage; - - protected File bufferFile; - - public BaseResultWriter (FileStorage fileStorage) { - this.fileStorage = fileStorage; - } - - // Can this be merged into the constructor? - protected void prepare (String jobId) { - try { - bufferFile = File.createTempFile(jobId + "_", ".results"); - // On unexpected server shutdown, these files should be deleted. - // We could attempt to recover from shutdowns but that will take a lot of changes and persisted data. - bufferFile.deleteOnExit(); - } catch (IOException e) { - LOG.error("Exception while creating buffer file for multi-origin assembler: " + e.toString()); - } - } - - /** - * Gzip the access grid and store it. - */ - protected synchronized void finish (String fileName) throws IOException { - LOG.info("Compressing {} and moving into file storage.", fileName); - FileStorageKey fileStorageKey = new FileStorageKey(RESULTS, fileName); - File gzippedResultFile = FileUtils.createScratchFile(); - - // There's probably a more elegant way to do this with NIO and without closing the buffer. - // That would be Files.copy(File.toPath(),X) or ByteStreams.copy. - // Perhaps better: we could wrap the output buffer in a gzip output stream and zip as we write out. - InputStream is = new BufferedInputStream(new FileInputStream(bufferFile)); - OutputStream os = new GZIPOutputStream(new BufferedOutputStream(new FileOutputStream(gzippedResultFile))); - ByteStreams.copy(is, os); - is.close(); - os.close(); - - LOG.info("GZIP compression reduced analysis results {} from {} to {} ({}x compression)", - fileName, - human(bufferFile.length(), "B"), - human(gzippedResultFile.length(), "B"), - (double) bufferFile.length() / gzippedResultFile.length() - ); - - fileStorage.moveIntoStorage(fileStorageKey, gzippedResultFile); - bufferFile.delete(); - } - - /** - * Close all buffers and temporary files. 
- */ - abstract void terminate () throws Exception; - -} diff --git a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java index ca0cf09c5..dc642f07f 100644 --- a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java @@ -1,6 +1,9 @@ package com.conveyal.analysis.results; -import com.conveyal.file.FileStorage; +import com.conveyal.file.FileCategory; +import com.conveyal.file.FileEntry; +import com.conveyal.file.FileStorageKey; +import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import com.csvreader.CsvWriter; @@ -9,11 +12,11 @@ import org.slf4j.LoggerFactory; import java.io.BufferedWriter; +import java.io.File; import java.io.FileWriter; import java.io.IOException; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; /** @@ -21,11 +24,10 @@ * Subclasses are used to record origin/destination "skim" matrices, accessibility indicators for non-gridded * ("freeform") origin point sets, and cataloging paths between pairs of origins and destinations. */ -public abstract class CsvResultWriter extends BaseResultWriter implements RegionalResultWriter { +public abstract class CsvResultWriter implements RegionalResultWriter { private static final Logger LOG = LoggerFactory.getLogger(CsvResultWriter.class); - - public final String fileName; + private final File bufferFile = FileUtils.createScratchFile("csv"); private final CsvWriter csvWriter; private int nDataColumns; @@ -58,38 +60,46 @@ public abstract class CsvResultWriter extends BaseResultWriter implements Region /** * Construct a writer to record incoming results in a CSV file, with header row consisting of * "origin", "destination", and the supplied indicator. - * FIXME it's strange we're manually passing injectable components into objects not wired up at application construction. */ - CsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { - super(fileStorage); + CsvResultWriter (RegionalTask task) { checkArgument(task.originPointSet != null, "CsvResultWriters require FreeFormPointSet origins."); - super.prepare(task.jobId); - this.fileName = task.jobId + "_" + resultType() +".csv"; - BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile)); - csvWriter = new CsvWriter(bufferedWriter, ','); - setDataColumns(columnHeaders()); + String[] columns = columnHeaders(); + csvWriter = getBufferedCsvWriter(columns); + this.nDataColumns = columns.length; this.task = task; LOG.info("Created CSV file to hold {} results for regional job {}", resultType(), task.jobId); } + public String getFileName() { + return task.jobId + "_" + resultType() + ".csv"; + } + /** - * Writes a header row containing the supplied data columns. + * Initialize the CsvWriter with a row of the supplied data columns. */ - protected void setDataColumns(String... 
columns) throws IOException { - this.nDataColumns = columns.length; - csvWriter.writeRecord(columns); + private CsvWriter getBufferedCsvWriter(String[] columnHeaders) { + try { + BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bufferFile)); + var writer = new CsvWriter(bufferedWriter, ','); + writer.writeRecord(columnHeaders); + return writer; + } catch (IOException ioException) { + throw new RuntimeException(ioException); + } } /** * Gzip the csv file and move it into permanent file storage such as AWS S3. - * Note: stored file will undergo gzip compression in super.finish(), but be stored with a .csv extension. + * Note: stored file will undergo gzip compression but be stored with a .csv extension. * When this file is downloaded from the UI, the browser will decompress, yielding a logically named .csv file. * Downloads through another channel (e.g. aws s3 cp), will need to be decompressed manually. */ @Override - public synchronized void finish () throws IOException { + public synchronized FileEntry finish() throws IOException { csvWriter.close(); - super.finish(this.fileName); + var gzippedFile = FileUtils.gzipFile(bufferFile); + bufferFile.delete(); + return new FileEntry(new FileStorageKey(FileCategory.RESULTS, getFileName()), gzippedFile); } /** diff --git a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java index 88b1a8c08..b6d2f0f65 100644 --- a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java @@ -1,16 +1,23 @@ package com.conveyal.analysis.results; -import com.conveyal.file.FileStorage; +import com.conveyal.file.FileCategory; +import com.conveyal.file.FileEntry; +import com.conveyal.file.FileStorageKey; +import com.conveyal.file.FileUtils; import com.conveyal.r5.analyst.LittleEndianIntOutputStream; import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; import static com.conveyal.r5.common.Util.human; @@ -37,11 +44,12 @@ *
 *   (repeated 4-byte int) values of each pixel in row-major order: axis order (row, column, channel)
  • * */ -public class GridResultWriter extends BaseResultWriter { +public class GridResultWriter implements RegionalResultWriter { private static final Logger LOG = LoggerFactory.getLogger(GridResultWriter.class); - private RandomAccessFile randomAccessFile; + private final File bufferFile = FileUtils.createScratchFile("grid"); + private final RandomAccessFile randomAccessFile; /** The version of the access grids we produce */ private static final int ACCESS_GRID_VERSION = 0; @@ -59,13 +67,42 @@ public class GridResultWriter extends BaseResultWriter { */ private final int channels; + private final int percentileIndex; + private final int destinationIndex; + private final String gridFileName; + + /** + * We create one GridResultWriter for each destination pointset and percentile. + * Each of those output files contains data for all specified travel time cutoffs at each origin. + */ + public static List createWritersFromTask(String[] destinationPointSetIds, RegionalTask task) { + int nPercentiles = task.percentiles.length; + int nDestinationPointSets = destinationPointSetIds.length; + // Create one grid writer per percentile and destination pointset. + var gridWriters = new ArrayList(); + for (int destinationIndex = 0; destinationIndex < nDestinationPointSets; destinationIndex++) { + for (int percentileIndex = 0; percentileIndex < nPercentiles; percentileIndex++) { + String destinationPointSetId = destinationPointSetIds[destinationIndex]; + gridWriters.add(new GridResultWriter( + task, + percentileIndex, + destinationIndex, + destinationPointSetId + )); + } + } + return gridWriters; + } + /** - * Construct an writer for a single regional analysis result grid, using the proprietary + * Construct a writer for a single regional analysis result grid, using the proprietary * Conveyal grid format. This also creates the on-disk scratch buffer into which the results * from the workers will be accumulated. */ - GridResultWriter (RegionalTask task, FileStorage fileStorage) { - super(fileStorage); + GridResultWriter (RegionalTask task, int percentileIndex, int destinationIndex, String destinationPointSetId) { + this.gridFileName = String.format("%s_%s_P%d.access", task.jobId, destinationPointSetId, task.percentiles[percentileIndex]); + this.percentileIndex = percentileIndex; + this.destinationIndex = destinationIndex; int width = task.width; int height = task.height; this.channels = task.cutoffsMinutes.length; @@ -75,7 +112,6 @@ public class GridResultWriter extends BaseResultWriter { height, channels ); - super.prepare(task.jobId); try { // Write the access grid file header to the temporary file. @@ -111,11 +147,15 @@ public class GridResultWriter extends BaseResultWriter { } } - /** Gzip the access grid and upload it to file storage (such as AWS S3). */ + /** + * Gzip the access grid and return the files. + */ @Override - protected synchronized void finish (String fileName) throws IOException { - super.finish(fileName); + public synchronized FileEntry finish() throws IOException { randomAccessFile.close(); + var gzippedFile = FileUtils.gzipFile(bufferFile); + bufferFile.delete(); + return new FileEntry(new FileStorageKey(FileCategory.RESULTS, gridFileName), gzippedFile); } /** @@ -130,18 +170,25 @@ private static byte[] intToLittleEndianByteArray (int i) { return byteBuffer.array(); } + @Override + public void writeOneWorkResult(RegionalWorkResult workResult) throws Exception { + // Drop work results for this particular origin into a little-endian output file. 
+ int[][] percentilesForGrid = workResult.accessibilityValues[destinationIndex]; + int[] cutoffsForPercentile = percentilesForGrid[percentileIndex]; + writeOneOrigin(workResult.taskId, cutoffsForPercentile); + } + /** * Write all channels at once to the proper subregion of the buffer for this origin. The origins we receive have 2d * coordinates. Flatten them to compute file offsets and for the origin checklist. */ - synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOException { + private void writeOneOrigin (int taskNumber, int[] values) throws IOException { if (values.length != channels) { throw new IllegalArgumentException("Number of channels to be written does not match this writer."); } long offset = HEADER_LENGTH_BYTES + (taskNumber * channels * Integer.BYTES); // RandomAccessFile is not threadsafe and multiple threads may call this, so synchronize. - // TODO why is the method also synchronized then? - synchronized (this) { + synchronized (randomAccessFile) { randomAccessFile.seek(offset); // FIXME should this be delta-coded? The Selecting grid reducer seems to expect it to be. int lastValue = 0; @@ -154,7 +201,7 @@ synchronized void writeOneOrigin (int taskNumber, int[] values) throws IOExcepti } @Override - synchronized void terminate () throws IOException { + public synchronized void terminate () throws IOException { randomAccessFile.close(); bufferFile.delete(); } diff --git a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java b/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java deleted file mode 100644 index 5f4d90f8a..000000000 --- a/src/main/java/com/conveyal/analysis/results/MultiGridResultWriter.java +++ /dev/null @@ -1,88 +0,0 @@ -package com.conveyal.analysis.results; - -import com.conveyal.analysis.models.RegionalAnalysis; -import com.conveyal.file.FileStorage; -import com.conveyal.r5.analyst.cluster.RegionalTask; -import com.conveyal.r5.analyst.cluster.RegionalWorkResult; - -/** - * Adapts our collection of grid writers (one for each destination pointset and percentile) to give them the - * same interface as our CSV writers, so CSV and Grids can be processed similarly in MultiOriginAssembler. - */ -public class MultiGridResultWriter implements RegionalResultWriter { - - private final RegionalAnalysis regionalAnalysis; - - private final RegionalTask task; - - /** - * We create one GridResultWriter for each destination pointset and percentile. - * Each of those output files contains data for all travel time cutoffs at each origin. - */ - private final GridResultWriter[][] accessibilityGridWriters; - - /** The number of different percentiles for which we're calculating accessibility on the workers. */ - private final int nPercentiles; - - /** The number of destination pointsets to which we're calculating accessibility */ - private final int nDestinationPointSets; - - /** Constructor */ - public MultiGridResultWriter ( - RegionalAnalysis regionalAnalysis, RegionalTask task, FileStorage fileStorage - ) { - // We are storing the regional analysis just to get its pointset IDs (not keys) and its own ID. - this.regionalAnalysis = regionalAnalysis; - this.task = task; - this.nPercentiles = task.percentiles.length; - this.nDestinationPointSets = task.makeTauiSite ? 0 : task.destinationPointSetKeys.length; - // Create one grid writer per percentile and destination pointset. 
- accessibilityGridWriters = new GridResultWriter[nDestinationPointSets][nPercentiles]; - for (int d = 0; d < nDestinationPointSets; d++) { - for (int p = 0; p < nPercentiles; p++) { - accessibilityGridWriters[d][p] = new GridResultWriter(task, fileStorage); - } - } - } - - @Override - public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception { - // Drop work results for this particular origin into a little-endian output file. - // TODO more efficient way to write little-endian integers - // TODO check monotonic increasing invariants here rather than in worker. - // Infer x and y cell indexes based on the template task - int taskNumber = workResult.taskId; - for (int d = 0; d < workResult.accessibilityValues.length; d++) { - int[][] percentilesForGrid = workResult.accessibilityValues[d]; - for (int p = 0; p < nPercentiles; p++) { - int[] cutoffsForPercentile = percentilesForGrid[p]; - GridResultWriter gridWriter = accessibilityGridWriters[d][p]; - gridWriter.writeOneOrigin(taskNumber, cutoffsForPercentile); - } - } - } - - @Override - public void terminate () throws Exception { - for (GridResultWriter[] writers : accessibilityGridWriters) { - for (GridResultWriter writer : writers) { - writer.terminate(); - } - } - } - - @Override - public void finish () throws Exception { - for (int d = 0; d < nDestinationPointSets; d++) { - for (int p = 0; p < nPercentiles; p++) { - int percentile = task.percentiles[p]; - String destinationPointSetId = regionalAnalysis.destinationPointSetIds[d]; - // TODO verify that regionalAnalysis._id is the same as job.jobId - String gridFileName = - String.format("%s_%s_P%d.access", regionalAnalysis._id, destinationPointSetId, percentile); - accessibilityGridWriters[d][p].finish(gridFileName); - } - } - } - -} diff --git a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java index dc2b0b150..8d6a2f805 100644 --- a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java +++ b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java @@ -1,16 +1,8 @@ package com.conveyal.analysis.results; -import com.conveyal.analysis.AnalysisServerException; import com.conveyal.analysis.components.broker.Job; -import com.conveyal.analysis.models.RegionalAnalysis; -import com.conveyal.analysis.persistence.Persistence; -import com.conveyal.file.FileStorage; -import com.conveyal.file.FileStorageFormat; -import com.conveyal.r5.analyst.PointSet; -import com.conveyal.r5.analyst.cluster.PathResult; -import com.conveyal.r5.analyst.cluster.RegionalTask; +import com.conveyal.file.FileEntry; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import com.conveyal.r5.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,9 +10,6 @@ import java.util.BitSet; import java.util.List; -import static com.conveyal.r5.common.Util.notNullOrEmpty; -import static com.google.common.base.Preconditions.checkArgument; - /** * This assembles regional results arriving from workers into one or more files per regional analysis on * the backend. 
This is not a singleton component: one MultiOriginAssembler instance is created per currently active @@ -30,17 +19,6 @@ public class MultiOriginAssembler { public static final Logger LOG = LoggerFactory.getLogger(MultiOriginAssembler.class); - private static final int MAX_FREEFORM_OD_PAIRS = 16_000_000; - - private static final int MAX_FREEFORM_DESTINATIONS = 4_000_000; - - /** - * The regional analysis for which this object is assembling results. - * We retain the whole object rather than just its ID so we'll have the full details, e.g. destination point set - * IDs and scenario, things that are stripped out of the template task sent to the workers. - */ - private final RegionalAnalysis regionalAnalysis; - /** * The object representing the progress of the regional analysis as tracked by the broker. * It may appear job.templateTask has all the information needed, making the regionalAnalysis field @@ -50,7 +28,7 @@ public class MultiOriginAssembler { public final Job job; // One writer per CSV/Grids we're outputting - private List resultWriters = new ArrayList<>(); + private final List resultWriters; /** * The number of distinct origin points for which we've received at least one result. If for @@ -70,121 +48,14 @@ public class MultiOriginAssembler { */ private final BitSet originsReceived; - /** - * Total number of origin points for which we're expecting results. Note that the total - * number of results received could be higher in the event of an overzealous task redelivery. - */ - public final int nOriginsTotal; - /** * Constructor. This sets up one or more ResultWriters depending on whether we're writing gridded or non-gridded * cumulative opportunities accessibility, or origin-destination travel times. - * TODO do not pass the FileStorage component down into this non-component and the ResultWriter non-component, - * clarify design concepts on this point (e.g. only components should know other components exist). - * Rather than pushing the component all the way down to the leaf function call, we return the finished - * file up to an umbrella location where a single reference to the file storage can be used to - * store all of them. */ - public MultiOriginAssembler (RegionalAnalysis regionalAnalysis, Job job, FileStorage fileStorage) { - try { - this.regionalAnalysis = regionalAnalysis; - this.job = job; - this.nOriginsTotal = job.nTasksTotal; - this.originsReceived = new BitSet(job.nTasksTotal); - // If results have been requested for freeform origins, check that the origin and - // destination pointsets are not too big for generating CSV files. - RegionalTask task = job.templateTask; - if (!task.makeTauiSite && task.destinationPointSetKeys[0].endsWith(FileStorageFormat.FREEFORM.extension)) { - // This requires us to have already loaded this destination pointset instance into the transient field. - PointSet destinationPointSet = task.destinationPointSets[0]; - int nDestinations = destinationPointSet.featureCount(); - int nODPairs = task.oneToOne ? 
nOriginsTotal : nOriginsTotal * nDestinations; - if (task.recordTimes && - (nDestinations > MAX_FREEFORM_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) { - throw AnalysisServerException.badRequest(String.format( - "Travel time results limited to %d destinations and %d origin-destination pairs.", - MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS - )); - } - if (task.includePathResults && - (nDestinations > PathResult.MAX_PATH_DESTINATIONS || nODPairs > MAX_FREEFORM_OD_PAIRS)) { - throw AnalysisServerException.badRequest(String.format( - "Path results limited to %d destinations and %d origin-destination pairs.", - PathResult.MAX_PATH_DESTINATIONS, MAX_FREEFORM_OD_PAIRS - )); - } - } - - if (job.templateTask.recordAccessibility) { - if (job.templateTask.originPointSet != null) { - // Freeform origins - create CSV regional analysis results - resultWriters.add(new AccessCsvResultWriter(job.templateTask, fileStorage)); - } else { - // Gridded origins - create gridded regional analysis results - resultWriters.add(new MultiGridResultWriter(regionalAnalysis, job.templateTask, fileStorage)); - } - } - - if (job.templateTask.recordTimes) { - resultWriters.add(new TimeCsvResultWriter(job.templateTask, fileStorage)); - } - - if (job.templateTask.includePathResults) { - resultWriters.add(new PathCsvResultWriter(job.templateTask, fileStorage)); - } - - if (job.templateTask.includeTemporalDensity) { - if (job.templateTask.originPointSet == null) { - // Gridded origins. The full temporal density information is probably too voluminous to be useful. - // We might want to record a grid of dual accessibility values, but this will require some serious - // refactoring of the GridResultWriter. - // if (job.templateTask.dualAccessibilityThreshold > 0) { ... } - throw new IllegalArgumentException("Temporal density of opportunities cannot be recorded for gridded origin points."); - } else { - // Freeform origins. - // Output includes temporal density of opportunities and optionally dual accessibility. - resultWriters.add(new TemporalDensityCsvResultWriter(job.templateTask, fileStorage)); - } - } - - checkArgument(job.templateTask.makeTauiSite || notNullOrEmpty(resultWriters), - "A non-Taui regional analysis should always create at least one grid or CSV file."); - - // Record the paths of any CSV files that will be produced by this analysis. - // The caller must flush the RegionalAnalysis back out to the database to retain this information. - // We avoid database access here in constructors, especially when called in synchronized methods. - for (RegionalResultWriter writer : resultWriters) { - // FIXME instanceof+cast is ugly, do this some other way or even record the Grids - if (writer instanceof CsvResultWriter) { - CsvResultWriter csvWriter = (CsvResultWriter) writer; - regionalAnalysis.resultStorage.put(csvWriter.resultType(), csvWriter.fileName); - } - } - } catch (AnalysisServerException e) { - throw e; - } catch (Exception e) { - // Handle any obscure problems we don't want end users to see without context of MultiOriginAssembler. - throw new RuntimeException("Exception while creating multi-origin assembler: " + e.toString(), e); - } - } - - /** - * Gzip the output files and persist them to cloud storage. 
- */ - private synchronized void finish() { - LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId); - try { - for (RegionalResultWriter writer : resultWriters) { - writer.finish(); - } - regionalAnalysis.complete = true; - // Write updated regionalAnalysis object back out to database, to mark it complete and record locations - // of any CSV files generated. Use method that updates lock/timestamp, otherwise updates are not seen in UI. - // TODO verify whether there is a reason to use regionalAnalyses.modifyWithoutUpdatingLock(). - Persistence.regionalAnalyses.put(regionalAnalysis); - } catch (Exception e) { - LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e); - } + public MultiOriginAssembler (Job job, List resultWriters) { + this.job = job; + this.resultWriters = resultWriters; + this.originsReceived = new BitSet(job.nTasksTotal); } /** @@ -195,19 +66,34 @@ private synchronized void finish() { * this class are also synchronized for good measure. There should be no additional cost to retaining the lock when * entering those methods. */ - public synchronized void handleMessage (RegionalWorkResult workResult) throws Exception { - for (RegionalResultWriter writer : resultWriters) { - writer.writeOneWorkResult(workResult); - } + public void handleMessage(RegionalWorkResult workResult) throws Exception { // Don't double-count origins if we receive them more than once. Atomic get-and-increment requires // synchronization, currently achieved by synchronizing this entire method. if (!originsReceived.get(workResult.taskId)) { originsReceived.set(workResult.taskId); nComplete += 1; + + for (RegionalResultWriter writer : resultWriters) { + writer.writeOneWorkResult(workResult); + } } - if (nComplete == nOriginsTotal) { - finish(); + } + + public List finish() { + var resultFiles = new ArrayList(resultWriters.size()); + LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId); + try { + for (RegionalResultWriter writer : resultWriters) { + resultFiles.add(writer.finish()); + } + } catch (Exception e) { + LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e); } + return resultFiles; + } + + public boolean isComplete() { + return nComplete == job.nTasksTotal; } /** Clean up and cancel this grid assembler, typically when a job is canceled while still being processed. 
*/ diff --git a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java index 0dadb4337..5a1515ccc 100644 --- a/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/PathCsvResultWriter.java @@ -1,12 +1,10 @@ package com.conveyal.analysis.results; -import com.conveyal.file.FileStorage; import com.conveyal.r5.analyst.cluster.PathResult; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; import org.apache.commons.lang3.ArrayUtils; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -14,8 +12,8 @@ public class PathCsvResultWriter extends CsvResultWriter { - public PathCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { - super(task, fileStorage); + public PathCsvResultWriter (RegionalTask task) { + super(task); } @Override diff --git a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java index 8380a5bea..6ace604a0 100644 --- a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java @@ -1,18 +1,16 @@ package com.conveyal.analysis.results; +import com.conveyal.file.FileEntry; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; - /** * Common interface for classes that write regional results out to CSV or Grids on the backend. */ -interface RegionalResultWriter { +public interface RegionalResultWriter { void writeOneWorkResult (RegionalWorkResult workResult) throws Exception; void terminate () throws Exception; - void finish () throws Exception; - + FileEntry finish() throws Exception; } diff --git a/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java index 02a6168ad..47b9c67e8 100644 --- a/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/TemporalDensityCsvResultWriter.java @@ -1,10 +1,8 @@ package com.conveyal.analysis.results; -import com.conveyal.file.FileStorage; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -17,8 +15,8 @@ public class TemporalDensityCsvResultWriter extends CsvResultWriter { private final int dualThreshold; - public TemporalDensityCsvResultWriter(RegionalTask task, FileStorage fileStorage) throws IOException { - super(task, fileStorage); + public TemporalDensityCsvResultWriter(RegionalTask task) { + super(task); dualThreshold = task.dualAccessibilityThreshold; } diff --git a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java index 144da7713..6fe7c672d 100644 --- a/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java +++ b/src/main/java/com/conveyal/analysis/results/TimeCsvResultWriter.java @@ -1,21 +1,18 @@ package com.conveyal.analysis.results; -import com.conveyal.file.FileStorage; import com.conveyal.r5.analyst.FreeFormPointSet; import com.conveyal.r5.analyst.cluster.RegionalTask; import com.conveyal.r5.analyst.cluster.RegionalWorkResult; -import 
java.io.IOException; import java.util.ArrayList; import java.util.List; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; public class TimeCsvResultWriter extends CsvResultWriter { - public TimeCsvResultWriter (RegionalTask task, FileStorage fileStorage) throws IOException { - super(task, fileStorage); + public TimeCsvResultWriter (RegionalTask task) { + super(task); } @Override diff --git a/src/main/java/com/conveyal/file/FileEntry.java b/src/main/java/com/conveyal/file/FileEntry.java new file mode 100644 index 000000000..5f64383ec --- /dev/null +++ b/src/main/java/com/conveyal/file/FileEntry.java @@ -0,0 +1,6 @@ +package com.conveyal.file; + +import java.io.File; + +public record FileEntry(FileStorageKey key, File file) { +} diff --git a/src/main/java/com/conveyal/file/FileUtils.java b/src/main/java/com/conveyal/file/FileUtils.java index f2eb94842..f871dd814 100644 --- a/src/main/java/com/conveyal/file/FileUtils.java +++ b/src/main/java/com/conveyal/file/FileUtils.java @@ -1,17 +1,9 @@ package com.conveyal.file; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.RandomAccessFile; +import java.io.*; import java.nio.file.Files; import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; public abstract class FileUtils { /** @@ -86,6 +78,19 @@ public static void transferFromFileTo(File file, OutputStream os) { } } + /** + * GZIP a File and return the new File descriptor. + */ + public static File gzipFile(File file) { + try { + File gzippedFile = createScratchFile(); + transferFromFileTo(file, new GZIPOutputStream(getOutputStream(gzippedFile))); + return gzippedFile; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * Get an BufferedInputStream for a file. Read bytes from the underlying file stream without causing a system call * for each byte read. diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java index 0a444f9aa..fa48c5a26 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/RegionalTask.java @@ -2,6 +2,7 @@ import com.conveyal.r5.analyst.PointSet; import com.conveyal.r5.analyst.WebMercatorExtents; +import com.google.common.base.Preconditions; /** * Represents a task to be performed as part of a regional analysis. @@ -9,6 +10,8 @@ */ public class RegionalTask extends AnalysisWorkerTask implements Cloneable { + public static final int MAX_FREEFORM_OD_PAIRS = 16_000_000; + public static final int MAX_FREEFORM_DESTINATIONS = 4_000_000; /** * The storage key for the pointset we will compute access to (e.g. regionId/datasetId.grid). * This is named grid instead of destinationPointSetId for backward compatibility, namely the ability to start @@ -112,4 +115,29 @@ public int nTargetsPerOrigin () { } } + public int getTasksTotal() { + if (originPointSetKey != null) { + Preconditions.checkNotNull(originPointSet); + return originPointSet.featureCount(); + } else { + return width * height; + } + } + + /** + * Check that origin and destination sets are not too big for generating CSV files. 
+     */
+    public void checkIsUnderOdPairLimit(PointSet destinationPointSet) {
+        // This requires us to have already loaded this destination pointset instance into the transient field.
+        if ((recordTimes || includePathResults) && !oneToOne) {
+            if (getTasksTotal() * destinationPointSet.featureCount() > MAX_FREEFORM_OD_PAIRS ||
+                destinationPointSet.featureCount() > MAX_FREEFORM_DESTINATIONS
+            ) {
+                throw new RuntimeException(String.format(
+                        "Freeform requests limited to %d destinations and %d origin-destination pairs.",
+                        MAX_FREEFORM_DESTINATIONS, MAX_FREEFORM_OD_PAIRS
+                ));
+            }
+        }
+    }
 }
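
For orientation, the following is a minimal, hypothetical sketch (not part of the patch) of how the pieces introduced above fit together. It condenses the flow of RegionalAnalysisController.createRegionalAnalysis and Broker.handleRegionalWorkResult after this change; task, regionalAnalysis, workResult, fileStorage, and broker are assumed to be in scope, and the private helpers createResultWriters and storeRegionalAnalysisScenarioJson (added to the controller in this patch) are treated as callable here purely for illustration.

// Launch side: build the Job and its result writers, then hand both to the broker.
task.jobId = regionalAnalysis._id;
Job regionalJob = new Job(task, WorkerTags.fromRegionalAnalysis(regionalAnalysis));
List<RegionalResultWriter> writers = createResultWriters(regionalAnalysis, task);
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalJob, writers);
// Workers fetch the scenario JSON by ID, so it must be in file storage before tasks are enqueued.
storeRegionalAnalysisScenarioJson(task);
broker.enqueueTasksForRegionalJob(regionalJob, assembler);

// Result side: the broker, not the assembler, now persists the finished files.
assembler.handleMessage(workResult);
if (assembler.isComplete()) {
    for (FileEntry resultFile : assembler.finish()) {
        fileStorage.moveIntoStorage(resultFile.key(), resultFile.file());
    }
}

The net effect of the refactor is that only the Broker touches FileStorage; the assembler and its writers simply return FileEntry records describing gzipped result files, which is why BaseResultWriter and MultiGridResultWriter could be deleted.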