diff --git a/src/main/java/com/conveyal/analysis/components/broker/Broker.java b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
index 9970506ba..8ad66b0a5 100644
--- a/src/main/java/com/conveyal/analysis/components/broker/Broker.java
+++ b/src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -144,7 +144,7 @@ public interface Config {
     public TObjectLongMap<WorkerCategory> recentlyRequestedWorkers = TCollections.synchronizedMap(new TObjectLongHashMap<>());
 
-    public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
+    public Broker(Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
         this.config = config;
         this.fileStorage = fileStorage;
         this.eventBus = eventBus;
@@ -345,9 +345,7 @@ public synchronized Collection<JobStatus> getAllJobStatuses () {
         TObjectIntMap<String> workersPerJob = workerCatalog.activeWorkersPerJob();
         Collection<JobStatus> jobStatuses = new ArrayList<>();
         for (Job job : jobs.values()) {
-            JobStatus jobStatus = new JobStatus(job);
-            jobStatus.activeWorkers = workersPerJob.get(job.jobId);
-            jobStatuses.add(jobStatus);
+            jobStatuses.add(new JobStatus(job, workersPerJob.get(job.jobId)));
         }
         return jobStatuses;
     }
@@ -451,15 +449,19 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
                 // results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
                 markTaskCompleted(job, workResult.taskId);
             }
+
             // Unlike everything above, result assembly (like starting workers below) does not synchronize on the broker.
             // It contains some slow nested operations to move completed results into storage. Really we should not do
             // these things synchronously in an HTTP handler called by the worker. We should probably synchronize this
             // entire method, then somehow enqueue slower async completion and cleanup tasks in the caller.
-            var resultFiles = assembler.handleMessage(workResult);
+            assembler.handleMessage(workResult);
 
-            // Store all result files permanently.
-            for (var resultFile : resultFiles.entrySet()) {
-                fileStorage.moveIntoStorage(resultFile.getKey(), resultFile.getValue());
+            if (assembler.isComplete()) {
+                var resultFiles = assembler.finish();
+                // Store all result files permanently.
+                for (var resultFile : resultFiles) {
+                    fileStorage.moveIntoStorage(resultFile.key(), resultFile.file());
+                }
             }
         } catch (Throwable t) {
             recordJobError(job, ExceptionUtils.stackTraceString(t));
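Note: the new `com.conveyal.file.FileEntry` type consumed above via `resultFile.key()` and `resultFile.file()` is not included in this diff. Judging from that usage, it is presumably a small record along these lines (a sketch, not the actual source file):

```java
package com.conveyal.file;

import java.io.File;

/**
 * Sketch of the FileEntry type implied by this diff (the real class is not shown here):
 * it pairs a FileStorageKey with the local File holding a finished result, so writers
 * can hand completed files back to the broker for permanent storage.
 */
public record FileEntry (FileStorageKey key, File file) { }
```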
 */
-    public JobStatus (Job job) {
+    public JobStatus(Job job, int activeWorkers) {
         this.jobId = job.jobId;
         this.graphId = job.workerCategory.graphId;
         this.workerCommit = job.workerCategory.workerVersion;
@@ -56,5 +56,6 @@ public JobStatus (Job job) {
         this.deliveries = job.nTasksDelivered;
         this.deliveryPass = job.deliveryPass;
         this.errors = job.errors;
+        this.activeWorkers = activeWorkers;
     }
 }
diff --git a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java
index 9deb1a882..384be559d 100644
--- a/src/main/java/com/conveyal/analysis/controllers/BrokerController.java
+++ b/src/main/java/com/conveyal/analysis/controllers/BrokerController.java
@@ -14,7 +14,6 @@
 import com.conveyal.analysis.persistence.Persistence;
 import com.conveyal.analysis.util.HttpStatus;
 import com.conveyal.analysis.util.JsonUtil;
-import com.conveyal.gtfs.util.Util;
 import com.conveyal.r5.analyst.WorkerCategory;
 import com.conveyal.r5.analyst.cluster.AnalysisWorker;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
@@ -27,7 +26,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
 import com.mongodb.QueryBuilder;
-import org.apache.commons.math3.analysis.function.Exp;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -46,7 +44,6 @@
 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
-import java.nio.charset.StandardCharsets;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -54,7 +51,6 @@
 import java.util.Map;
 import java.util.UUID;
 
-import static com.conveyal.r5.common.Util.human;
 import static com.conveyal.r5.common.Util.notNullOrEmpty;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -351,12 +347,10 @@ private String getAllWorkers(Request request, Response response) {
      * modifies the contents of the task queue.
      */
     private Object workerPoll (Request request, Response response) {
         WorkerStatus workerStatus = objectFromRequestBody(request, WorkerStatus.class);
-        List<RegionalWorkResult> perOriginResults = workerStatus.results;
         // Record any regional analysis results that were supplied by the worker and mark them completed.
-        for (RegionalWorkResult workResult : perOriginResults) {
+        for (RegionalWorkResult workResult : workerStatus.results) {
             broker.handleRegionalWorkResult(workResult);
         }
         // Clear out the results field so it's not visible in the worker list API endpoint.
@@ -368,7 +362,7 @@
         // See if any appropriate tasks exist for this worker.
         List<RegionalTask> tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested);
         // If there is no work for the worker, signal this clearly with a "no content" code,
-        // so the worker can sleep a while before the next polling attempt.
+        // so the worker can sleep for a while before the next polling attempt.
         if (tasks.isEmpty()) {
             return jsonResponse(response, HttpStatus.NO_CONTENT_204, tasks);
         } else {
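The comment in Broker.handleRegionalWorkResult above flags that moving finished results into storage is too slow to run synchronously in this worker-facing HTTP handler. One possible follow-up shape, sketched with a hypothetical single-threaded executor (none of these names exist in the codebase):

```java
import com.conveyal.file.FileEntry;
import com.conveyal.file.FileStorage;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: defer slow file-storage work so the worker's poll request returns promptly. */
class ResultStorageQueue {

    private final FileStorage fileStorage;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    ResultStorageQueue (FileStorage fileStorage) {
        this.fileStorage = fileStorage;
    }

    /** Enqueue finished result files for permanent storage on a background thread. */
    void storeAsync (List<FileEntry> resultFiles) {
        executor.submit(() -> {
            for (FileEntry resultFile : resultFiles) {
                fileStorage.moveIntoStorage(resultFile.key(), resultFile.file());
            }
        });
    }
}
```

The handler would then call storeAsync and return immediately, leaving durability and retry policy for the background task to sort out.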
         if (tasks.isEmpty()) {
             return jsonResponse(response, HttpStatus.NO_CONTENT_204, tasks);
         } else {
diff --git a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java
index ff443df18..dc642f07f 100644
--- a/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/CsvResultWriter.java
@@ -1,6 +1,7 @@
 package com.conveyal.analysis.results;
 
 import com.conveyal.file.FileCategory;
+import com.conveyal.file.FileEntry;
 import com.conveyal.file.FileStorageKey;
 import com.conveyal.file.FileUtils;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
@@ -14,7 +15,6 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -95,11 +95,11 @@ private CsvWriter getBufferedCsvWriter(String[] columnHeaders) {
      * Downloads through another channel (e.g. aws s3 cp), will need to be decompressed manually.
      */
     @Override
-    public synchronized Map.Entry<FileStorageKey, File> finish () throws IOException {
+    public synchronized FileEntry finish() throws IOException {
         csvWriter.close();
         var gzippedFile = FileUtils.gzipFile(bufferFile);
         bufferFile.delete();
-        return Map.entry(new FileStorageKey(FileCategory.RESULTS, getFileName()), gzippedFile);
+        return new FileEntry(new FileStorageKey(FileCategory.RESULTS, getFileName()), gzippedFile);
     }
 
     /**
diff --git a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java
index 4971caf5e..b6d2f0f65 100644
--- a/src/main/java/com/conveyal/analysis/results/GridResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/GridResultWriter.java
@@ -1,6 +1,7 @@
 package com.conveyal.analysis.results;
 
 import com.conveyal.file.FileCategory;
+import com.conveyal.file.FileEntry;
 import com.conveyal.file.FileStorageKey;
 import com.conveyal.file.FileUtils;
 import com.conveyal.r5.analyst.LittleEndianIntOutputStream;
@@ -17,7 +18,6 @@
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import static com.conveyal.r5.common.Util.human;
 
@@ -151,11 +151,11 @@ public static List createWritersFromTask(String[] destinationP
      * Gzip the access grid and return the files.
      */
     @Override
-    public synchronized Map.Entry<FileStorageKey, File> finish () throws IOException {
+    public synchronized FileEntry finish() throws IOException {
         randomAccessFile.close();
         var gzippedFile = FileUtils.gzipFile(bufferFile);
         bufferFile.delete();
-        return Map.entry(new FileStorageKey(FileCategory.RESULTS, gridFileName), gzippedFile);
+        return new FileEntry(new FileStorageKey(FileCategory.RESULTS, gridFileName), gzippedFile);
     }
 
     /**
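After this change, CsvResultWriter.finish() and GridResultWriter.finish() share an identical tail: gzip the buffer, delete the uncompressed original, wrap the result. If a third writer appears, that tail could be hoisted into a helper; a sketch, where ResultFileUtils and gzipAndWrap are hypothetical names not present in this diff:

```java
import com.conveyal.file.FileCategory;
import com.conveyal.file.FileEntry;
import com.conveyal.file.FileStorageKey;
import com.conveyal.file.FileUtils;

import java.io.File;

/** Hypothetical helper capturing the shared tail of the two finish() implementations above. */
final class ResultFileUtils {

    /** Gzip a finished buffer file, delete the uncompressed original, and wrap it for storage. */
    static FileEntry gzipAndWrap (File bufferFile, String fileName) {
        File gzippedFile = FileUtils.gzipFile(bufferFile);
        bufferFile.delete();
        return new FileEntry(new FileStorageKey(FileCategory.RESULTS, fileName), gzippedFile);
    }
}
```

Each writer's finish() would then reduce to closing its own stream and delegating to the helper.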
diff --git a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
index 3d0eeb4a3..8d6a2f805 100644
--- a/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
+++ b/src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
@@ -1,19 +1,14 @@
 package com.conveyal.analysis.results;
 
 import com.conveyal.analysis.components.broker.Job;
-import com.conveyal.file.FileStorageKey;
-import com.conveyal.r5.analyst.PointSet;
-import com.conveyal.r5.analyst.cluster.PathResult;
-import com.conveyal.r5.analyst.cluster.RegionalTask;
+import com.conveyal.file.FileEntry;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
+import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * This assembles regional results arriving from workers into one or more files per regional analysis on
@@ -71,33 +66,36 @@ public MultiOriginAssembler (Job job, List<RegionalResultWriter> resultWriters)
      * this class are also synchronized for good measure. There should be no additional cost to retaining the lock when
      * entering those methods.
      */
-    public synchronized Map<FileStorageKey, File> handleMessage (RegionalWorkResult workResult) throws Exception {
-        var resultFiles = new HashMap<FileStorageKey, File>();
-        for (RegionalResultWriter writer : resultWriters) {
-            writer.writeOneWorkResult(workResult);
-        }
+    public synchronized void handleMessage (RegionalWorkResult workResult) throws Exception {
         // Don't double-count origins if we receive them more than once. Atomic get-and-increment requires
         // synchronization, currently achieved by synchronizing this entire method.
         if (!originsReceived.get(workResult.taskId)) {
             originsReceived.set(workResult.taskId);
             nComplete += 1;
+
+            for (RegionalResultWriter writer : resultWriters) {
+                writer.writeOneWorkResult(workResult);
+            }
         }
+    }
 
-        // If finished, run finish on all the result writers.
-        if (nComplete == job.nTasksTotal) {
-            LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId);
-            try {
-                for (RegionalResultWriter writer : resultWriters) {
-                    var result = writer.finish();
-                    resultFiles.put(result.getKey(), result.getValue());
-                }
-            } catch (Exception e) {
-                LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e);
-            }
+    public List<FileEntry> finish () {
+        var resultFiles = new ArrayList<FileEntry>(resultWriters.size());
+        LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId);
+        try {
+            for (RegionalResultWriter writer : resultWriters) {
+                resultFiles.add(writer.finish());
+            }
+        } catch (Exception e) {
+            LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e);
         }
         return resultFiles;
     }
 
+    public boolean isComplete() {
+        return nComplete == job.nTasksTotal;
+    }
+
     /** Clean up and cancel this grid assembler, typically when a job is canceled while still being processed. */
     public synchronized void terminate () throws Exception {
         for (RegionalResultWriter writer : resultWriters) {
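Splitting handleMessage / isComplete / finish also makes the double-count guard straightforward to exercise with a fake writer. A test sketch, assuming RegionalWorkResult has a usable no-arg constructor and with jobWithTasks as a hypothetical fixture producing a Job whose nTasksTotal is the given count:

```java
import com.conveyal.analysis.components.broker.Job;
import com.conveyal.analysis.results.MultiOriginAssembler;
import com.conveyal.analysis.results.RegionalResultWriter;
import com.conveyal.file.FileEntry;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class MultiOriginAssemblerSketch {

    void duplicateDeliveriesAreCountedOnce () throws Exception {
        AtomicInteger writes = new AtomicInteger();
        RegionalResultWriter countingWriter = new RegionalResultWriter() {
            @Override public void writeOneWorkResult (RegionalWorkResult workResult) { writes.incrementAndGet(); }
            @Override public void terminate () { }
            @Override public FileEntry finish () { return null; }
        };
        var assembler = new MultiOriginAssembler(jobWithTasks(1), List.of(countingWriter));

        var result = new RegionalWorkResult(); // assumes a usable no-arg constructor
        result.taskId = 0;
        assembler.handleMessage(result);
        assembler.handleMessage(result); // spurious redelivery of the same origin
        assert writes.get() == 1;        // written exactly once
        assert assembler.isComplete();   // one of one tasks received
    }

    /** Hypothetical fixture: a Job whose nTasksTotal is n. Implementation omitted. */
    private Job jobWithTasks (int n) {
        throw new UnsupportedOperationException("test fixture sketch");
    }
}
```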
diff --git a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java
index a874589dd..6ace604a0 100644
--- a/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java
+++ b/src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java
@@ -1,11 +1,8 @@
 package com.conveyal.analysis.results;
 
-import com.conveyal.file.FileStorageKey;
+import com.conveyal.file.FileEntry;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
 
-import java.io.File;
-import java.util.Map;
-
 /**
  * Common interface for classes that write regional results out to CSV or Grids on the backend.
  */
@@ -15,6 +12,5 @@ public interface RegionalResultWriter {
 
     void terminate () throws Exception;
 
-    Map.Entry<FileStorageKey, File> finish () throws Exception;
-
+    FileEntry finish() throws Exception;
 }
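For reference, the slimmed-down contract as seen from an implementer: accumulate one entry per received origin, and hand back exactly one FileEntry from finish(). A minimal hypothetical implementation, mirroring the buffer-then-gzip pattern of the writers above (TaskIdLogWriter is illustration only, not part of this change):

```java
import com.conveyal.analysis.results.RegionalResultWriter;
import com.conveyal.file.FileCategory;
import com.conveyal.file.FileEntry;
import com.conveyal.file.FileStorageKey;
import com.conveyal.file.FileUtils;
import com.conveyal.r5.analyst.cluster.RegionalWorkResult;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

/** Hypothetical writer logging one line per received origin, for illustration only. */
class TaskIdLogWriter implements RegionalResultWriter {

    private final File bufferFile;
    private final FileWriter writer;

    TaskIdLogWriter () throws IOException {
        bufferFile = File.createTempFile("task-ids", ".txt");
        writer = new FileWriter(bufferFile);
    }

    @Override
    public void writeOneWorkResult (RegionalWorkResult workResult) throws Exception {
        // Record one line per completed origin.
        writer.write(workResult.taskId + "\n");
    }

    @Override
    public void terminate () throws Exception {
        // Abandon the buffer when the job is canceled.
        writer.close();
        bufferFile.delete();
    }

    @Override
    public FileEntry finish () throws Exception {
        // Same close/gzip/delete/wrap sequence as the CSV and grid writers.
        writer.close();
        File gzippedFile = FileUtils.gzipFile(bufferFile);
        bufferFile.delete();
        return new FileEntry(new FileStorageKey(FileCategory.RESULTS, "task-ids.txt.gz"), gzippedFile);
    }
}
```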