Application-wide, size-limited cache for scenario networks #940

Merged
merged 6 commits into from
Oct 3, 2024
1 change: 1 addition & 0 deletions build.gradle
@@ -14,6 +14,7 @@ java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
withSourcesJar()
}

jar {
4 changes: 3 additions & 1 deletion src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java
@@ -110,11 +110,13 @@ protected TransportNetwork buildValue(Key key) {

// Get the set of points to which we are measuring travel time. Any smaller sub-grids created here will
// reference the scenarioNetwork's built-in full-extent pointset, so can reuse its linkage.
// TODO handle multiple destination grids.
// FIXME handle multiple destination grids.

if (key.destinationGridExtents == null) {
// Special (and ideally temporary) case for regional freeform destinations, where there is no grid to link.
// The null destinationGridExtents are created by the WebMercatorExtents#forPointsets else clause.
// FIXME there is no grid to link, but there are points and egress tables to make!
// see com.conveyal.r5.analyst.cluster.AnalysisWorkerTask.loadAndValidateDestinationPointSets
return scenarioNetwork;
}

14 changes: 7 additions & 7 deletions src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java
@@ -26,14 +26,14 @@
* scenarios from the backend instead of from S3.
*
* TODO merge this with TransportNetworkCache#resolveScenario into a single multi-level mem/disk/s3 cache.
* Note that this cache is going to just grow indefinitely in size as a worker receives many iterations of the same
* scenario - that could be a memory leak. Again multi level caching could releive those worries.
* It's debatable whether we should be hanging on to scenarios passed with single point requests becuase they may never
* be used again.
* This cache grows in size without bound as a worker receives many iterations of the same scenario.
* This is technically a sort of memory leak for long-lived workers. Multi-level caching could relieve those worries.
* However, this cache stores only the Scenarios and Modifications, not any large egress tables or linkages.
*
* It's debatable whether we should be hanging on to scenarios passed with single point requests,
* because they may never be used again.
* Should we just always require a single point task to be sent to the cluster before a regional?
* That would not ensure the scenario was present on all workers though.
*
* Created by abyrd on 2018-10-29
*/
public class ScenarioCache {

@@ -44,7 +44,7 @@ public class ScenarioCache {
public synchronized void storeScenario (Scenario scenario) {
Scenario existingScenario = scenariosById.put(scenario.id, scenario);
if (existingScenario != null) {
LOG.debug("Scenario cache already contained a this scenario.");
LOG.debug("Scenario cache already contained this scenario.");
}
}

@@ -39,7 +39,6 @@ public class WorkerStatus {
public String workerVersion;
public String workerId;
public Set<String> networks = new HashSet<>();
public Set<String> scenarios = new HashSet<>();
public double secondsSinceLastPoll;
public Map<String, Integer> tasksPerMinuteByJobId;
@JsonUnwrapped(prefix = "ec2")
@@ -86,7 +85,6 @@ public WorkerStatus (AnalysisWorker worker) {
// networks = worker.networkPreloader.transportNetworkCache.getLoadedNetworkIds();
// For now we report a single network, even before it's loaded.
networks = Sets.newHashSet(worker.networkId);
scenarios = worker.networkPreloader.transportNetworkCache.getAppliedScenarios();
ec2 = worker.ec2info;

OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
2 changes: 1 addition & 1 deletion src/main/java/com/conveyal/r5/streets/StreetRouter.java
@@ -742,7 +742,7 @@ public int getTravelTimeToVertex (int vertexIndex) {
* fragments from the vertices at either end of the edge up to the destination split point.
* If no states can be produced return null.
*
* Note that this is only used by the point to point street router, not by LinkedPointSets (which have equivalent
* NOTE that this is ONLY USED BY the point to point street router, NOT BY LinkedPointSets (which have equivalent
* logic in their eval method). The PointSet implementation only needs to produce times, not States. But ideally
* some common logic can be factored out.
*/
10 changes: 3 additions & 7 deletions src/main/java/com/conveyal/r5/transit/TransportNetwork.java
@@ -53,14 +53,10 @@ public class TransportNetwork implements Serializable {
public TransitLayer transitLayer;

/**
* This stores any number of lightweight scenario networks built upon the current base network.
* FIXME that sounds like a memory leak, should be a WeighingCache or at least size-limited.
* A single network cache at the top level could store base networks and scenarios since they all have globally
* unique IDs. A hierarchical cache does have the advantage of evicting all the scenarios with the associated
* base network, which keeps the references in the scenarios from holding on to the base network. But considering
* that we have never started evicting networks (other than for a "cache" of one element) this might be getting
* ahead of ourselves.
* This field is no longer used. It has been moved to TransportNetworkCache, but this one remains for now, to
* avoid any inadvertent incompatibilities with serialized network files or serialization library settings.
*/
@Deprecated
public transient Map<String, TransportNetwork> scenarios = new HashMap<>();

/**
131 changes: 63 additions & 68 deletions src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java
@@ -29,11 +29,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

@@ -53,37 +50,70 @@ public class TransportNetworkCache implements Component {
private static final Logger LOG = LoggerFactory.getLogger(TransportNetworkCache.class);

/** Cache size is currently limited to one, i.e. the worker holds on to only one network at a time. */
private static final int DEFAULT_CACHE_SIZE = 1;
private static final int MAX_CACHED_NETWORKS = 1;

/**
* It might seem sufficient to hold only two scenarios (for single point scenario comparison). But in certain cases
* (e.g. the regional task queue is bigger than the size of each queued regional job) we might end up working on
* a mix of tasks from N different scenarios. Note also that scenarios hold references to their base networks, so
* caching multiple scenario networks can theoretically keep just as many TransportNetworks in memory.
* But in practice, in non-local (cloud) operation a given worker instance is locked to a single network for its
* entire lifespan.
*/
public static final int MAX_CACHED_SCENARIO_NETWORKS = 10;

// TODO change all other caches from Guava to Caffeine caches. This one is already a Caffeine cache.
private final LoadingCache<String, TransportNetwork> cache;
private final LoadingCache<String, TransportNetwork> networkCache;

private final FileStorage fileStorage;
private final GTFSCache gtfsCache;
private final OSMCache osmCache;

/**
* A table of already seen scenarios, avoiding downloading them repeatedly from S3 and allowing us to replace
* scenarios with only their IDs, and reverse that replacement later.
* scenarios with only their IDs, and reverse that replacement later. Note that this caches the Scenario objects
* themselves, not the TransportNetworks built from those Scenarios.
*/
private final ScenarioCache scenarioCache = new ScenarioCache();

/**
* This record type is used for the private, encapsulated cache of TransportNetworks for different scenarios.
* Scenario IDs are unique so we could look up these networks by scenario ID alone. However the cache values need
* to be derived entirely from the cache keys. We need some way to look up the base network so we include its ID.
*/
private record BaseAndScenarioId (String baseNetworkId, String scenarioId) { }

/**
* This stores a number of lightweight scenario networks built upon the current base network.
* Each scenario TransportNetwork has its own LinkageCache, containing LinkedPointSets that each have their own
* EgressCostTable. In practice this can exhaust memory, e.g. after using bicycle egress for about 50 scenarios.
* The previous hierarchical arrangement of caches has the advantage of evicting all the scenarios with the
* associated base network, which keeps the references in the scenarios from holding on to the base network.
* But considering that we have never started evicting networks (other than for a "cache" of one element) this
* eviction can be handled in other ways.
*/
private LoadingCache<BaseAndScenarioId, TransportNetwork> scenarioNetworkCache;

/** Create a transport network cache. If source bucket is null, will work offline. */
public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMCache osmCache) {
this.osmCache = osmCache;
this.gtfsCache = gtfsCache;
this.cache = createCache(DEFAULT_CACHE_SIZE);
this.networkCache = Caffeine.newBuilder()
.maximumSize(MAX_CACHED_NETWORKS)
.build(this::loadNetwork);
this.scenarioNetworkCache = Caffeine.newBuilder()
.maximumSize(MAX_CACHED_SCENARIO_NETWORKS)
.build(this::loadScenario);
this.fileStorage = fileStorage;
}

/**
* Find a transport network by ID, building or loading as needed from pre-existing OSM, GTFS, MapDB, or Kryo files.
* This should never return null. If a TransportNetwork can't be built or loaded, an exception will be thrown.
*/
public synchronized @Nonnull
TransportNetwork getNetwork (String networkId) throws TransportNetworkException {
public TransportNetwork getNetwork (String networkId) throws TransportNetworkException {
try {
return cache.get(networkId);
return networkCache.get(networkId);
} catch (Exception e) {
throw new TransportNetworkException("Could not load TransportNetwork into cache. ", e);
}
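
The `BaseAndScenarioId` record introduced in this hunk works as a compound cache key because Java records derive `equals` and `hashCode` from their components, so two keys built separately from the same IDs address the same entry. A minimal sketch of that property, using a plain `HashMap` instead of the Caffeine cache in this PR; the class name `CompoundKeyDemo`, the `lookup` helper, and the ID strings are illustrative assumptions, not part of the change:

```java
import java.util.HashMap;
import java.util.Map;

public class CompoundKeyDemo {

    // Hypothetical copy of the PR's private record, with the same two components.
    record BaseAndScenarioId(String baseNetworkId, String scenarioId) { }

    // Hypothetical helper: looks up a value using a freshly constructed key.
    static String lookup(Map<BaseAndScenarioId, String> built, String networkId, String scenarioId) {
        return built.get(new BaseAndScenarioId(networkId, scenarioId));
    }

    public static void main(String[] args) {
        Map<BaseAndScenarioId, String> built = new HashMap<>();
        built.put(new BaseAndScenarioId("net-1", "scenario-a"), "network for scenario-a");

        // Equal components give an equal key, so this finds the stored entry.
        System.out.println(lookup(built, "net-1", "scenario-a"));

        // A different base network ID is a different key, so this finds nothing.
        System.out.println(lookup(built, "net-2", "scenario-a"));
    }
}
```

Carrying the base network ID in the key is what lets a loader derive the cached value from the key alone, as the record's Javadoc in this hunk explains.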
@@ -107,43 +137,35 @@ public void rememberScenario (Scenario scenario) {
* base graphs). Therefore we can look up cached scenario networks based solely on their scenarioId rather than a
* compound key of (networkId, scenarioId).
*
* The fact that scenario networks are cached means that PointSet linkages will be automatically reused.
* Reusing scenario networks automatically leads to reuse of the associated PointSet linkages and egress tables.
* TODO it seems to me that this method should just take a Scenario as its second parameter, and that resolving
* the scenario against caches on S3 or local disk should be pulled out into a separate function.
* The problem is that then you resolve the scenario every time, even when the ID is enough to look up the already
* built network. So we need to pass the whole task in here, so either the ID or full scenario are visible.
*
* Thread safety notes: This entire method is synchronized so access by multiple threads will be sequential.
* The first thread will have a chance to build and store the requested scenario before any others see it.
* This means each new scenario will be applied one after the other. This is probably OK as long as building egress
* tables is already parallelized.
* Thread safety: getNetwork and getNetworkForScenario are threadsafe caches, so access to the same key by multiple
* threads will occur sequentially without repeatedly or simultaneously performing the same loading actions.
* Javadoc on the Caffeine LoadingCache indicates that it will throw exceptions when the cache loader method throws
* them, without establishing a mapping in the cache. So exceptions occurring during scenario application are
* expected to bubble up unimpeded.
*/
public synchronized TransportNetwork getNetworkForScenario (String networkId, String scenarioId) {
// If the networkId is different than previous calls, a new network will be loaded. Its transient nested map
// of scenarios will be empty at first. This ensures it's initialized if null.
// FIXME apparently this can't happen - the field is transient and initialized in TransportNetwork.
TransportNetwork baseNetwork = this.getNetwork(networkId);
if (baseNetwork.scenarios == null) {
baseNetwork.scenarios = new HashMap<>();
}
public TransportNetwork getNetworkForScenario (String networkId, String scenarioId) {
TransportNetwork scenarioNetwork = scenarioNetworkCache.get(new BaseAndScenarioId(networkId, scenarioId));
return scenarioNetwork;
}

TransportNetwork scenarioNetwork = baseNetwork.scenarios.get(scenarioId);
if (scenarioNetwork == null) {
// The network for this scenario was not found in the cache. Create that scenario network and cache it.
LOG.debug("Applying scenario to base network...");
// Fetch the full scenario if an ID was specified.
Scenario scenario = resolveScenario(networkId, scenarioId);
// Apply any scenario modifications to the network before use, performing protective copies where necessary.
// We used to prepend a filter to the scenario, removing trips that are not running during the search time window.
// However, because we are caching transportNetworks with scenarios already applied to them, we can’t use
// the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always
// apply scenarios every time.
scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork);
LOG.debug("Done applying scenario. Caching the resulting network.");
baseNetwork.scenarios.put(scenario.id, scenarioNetwork);
} else {
LOG.debug("Reusing cached TransportNetwork for scenario {}.", scenarioId);
}
private TransportNetwork loadScenario (BaseAndScenarioId ids) {
TransportNetwork baseNetwork = this.getNetwork(ids.baseNetworkId());
LOG.debug("Scenario TransportNetwork not found. Applying scenario to base network and caching it.");
// Fetch the full scenario if an ID was specified.
Scenario scenario = resolveScenario(ids.baseNetworkId(), ids.scenarioId());
// Apply any scenario modifications to the network before use, performing protective copies where necessary.
// We used to prepend a filter to the scenario, removing trips that are not running during the search time window.
// However, because we are caching transportNetworks with scenarios already applied to them, we can’t use
// the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always
// apply scenarios every time.
TransportNetwork scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork);
LOG.debug("Done applying scenario. Caching the resulting network.");
return scenarioNetwork;
}
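
The behavior this hunk relies on (a size limit with eviction, values loaded on a miss, and no mapping stored when the loader throws) can be sketched with the standard library alone. This is not the Caffeine implementation used in the PR: it is a simplified stand-in built on `LinkedHashMap`, with strict least-recently-used eviction and a single coarse lock, both of which are assumptions of the sketch. The names `BoundedLoadingCacheDemo` and `BoundedLoadingCache` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class BoundedLoadingCacheDemo {

    /** Minimal size-limited loading cache; a stdlib stand-in, not Caffeine. */
    static final class BoundedLoadingCache<K, V> {
        private final int maxEntries;
        private final Function<K, V> loader;
        private final LinkedHashMap<K, V> entries;

        BoundedLoadingCache(int maxEntries, Function<K, V> loader) {
            this.maxEntries = maxEntries;
            this.loader = loader;
            // accessOrder = true keeps the least recently used entry first.
            this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                    return size() > BoundedLoadingCache.this.maxEntries;
                }
            };
        }

        /** Coarse lock: loads are serialized across all keys in this sketch. */
        synchronized V get(K key) {
            V value = entries.get(key);
            if (value == null) {
                // If the loader throws, nothing is stored and the caller sees the exception.
                value = loader.apply(key);
                entries.put(key, value);
            }
            return value;
        }

        synchronized boolean contains(K key) { return entries.containsKey(key); }

        synchronized int size() { return entries.size(); }
    }

    public static void main(String[] args) {
        int[] loads = {0};
        BoundedLoadingCache<String, String> cache = new BoundedLoadingCache<>(2, id -> {
            if (id.isEmpty()) throw new IllegalArgumentException("empty scenario ID");
            loads[0]++;
            return "network for " + id;
        });
        cache.get("a");
        cache.get("b");
        cache.get("a"); // hit: "a" becomes the most recently used entry
        cache.get("c"); // miss: inserting "c" evicts "b"
        System.out.println("b cached: " + cache.contains("b"));
        System.out.println("loads: " + loads[0]);
        try {
            cache.get("");
        } catch (IllegalArgumentException e) {
            System.out.println("failed load stored: " + cache.contains(""));
        }
    }
}
```

With a limit of `MAX_CACHED_SCENARIO_NETWORKS`, the same pattern bounds how many scenario networks, and with them their linkages and egress cost tables, stay reachable at once.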

@@ -185,6 +207,7 @@ private TransportNetworkConfig loadNetworkConfig (String networkId) {
// The switch to use JSON manifests instead of zips occurred in 32a1aebe in July 2016.
// Over six years have passed, buildNetworkFromBundleZip is deprecated and could probably be removed.
LOG.warn("No network config (aka manifest) found. Assuming old-format network inputs bundle stored as a single ZIP file.");
// FIXME Bundle ZIP building to reduce duplicate code.
network = buildNetworkFromBundleZip(networkId);
} else {
network = buildNetworkFromConfig(networkConfig);
@@ -355,12 +378,6 @@ private String getNetworkConfigFilename (String networkId) {
return GTFSCache.cleanId(networkId) + ".json";
}

private LoadingCache createCache(int size) {
return Caffeine.newBuilder()
.maximumSize(size)
.build(this::loadNetwork);
}

/**
* CacheLoader method, which should only be called by the LoadingCache.
* Return the graph for the given unique identifier. Load pre-built serialized networks from local or remote
@@ -392,28 +409,6 @@ private LoadingCache createCache(int size) {
}
}

/**
* This will eventually be used in WorkerStatus to report to the backend all loaded networks, to give it hints about
* what kind of tasks the worker is ready to work on immediately. This is made more complicated by the fact that
* workers are started up with no networks loaded, but with the intent for them to work on a particular job. So
* currently the workers just report which network they were started up for, and this method is not used.
*
* In the future, workers should just report an empty set of loaded networks, and the back end should strategically
* send them tasks when they come on line to assign them to networks as needed. But this will require a new
* mechanism to fairly allocate the workers to jobs.
*/
public Set<String> getLoadedNetworkIds() {
return cache.asMap().keySet();
}

public Set<String> getAppliedScenarios() {
return cache.asMap().values().stream()
.filter(network -> network.scenarios != null)
.map(network -> network.scenarios.keySet())
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}

/**
* Given a network and scenario ID, retrieve that scenario from the local disk cache (falling back on S3).
*/
1 change: 1 addition & 0 deletions src/main/java/com/conveyal/r5/transit/TripPattern.java
@@ -34,6 +34,7 @@ public class TripPattern implements Serializable, Cloneable {
* ID in this transport network the ID would depend on the order of application of scenarios, and because this ID is
* used to map results back to the original network.
* TODO This concept of an "original" transport network may be obsolete, this field doesn't seem to be used anywhere.
* These are set to sequential integers: the index of the pattern in the TransitLayer's list of patterns.
*/
public int originalId;
