Use FileEntry record instead of Map.Entry
trevorgerhardt committed Apr 8, 2024
1 parent a8e06c9 commit f6c409e
Showing 7 changed files with 42 additions and 51 deletions.
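The FileEntry record itself lives in com.conveyal.file and its definition is not part of this diff. Judging from the usages below (a two-argument constructor and key()/file() accessors), it is presumably a simple record pairing a storage key with a local file, along these lines:

    // Hypothetical sketch inferred from usage in this diff; the actual
    // definition of FileEntry is not shown in this commit.
    package com.conveyal.file;

    import java.io.File;

    public record FileEntry (FileStorageKey key, File file) { }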
18 changes: 10 additions & 8 deletions src/main/java/com/conveyal/analysis/components/broker/Broker.java
@@ -144,7 +144,7 @@ public interface Config {
     public TObjectLongMap<WorkerCategory> recentlyRequestedWorkers =
             TCollections.synchronizedMap(new TObjectLongHashMap<>());
 
-    public Broker (Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
+    public Broker(Config config, FileStorage fileStorage, EventBus eventBus, WorkerLauncher workerLauncher) {
         this.config = config;
         this.fileStorage = fileStorage;
         this.eventBus = eventBus;
@@ -345,9 +345,7 @@ public synchronized Collection<JobStatus> getAllJobStatuses () {
         TObjectIntMap<String> workersPerJob = workerCatalog.activeWorkersPerJob();
         Collection<JobStatus> jobStatuses = new ArrayList<>();
         for (Job job : jobs.values()) {
-            JobStatus jobStatus = new JobStatus(job);
-            jobStatus.activeWorkers = workersPerJob.get(job.jobId);
-            jobStatuses.add(jobStatus);
+            jobStatuses.add(new JobStatus(job, workersPerJob.get(job.jobId)));
         }
         return jobStatuses;
     }
@@ -451,15 +449,19 @@ public void handleRegionalWorkResult(RegionalWorkResult workResult) {
                 // results from spurious redeliveries, before the assembler is busy finalizing and uploading results.
                 markTaskCompleted(job, workResult.taskId);
             }
 
             // Unlike everything above, result assembly (like starting workers below) does not synchronize on the broker.
             // It contains some slow nested operations to move completed results into storage. Really we should not do
             // these things synchronously in an HTTP handler called by the worker. We should probably synchronize this
             // entire method, then somehow enqueue slower async completion and cleanup tasks in the caller.
-            var resultFiles = assembler.handleMessage(workResult);
+            assembler.handleMessage(workResult);
 
-            // Store all result files permanently.
-            for (var resultFile : resultFiles.entrySet()) {
-                fileStorage.moveIntoStorage(resultFile.getKey(), resultFile.getValue());
+            if (assembler.isComplete()) {
+                var resultFiles = assembler.finish();
+                // Store all result files permanently.
+                for (var resultFile : resultFiles) {
+                    fileStorage.moveIntoStorage(resultFile.key(), resultFile.file());
+                }
             }
         } catch (Throwable t) {
             recordJobError(job, ExceptionUtils.stackTraceString(t));
src/main/java/com/conveyal/analysis/components/broker/JobStatus.java
@@ -46,7 +46,7 @@ public class JobStatus {
     public JobStatus () { /* do nothing */ }
 
     /** Summarize the given job to return its status over the REST API. */
-    public JobStatus (Job job) {
+    public JobStatus(Job job, int activeWorkers) {
         this.jobId = job.jobId;
         this.graphId = job.workerCategory.graphId;
         this.workerCommit = job.workerCategory.workerVersion;
@@ -56,5 +56,6 @@ public JobStatus (Job job) {
         this.deliveries = job.nTasksDelivered;
         this.deliveryPass = job.deliveryPass;
         this.errors = job.errors;
+        this.activeWorkers = activeWorkers;
     }
 }
src/main/java/com/conveyal/analysis/controllers/BrokerController.java
@@ -14,7 +14,6 @@
 import com.conveyal.analysis.persistence.Persistence;
 import com.conveyal.analysis.util.HttpStatus;
 import com.conveyal.analysis.util.JsonUtil;
-import com.conveyal.gtfs.util.Util;
 import com.conveyal.r5.analyst.WorkerCategory;
 import com.conveyal.r5.analyst.cluster.AnalysisWorker;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
@@ -27,7 +26,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.ByteStreams;
 import com.mongodb.QueryBuilder;
-import org.apache.commons.math3.analysis.function.Exp;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -46,15 +44,13 @@
 
 import java.net.NoRouteToHostException;
 import java.net.SocketTimeoutException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static com.conveyal.r5.common.Util.human;
 import static com.conveyal.r5.common.Util.notNullOrEmpty;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -351,12 +347,10 @@ private String getAllWorkers(Request request, Response response) {
      * modifies the contents of the task queue.
      */
     private Object workerPoll (Request request, Response response) {
-
         WorkerStatus workerStatus = objectFromRequestBody(request, WorkerStatus.class);
-        List<RegionalWorkResult> perOriginResults = workerStatus.results;
 
         // Record any regional analysis results that were supplied by the worker and mark them completed.
-        for (RegionalWorkResult workResult : perOriginResults) {
+        for (RegionalWorkResult workResult : workerStatus.results) {
             broker.handleRegionalWorkResult(workResult);
         }
         // Clear out the results field so it's not visible in the worker list API endpoint.
@@ -368,7 +362,7 @@ private Object workerPoll (Request request, Response response) {
         // See if any appropriate tasks exist for this worker.
         List<RegionalTask> tasks = broker.getSomeWork(workerCategory, workerStatus.maxTasksRequested);
         // If there is no work for the worker, signal this clearly with a "no content" code,
-        // so the worker can sleep a while before the next polling attempt.
+        // so the worker can sleep for a while before the next polling attempt.
         if (tasks.isEmpty()) {
             return jsonResponse(response, HttpStatus.NO_CONTENT_204, tasks);
         } else {
src/main/java/com/conveyal/analysis/results/CsvResultWriter.java
@@ -1,6 +1,7 @@
 package com.conveyal.analysis.results;
 
 import com.conveyal.file.FileCategory;
+import com.conveyal.file.FileEntry;
 import com.conveyal.file.FileStorageKey;
 import com.conveyal.file.FileUtils;
 import com.conveyal.r5.analyst.cluster.RegionalTask;
@@ -14,7 +15,6 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -95,11 +95,11 @@ private CsvWriter getBufferedCsvWriter(String[] columnHeaders) {
      * Downloads through another channel (e.g. aws s3 cp), will need to be decompressed manually.
      */
     @Override
-    public synchronized Map.Entry<FileStorageKey, File> finish () throws IOException {
+    public synchronized FileEntry finish() throws IOException {
         csvWriter.close();
         var gzippedFile = FileUtils.gzipFile(bufferFile);
         bufferFile.delete();
-        return Map.entry(new FileStorageKey(FileCategory.RESULTS, getFileName()), gzippedFile);
+        return new FileEntry(new FileStorageKey(FileCategory.RESULTS, getFileName()), gzippedFile);
     }
 
     /**
src/main/java/com/conveyal/analysis/results/GridResultWriter.java
@@ -1,6 +1,7 @@
 package com.conveyal.analysis.results;
 
 import com.conveyal.file.FileCategory;
+import com.conveyal.file.FileEntry;
 import com.conveyal.file.FileStorageKey;
 import com.conveyal.file.FileUtils;
 import com.conveyal.r5.analyst.LittleEndianIntOutputStream;
@@ -17,7 +18,6 @@
 import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import static com.conveyal.r5.common.Util.human;
 
@@ -151,11 +151,11 @@ public static List<GridResultWriter> createWritersFromTask(String[] destinationP
      * Gzip the access grid and return the files.
      */
     @Override
-    public synchronized Map.Entry<FileStorageKey, File> finish () throws IOException {
+    public synchronized FileEntry finish() throws IOException {
         randomAccessFile.close();
         var gzippedFile = FileUtils.gzipFile(bufferFile);
         bufferFile.delete();
-        return Map.entry(new FileStorageKey(FileCategory.RESULTS, gridFileName), gzippedFile);
+        return new FileEntry(new FileStorageKey(FileCategory.RESULTS, gridFileName), gzippedFile);
     }
 
     /**
src/main/java/com/conveyal/analysis/results/MultiOriginAssembler.java
@@ -1,19 +1,14 @@
 package com.conveyal.analysis.results;
 
 import com.conveyal.analysis.components.broker.Job;
-import com.conveyal.file.FileStorageKey;
-import com.conveyal.r5.analyst.PointSet;
-import com.conveyal.r5.analyst.cluster.PathResult;
-import com.conveyal.r5.analyst.cluster.RegionalTask;
+import com.conveyal.file.FileEntry;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * This assembles regional results arriving from workers into one or more files per regional analysis on
@@ -71,33 +66,36 @@ public MultiOriginAssembler (Job job, List<RegionalResultWriter> resultWriters)
      * this class are also synchronized for good measure. There should be no additional cost to retaining the lock when
      * entering those methods.
      */
-    public synchronized Map<FileStorageKey, File> handleMessage (RegionalWorkResult workResult) throws Exception {
-        var resultFiles = new HashMap<FileStorageKey, File>();
-        for (RegionalResultWriter writer : resultWriters) {
-            writer.writeOneWorkResult(workResult);
-        }
+    public void handleMessage(RegionalWorkResult workResult) throws Exception {
         // Don't double-count origins if we receive them more than once. Atomic get-and-increment requires
         // synchronization, currently achieved by synchronizing this entire method.
         if (!originsReceived.get(workResult.taskId)) {
             originsReceived.set(workResult.taskId);
             nComplete += 1;
+
+            for (RegionalResultWriter writer : resultWriters) {
+                writer.writeOneWorkResult(workResult);
+            }
         }
+    }
 
-        // If finished, run finish on all the result writers.
-        if (nComplete == job.nTasksTotal) {
-            LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId);
-            try {
-                for (RegionalResultWriter writer : resultWriters) {
-                    var result = writer.finish();
-                    resultFiles.put(result.getKey(), result.getValue());
-                }
-            } catch (Exception e) {
-                LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e);
-            }
-        }
-        return resultFiles;
-    }
+    public List<FileEntry> finish() {
+        var resultFiles = new ArrayList<FileEntry>(resultWriters.size());
+        LOG.info("Finished receiving data for multi-origin analysis {}", job.jobId);
+        try {
+            for (RegionalResultWriter writer : resultWriters) {
+                resultFiles.add(writer.finish());
+            }
+        } catch (Exception e) {
+            LOG.error("Error uploading results of multi-origin analysis {}", job.jobId, e);
+        }
+        return resultFiles;
+    }
+
+    public boolean isComplete() {
+        return nComplete == job.nTasksTotal;
+    }
 
     /** Clean up and cancel this grid assembler, typically when a job is canceled while still being processed. */
     public synchronized void terminate () throws Exception {
         for (RegionalResultWriter writer : resultWriters) {
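Taken together, the assembler now separates receiving results from finalizing them. A condensed sketch of the caller's sequence, mirroring the Broker hunk above (identifiers as in this diff):

    // Inside Broker.handleRegionalWorkResult, per the hunk above:
    assembler.handleMessage(workResult);   // record one origin's results
    if (assembler.isComplete()) {          // every origin received exactly once
        for (var resultFile : assembler.finish()) {
            // Move each finished CSV or grid file into permanent storage.
            fileStorage.moveIntoStorage(resultFile.key(), resultFile.file());
        }
    }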
src/main/java/com/conveyal/analysis/results/RegionalResultWriter.java
@@ -1,11 +1,8 @@
 package com.conveyal.analysis.results;
 
-import com.conveyal.file.FileStorageKey;
+import com.conveyal.file.FileEntry;
 import com.conveyal.r5.analyst.cluster.RegionalWorkResult;
 
-import java.io.File;
-import java.util.Map;
-
 /**
  * Common interface for classes that write regional results out to CSV or Grids on the backend.
  */
@@ -15,6 +12,5 @@ public interface RegionalResultWriter {
 
     void terminate () throws Exception;
 
-    Map.Entry<FileStorageKey, File> finish () throws Exception;
-
+    FileEntry finish() throws Exception;
 }
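For illustration, a minimal writer satisfying the revised interface might look as follows. This is a hypothetical example, not part of this commit; the writeOneWorkResult signature is inferred from its call site in MultiOriginAssembler, and the real implementations above also gzip their buffer files before returning them.

    import com.conveyal.file.FileCategory;
    import com.conveyal.file.FileEntry;
    import com.conveyal.file.FileStorageKey;
    import com.conveyal.r5.analyst.cluster.RegionalWorkResult;

    import java.io.File;
    import java.io.IOException;
    import java.io.PrintWriter;

    /** Hypothetical writer that records only completed task IDs, one per line. */
    public class TaskIdLogWriter implements RegionalResultWriter {

        private final File bufferFile;
        private final PrintWriter writer;

        public TaskIdLogWriter () throws IOException {
            bufferFile = File.createTempFile("taskIds", ".txt");
            writer = new PrintWriter(bufferFile);
        }

        @Override
        public void writeOneWorkResult (RegionalWorkResult workResult) {
            // Buffer one origin's result; here, just its task ID.
            writer.println(workResult.taskId);
        }

        @Override
        public void terminate () {
            // Job canceled: discard the partial buffer.
            writer.close();
            bufferFile.delete();
        }

        @Override
        public FileEntry finish () {
            writer.close();
            // The caller (Broker) moves the returned file into permanent storage.
            return new FileEntry(new FileStorageKey(FileCategory.RESULTS, "taskIds.txt"), bufferFile);
        }
    }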
