Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the multi-origin result assemblers #860

Draft
wants to merge 13 commits into
base: dev
Choose a base branch
from
Draft
97 changes: 20 additions & 77 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,12 @@
import com.conveyal.analysis.components.eventbus.EventBus;
import com.conveyal.analysis.components.eventbus.RegionalAnalysisEvent;
import com.conveyal.analysis.components.eventbus.WorkerEvent;
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.results.MultiOriginAssembler;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.file.FileStorage;
import com.conveyal.file.FileStorageKey;
import com.conveyal.file.FileUtils;
import com.conveyal.r5.analyst.WorkerCategory;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
import com.conveyal.r5.analyst.cluster.WorkerStatus;
import com.conveyal.r5.analyst.scenario.Scenario;
import com.conveyal.r5.util.ExceptionUtils;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.MultimapBuilder;
Expand All @@ -27,8 +22,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -42,7 +35,6 @@
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Action.REQUESTED;
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.REGIONAL;
import static com.conveyal.analysis.components.eventbus.WorkerEvent.Role.SINGLE_POINT;
import static com.conveyal.file.FileCategory.BUNDLES;
import static com.google.common.base.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -98,6 +90,7 @@ public interface Config {
private final Config config;

// Component Dependencies
// Result assemblers return files that need to be permanently stored.
private final FileStorage fileStorage;
private final EventBus eventBus;
private final WorkerLauncher workerLauncher;
Expand Down Expand Up @@ -151,7 +144,7 @@ public interface Config {
public TObjectLongMap<WorkerCategory> recentlyRequestedWorkers =
TCollections.synchronizedMap(new TObjectLongHashMap<>());

public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
public Broker(Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
this.config = config;
this.fileStorage = fileStorage;
this.eventBus = eventBus;
Expand All @@ -162,27 +155,14 @@ public Broker (Config config, FileStorage fileStorage, EventBus eventBus, Worker
* Enqueue a set of tasks for a regional analysis.
* Only a single task is passed in, which the broker will expand into all the individual tasks for a regional job.
*/
public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAnalysis) {

// Make a copy of the regional task inside the RegionalAnalysis, replacing the scenario with a scenario ID.
RegionalTask templateTask = templateTaskFromRegionalAnalysis(regionalAnalysis);

LOG.info("Enqueuing tasks for job {} using template task.", templateTask.jobId);
if (findJob(templateTask.jobId) != null) {
LOG.error("Someone tried to enqueue job {} but it already exists.", templateTask.jobId);
throw new RuntimeException("Enqueued duplicate job " + templateTask.jobId);
public synchronized void enqueueTasksForRegionalJob (Job job, MultiOriginAssembler assembler) {
LOG.info("Enqueuing tasks for job {} using template task.", job.jobId);
if (findJob(job.jobId) != null) {
LOG.error("Someone tried to enqueue job {} but it already exists.", job.jobId);
throw new RuntimeException("Enqueued duplicate job " + job.jobId);
}
// Create the Job object to share with the MultiOriginAssembler, but defer adding this job to the Multimap of
// active jobs until we're sure the result assembler was constructed without any errors. Always add and remove
// the Job and corresponding MultiOriginAssembler as a unit in the same synchronized block of code (see #887).
WorkerTags workerTags = WorkerTags.fromRegionalAnalysis(regionalAnalysis);
Job job = new Job(templateTask, workerTags);

// Register the regional job so results received from multiple workers can be assembled into one file.
// If any parameters fail checks here, an exception may cause this method to exit early.
// TODO encapsulate MultiOriginAssemblers in a new Component
MultiOriginAssembler assembler = new MultiOriginAssembler(regionalAnalysis, job, fileStorage);
resultAssemblers.put(templateTask.jobId, assembler);
resultAssemblers.put(job.jobId, assembler);

// A MultiOriginAssembler was successfully put in place. It's now safe to register and start the Job.
jobs.put(job.workerCategory, job);
Expand All @@ -193,56 +173,12 @@ public synchronized void enqueueTasksForRegionalJob (RegionalAnalysis regionalAn
}

if (workerCatalog.noWorkersAvailable(job.workerCategory, config.offline())) {
createOnDemandWorkerInCategory(job.workerCategory, workerTags);
createOnDemandWorkerInCategory(job.workerCategory, job.workerTags);
} else {
// Workers exist in this category, clear out any record that we're waiting for one to start up.
recentlyRequestedWorkers.remove(job.workerCategory);
}
eventBus.send(new RegionalAnalysisEvent(templateTask.jobId, STARTED).forUser(workerTags.user, workerTags.group));
}

/**
* The single RegionalTask object represents a lot of individual accessibility tasks at many different origin
* points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to
* workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID
* to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task,
* as they are often quite voluminous. The workers will fetch the scenario once from S3 and cache it based on
* its ID only. We protectively clone this task because we're going to null out its scenario field, and don't
* want to affect the original object which contains all the scenario details.
* TODO Why is all this detail added after the Persistence call?
* We don't want to store all the details added below in Mongo?
*/
private RegionalTask templateTaskFromRegionalAnalysis (RegionalAnalysis regionalAnalysis) {
RegionalTask templateTask = regionalAnalysis.request.clone();
// First replace the inline scenario with a scenario ID, storing the scenario for retrieval by workers.
Scenario scenario = templateTask.scenario;
templateTask.scenarioId = scenario.id;
// Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON.
templateTask.scenario = null;
String fileName = String.format("%s_%s.json", regionalAnalysis.bundleId, scenario.id);
FileStorageKey fileStorageKey = new FileStorageKey(BUNDLES, fileName);
try {
File localScenario = FileUtils.createScratchFile("json");
JsonUtil.objectMapper.writeValue(localScenario, scenario);
// FIXME this is using a network service in a method called from a synchronized broker method.
// Move file into storage before entering the synchronized block.
fileStorage.moveIntoStorage(fileStorageKey, localScenario);
} catch (IOException e) {
LOG.error("Error storing scenario for retrieval by workers.", e);
}
// Fill in all the fields in the template task that will remain the same across all tasks in a job.
// I am not sure why we are re-setting all these fields, it seems like they are already set when the task is
// initialized by AnalysisRequest.populateTask. But we'd want to thoroughly check that assumption before
// eliminating or moving these lines.
templateTask.jobId = regionalAnalysis._id;
templateTask.graphId = regionalAnalysis.bundleId;
templateTask.workerVersion = regionalAnalysis.workerVersion;
templateTask.height = regionalAnalysis.height;
templateTask.width = regionalAnalysis.width;
templateTask.north = regionalAnalysis.north;
templateTask.west = regionalAnalysis.west;
templateTask.zoom = regionalAnalysis.zoom;
return templateTask;
eventBus.send(new RegionalAnalysisEvent(job.jobId, STARTED).forUser(job.workerTags.user, job.workerTags.group));
}

/**
Expand Down Expand Up @@ -409,9 +345,7 @@ public synchronized Collection<JobStatus> getAllJobStatuses () {
TObjectIntMap<String> workersPerJob = workerCatalog.activeWorkersPerJob();
Collection<JobStatus> jobStatuses = new ArrayList<>();
for (Job job : jobs.values()) {
JobStatus jobStatus = new JobStatus(job);
jobStatus.activeWorkers = workersPerJob.get(job.jobId);
jobStatuses.add(jobStatus);
jobStatuses.add(new JobStatus(job, workersPerJob.get(job.jobId)));
}
return jobStatuses;
}
Expand Down Expand Up @@ -515,11 +449,20 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
// results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
markTaskCompleted(job, workResult.taskId);
}

// Unlike everything above, result assembly (like starting workers below) does not synchronize on the broker.
// It contains some slow nested operations to move completed results into storage. Really we should not do
// these things synchronously in an HTTP handler called by the worker. We should probably synchronize this
// entire method, then somehow enqueue slower async completion and cleanup tasks in the caller.
assembler.handleMessage(workResult);

if (assembler.isComplete()) {
var resultFiles = assembler.finish();
// Store all result files permanently.
for (var resultFile : resultFiles) {
fileStorage.moveIntoStorage(resultFile.key(), resultFile.file());
}
}
} catch (Throwable t) {
recordJobError(job, ExceptionUtils.stackTraceString(t));
eventBus.send(new ErrorEvent(t));
Expand Down
43 changes: 25 additions & 18 deletions src/main/java/com/conveyal/analysis/components/broker/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Set;

import static com.conveyal.r5.common.Util.notNullOrEmpty;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* A Job is a collection of tasks that represent all the origins in a regional analysis. All the
Expand Down Expand Up @@ -61,13 +60,13 @@ public class Job {
* The number of remaining tasks can be derived from the deliveredTasks BitSet, but as an
* optimization we keep a separate counter to avoid constantly scanning over that whole bitset.
*/
protected int nTasksCompleted;
protected int nTasksCompleted = 0;

/**
* The total number of task deliveries that have occurred. A task may be counted more than
* once if it is redelivered.
*/
protected int nTasksDelivered;
protected int nTasksDelivered = 0;

/** Every task in this job will be based on this template task, but have its origin coordinates changed. */
public final RegionalTask templateTask;
Expand Down Expand Up @@ -131,23 +130,31 @@ private RegionalTask makeOneTask (int taskNumber) {
*/
public final Set<String> errors = new HashSet();

public Job (RegionalTask templateTask, WorkerTags workerTags) {
this.jobId = templateTask.jobId;
this.templateTask = templateTask;
this.workerCategory = new WorkerCategory(templateTask.graphId, templateTask.workerVersion);
this.nTasksCompleted = 0;
this.nextTaskToDeliver = 0;

if (templateTask.originPointSetKey != null) {
checkNotNull(templateTask.originPointSet);
this.nTasksTotal = templateTask.originPointSet.featureCount();
} else {
this.nTasksTotal = templateTask.width * templateTask.height;
}

this.completedTasks = new BitSet(nTasksTotal);
public Job (RegionalTask task, WorkerTags workerTags) {
// Strip the heavyweight inline scenario out of the task, keeping only its ID, before it is
// cloned into every per-origin task sent to workers (see templateTaskFromRegionalTask below).
templateTask = templateTaskFromRegionalTask(task);
jobId = task.jobId;
workerCategory = new WorkerCategory(task.graphId, task.workerVersion);
// NOTE(review): assumes RegionalTask.getTasksTotal() covers both cases the removed constructor
// handled explicitly (freeform originPointSet featureCount vs. gridded width * height) — confirm.
nTasksTotal = task.getTasksTotal();
// One bit per origin; bits are set as task results come back, so progress scans stay cheap.
completedTasks = new BitSet(nTasksTotal);
this.workerTags = workerTags;
}

/**
 * The single RegionalTask object represents a lot of individual accessibility tasks at many different origin
 * points, typically on a grid. Before passing that RegionalTask on to the Broker (which distributes tasks to
 * workers and tracks progress), we remove the details of the scenario, substituting the scenario's unique ID
 * to save time and bandwidth. This avoids repeatedly sending the scenario details to the worker in every task,
 * as they are often quite voluminous. The workers will fetch the scenario once by ID and cache it.
 * We protectively clone this task because we're going to null out its scenario field, and don't
 * want to affect the original object which contains all the scenario details.
 * NOTE(review): the old Broker code also wrote the scenario JSON into FileStorage here so workers could
 * fetch it by ID; that side effect is not visible in this method anymore — confirm it happens elsewhere.
 *
 * @param task the original task, whose scenario field (if any) is left untouched
 * @return a clone of the task with scenario removed and scenarioId filled in from it
 */
private static RegionalTask templateTaskFromRegionalTask(RegionalTask task) {
    RegionalTask templateTask = task.clone();
    // Guard against a null scenario: callers such as sendFakeJob pre-set scenarioId directly and may
    // supply no inline scenario. Unconditional dereference would NPE and clobber the pre-set ID.
    if (templateTask.scenario != null) {
        // Replace the inline scenario with its ID, for retrieval by workers.
        templateTask.scenarioId = templateTask.scenario.id;
        // Null out the scenario in the template task, avoiding repeated serialization to the workers as massive JSON.
        templateTask.scenario = null;
    }
    return templateTask;
}

public boolean markTaskCompleted(int taskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class JobStatus {
public JobStatus () { /* do nothing */ }

/** Summarize the given job to return its status over the REST API. */
public JobStatus (Job job) {
public JobStatus(Job job, int activeWorkers) {
// Copy the job's identifying and progress fields into this flat DTO for the REST API.
this.jobId = job.jobId;
this.graphId = job.workerCategory.graphId;
this.workerCommit = job.workerCategory.workerVersion;
Expand All @@ -56,5 +56,6 @@ public JobStatus (Job job) {
this.deliveries = job.nTasksDelivered;
this.deliveryPass = job.deliveryPass;
this.errors = job.errors;
// Worker count now passed in by the caller (Broker.getAllJobStatuses) instead of set post-construction.
this.activeWorkers = activeWorkers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import com.conveyal.analysis.components.BackendComponents;
import com.conveyal.analysis.components.LocalBackendComponents;
import com.conveyal.analysis.models.RegionalAnalysis;
import com.conveyal.analysis.results.MultiOriginAssembler;
import com.conveyal.r5.analyst.cluster.RegionalTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.UUID;

/**
Expand Down Expand Up @@ -66,7 +68,9 @@ private static void sendFakeJob(Broker broker) {
templateTask.scenarioId = "FAKE";
RegionalAnalysis regionalAnalysis = new RegionalAnalysis();
regionalAnalysis.request = templateTask;
broker.enqueueTasksForRegionalJob(regionalAnalysis);
var job = new Job(templateTask, WorkerTags.fromRegionalAnalysis(regionalAnalysis));
var assembler = new MultiOriginAssembler(job, new ArrayList<>());
broker.enqueueTasksForRegionalJob(job, assembler);
}

public static String compactUUID() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.conveyal.analysis.persistence.Persistence;
import com.conveyal.analysis.util.HttpStatus;
import com.conveyal.analysis.util.JsonUtil;
import com.conveyal.gtfs.util.Util;
import com.conveyal.r5.analyst.WorkerCategory;
import com.conveyal.r5.analyst.cluster.AnalysisWorker;
import com.conveyal.r5.analyst.cluster.RegionalTask;
Expand All @@ -27,7 +26,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.mongodb.QueryBuilder;
import org.apache.commons.math3.analysis.function.Exp;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
Expand All @@ -46,15 +44,13 @@

import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static com.conveyal.r5.common.Util.human;
import static com.conveyal.r5.common.Util.notNullOrEmpty;
import static com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -351,12 +347,10 @@ private String getAllWorkers(Request request, Response response) {
* modifies the contents of the task queue.
*/
private Object workerPoll (Request request, Response response) {

WorkerStatus workerStatus = objectFromRequestBody(request, WorkerStatus.class);
List<RegionalWorkResult> perOriginResults = workerStatus.results;

// Record any regional analysis results that were supplied by the worker and mark them completed.
for (RegionalWorkResult workResult : perOriginResults) {
for (RegionalWorkResult workResult : workerStatus.results) {
broker.handleRegionalWorkResult(workResult);
}
// Clear out the results field so it's not visible in the worker list API endpoint.
Expand All @@ -368,7 +362,7 @@ private Object workerPoll (Request request, Response response) {
// See if any appropriate tasks exist for this worker.
List<RegionalTask> tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested);
// If there is no work for the worker, signal this clearly with a "no content" code,
// so the worker can sleep a while before the next polling attempt.
// so the worker can sleep for a while before the next polling attempt.
if (tasks.isEmpty()) {
return jsonResponse(response, HttpStatus.NO_CONTENT_204, tasks);
} else {
Expand Down
Loading
Loading