diff --git a/build.gradle b/build.gradle index 6452a06dc..2302cd8f7 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,7 @@ java { toolchain { languageVersion.set(JavaLanguageVersion.of(21)) } + withSourcesJar() } jar { diff --git a/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java b/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java index 943ae7a8e..c3ee89aef 100644 --- a/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java +++ b/src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java @@ -110,11 +110,13 @@ protected TransportNetwork buildValue(Key key) { // Get the set of points to which we are measuring travel time. Any smaller sub-grids created here will // reference the scenarioNetwork's built-in full-extent pointset, so can reuse its linkage. - // TODO handle multiple destination grids. + // FIXME handle multiple destination grids. if (key.destinationGridExtents == null) { // Special (and ideally temporary) case for regional freeform destinations, where there is no grid to link. // The null destinationGridExtents are created by the WebMercatorExtents#forPointsets else clause. + // FIXME there is no grid to link, but there are points and egress tables to make! + // see com.conveyal.r5.analyst.cluster.AnalysisWorkerTask.loadAndValidateDestinationPointSets return scenarioNetwork; } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java b/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java index d9e6789a8..d253e4b6b 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/ScenarioCache.java @@ -26,14 +26,14 @@ * scenarios from the backend instead of from S3. * * TODO merge this with TransportNetworkCache#resolveScenario into a single multi-level mem/disk/s3 cache. - * Note that this cache is going to just grow indefinitely in size as a worker receives many iterations of the same - * scenario - that could be a memory leak. Again multi level caching could releive those worries. - * It's debatable whether we should be hanging on to scenarios passed with single point requests becuase they may never - * be used again. + * This cache grows in size without bound as a worker receives many iterations of the same scenario. + * This is technically a sort of memory leak for long-lived workers. Multi-level caching could relieve those worries. + * However, this cache stores only the Scenarios and Modifications, not any large egress tables or linkages. + * + * It's debatable whether we should be hanging on to scenarios passed with single point requests, + * because they may never be used again. * Should we just always require a single point task to be sent to the cluster before a regional? * That would not ensure the scenario was present on all workers though. - * - * Created by abyrd on 2018-10-29 */ public class ScenarioCache { @@ -44,7 +44,7 @@ public class ScenarioCache { public synchronized void storeScenario (Scenario scenario) { Scenario existingScenario = scenariosById.put(scenario.id, scenario); if (existingScenario != null) { - LOG.debug("Scenario cache already contained a this scenario."); + LOG.debug("Scenario cache already contained this scenario."); } } diff --git a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java index 1fcd17a6e..9b6d54632 100644 --- a/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java +++ b/src/main/java/com/conveyal/r5/analyst/cluster/WorkerStatus.java @@ -39,7 +39,6 @@ public class WorkerStatus { public String workerVersion; public String workerId; public Set networks = new HashSet<>(); - public Set scenarios = new HashSet<>(); public double secondsSinceLastPoll; public Map tasksPerMinuteByJobId; @JsonUnwrapped(prefix = "ec2") @@ -86,7 +85,6 @@ public WorkerStatus (AnalysisWorker worker) { // networks = worker.networkPreloader.transportNetworkCache.getLoadedNetworkIds(); // For now we report a single network, even before it's loaded. networks = Sets.newHashSet(worker.networkId); - scenarios = worker.networkPreloader.transportNetworkCache.getAppliedScenarios(); ec2 = worker.ec2info; OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean(); diff --git a/src/main/java/com/conveyal/r5/streets/StreetRouter.java b/src/main/java/com/conveyal/r5/streets/StreetRouter.java index fa3750e35..2fa4b24e5 100644 --- a/src/main/java/com/conveyal/r5/streets/StreetRouter.java +++ b/src/main/java/com/conveyal/r5/streets/StreetRouter.java @@ -742,7 +742,7 @@ public int getTravelTimeToVertex (int vertexIndex) { * fragments from the vertices at either end of the edge up to the destination split point. * If no states can be produced return null. * - * Note that this is only used by the point to point street router, not by LinkedPointSets (which have equivalent + * NOTE that this is ONLY USED BY the point to point street router, NOT BY LinkedPointSets (which have equivalent * logic in their eval method). The PointSet implementation only needs to produce times, not States. But ideally * some common logic can be factored out. */ diff --git a/src/main/java/com/conveyal/r5/transit/TransportNetwork.java b/src/main/java/com/conveyal/r5/transit/TransportNetwork.java index 3e3cb3720..bbf4f8f70 100644 --- a/src/main/java/com/conveyal/r5/transit/TransportNetwork.java +++ b/src/main/java/com/conveyal/r5/transit/TransportNetwork.java @@ -53,14 +53,10 @@ public class TransportNetwork implements Serializable { public TransitLayer transitLayer; /** - * This stores any number of lightweight scenario networks built upon the current base network. - * FIXME that sounds like a memory leak, should be a WeighingCache or at least size-limited. - * A single network cache at the top level could store base networks and scenarios since they all have globally - * unique IDs. A hierarchical cache does have the advantage of evicting all the scenarios with the associated - * base network, which keeps the references in the scenarios from holding on to the base network. But considering - * that we have never started evicting networks (other than for a "cache" of one element) this might be getting - * ahead of ourselves. + * This field is no longer used. It has been moved to TransportNetworkCache, but this one remains for now, to + * avoid any inadvertent incompatibilities with serialized network files or serialization library settings. */ + @Deprecated public transient Map scenarios = new HashMap<>(); /** diff --git a/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java b/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java index b7f1f1d8f..1c512640b 100644 --- a/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java +++ b/src/main/java/com/conveyal/r5/transit/TransportNetworkCache.java @@ -29,11 +29,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -53,10 +50,20 @@ public class TransportNetworkCache implements Component { private static final Logger LOG = LoggerFactory.getLogger(TransportNetworkCache.class); /** Cache size is currently limited to one, i.e. the worker holds on to only one network at a time. */ - private static final int DEFAULT_CACHE_SIZE = 1; + private static final int MAX_CACHED_NETWORKS = 1; + + /** + * It might seem sufficient to hold only two scenarios (for single point scenario comparison). But in certain cases + * (e.g. the regional task queue is bigger than the size of each queued regional job) we might end up working on + * a mix of tasks from N different scenarios. Note also that scenarios hold references to their base networks, so + * caching multiple scenario networks can theoretically keep just as many TransportNetworks in memory. + * But in practice, in non-local (cloud) operation a given worker instance is locked to a single network for its + * entire lifespan. + */ + public static final int MAX_CACHED_SCENARIO_NETWORKS = 10; // TODO change all other caches from Guava to Caffeine caches. This one is already a Caffeine cache. - private final LoadingCache cache; + private final LoadingCache networkCache; private final FileStorage fileStorage; private final GTFSCache gtfsCache; @@ -64,15 +71,39 @@ public class TransportNetworkCache implements Component { /** * A table of already seen scenarios, avoiding downloading them repeatedly from S3 and allowing us to replace - * scenarios with only their IDs, and reverse that replacement later. + * scenarios with only their IDs, and reverse that replacement later. Note that this caches the Scenario objects + * themselves, not the TransportNetworks built from those Scenarios. */ private final ScenarioCache scenarioCache = new ScenarioCache(); + /** + * This record type is used for the private, encapsulated cache of TransportNetworks for different scenarios. + * Scenario IDs are unique so we could look up these networks by scenario ID alone. However the cache values need + * to be derived entirely from the cache keys. We need some way to look up the base network so we include its ID. + */ + private record BaseAndScenarioId (String baseNetworkId, String scenarioId) { } + + /** + * This stores a number of lightweight scenario networks built upon the current base network. + * Each scenario TransportNetwork has its own LinkageCache, containing LinkedPointSets that each have their own + * EgressCostTable. In practice this can exhaust memory, e.g. after using bicycle egress for about 50 scenarios. + * The previous hierarchical arrangement of caches has the advantage of evicting all the scenarios with the + * associated base network, which keeps the references in the scenarios from holding on to the base network. + * But considering that we have never started evicting networks (other than for a "cache" of one element) this + * eviction can be handled in other ways. + */ + private LoadingCache scenarioNetworkCache; + /** Create a transport network cache. If source bucket is null, will work offline. */ public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMCache osmCache) { this.osmCache = osmCache; this.gtfsCache = gtfsCache; - this.cache = createCache(DEFAULT_CACHE_SIZE); + this.networkCache = Caffeine.newBuilder() + .maximumSize(MAX_CACHED_NETWORKS) + .build(this::loadNetwork); + this.scenarioNetworkCache = Caffeine.newBuilder() + .maximumSize(MAX_CACHED_SCENARIO_NETWORKS) + .build(this::loadScenario); this.fileStorage = fileStorage; } @@ -80,10 +111,9 @@ public TransportNetworkCache (FileStorage fileStorage, GTFSCache gtfsCache, OSMC * Find a transport network by ID, building or loading as needed from pre-existing OSM, GTFS, MapDB, or Kryo files. * This should never return null. If a TransportNetwork can't be built or loaded, an exception will be thrown. */ - public synchronized @Nonnull - TransportNetwork getNetwork (String networkId) throws TransportNetworkException { + public TransportNetwork getNetwork (String networkId) throws TransportNetworkException { try { - return cache.get(networkId); + return networkCache.get(networkId); } catch (Exception e) { throw new TransportNetworkException("Could not load TransportNetwork into cache. ", e); } @@ -107,43 +137,35 @@ public void rememberScenario (Scenario scenario) { * base graphs). Therefore we can look up cached scenario networks based solely on their scenarioId rather than a * compound key of (networkId, scenarioId). * - * The fact that scenario networks are cached means that PointSet linkages will be automatically reused. + * Reusing scenario networks automatically leads to reuse of the associated PointSet linkages and egress tables. * TODO it seems to me that this method should just take a Scenario as its second parameter, and that resolving * the scenario against caches on S3 or local disk should be pulled out into a separate function. * The problem is that then you resolve the scenario every time, even when the ID is enough to look up the already * built network. So we need to pass the whole task in here, so either the ID or full scenario are visible. * - * Thread safety notes: This entire method is synchronized so access by multiple threads will be sequential. - * The first thread will have a chance to build and store the requested scenario before any others see it. - * This means each new scenario will be applied one after the other. This is probably OK as long as building egress - * tables is already parallelized. + * Thread safety: getNetwork and getNetworkForScenario are threadsafe caches, so access to the same key by multiple + * threads will occur sequentially without repeatedly or simultaneously performing the same loading actions. + * Javadoc on the Caffeine LoadingCache indicates that it will throw exceptions when the cache loader method throws + * them, without establishing a mapping in the cache. So exceptions occurring during scenario application are + * expected to bubble up unimpeded. */ - public synchronized TransportNetwork getNetworkForScenario (String networkId, String scenarioId) { - // If the networkId is different than previous calls, a new network will be loaded. Its transient nested map - // of scenarios will be empty at first. This ensures it's initialized if null. - // FIXME apparently this can't happen - the field is transient and initialized in TransportNetwork. - TransportNetwork baseNetwork = this.getNetwork(networkId); - if (baseNetwork.scenarios == null) { - baseNetwork.scenarios = new HashMap<>(); - } + public TransportNetwork getNetworkForScenario (String networkId, String scenarioId) { + TransportNetwork scenarioNetwork = scenarioNetworkCache.get(new BaseAndScenarioId(networkId, scenarioId)); + return scenarioNetwork; + } - TransportNetwork scenarioNetwork = baseNetwork.scenarios.get(scenarioId); - if (scenarioNetwork == null) { - // The network for this scenario was not found in the cache. Create that scenario network and cache it. - LOG.debug("Applying scenario to base network..."); - // Fetch the full scenario if an ID was specified. - Scenario scenario = resolveScenario(networkId, scenarioId); - // Apply any scenario modifications to the network before use, performing protective copies where necessary. - // We used to prepend a filter to the scenario, removing trips that are not running during the search time window. - // However, because we are caching transportNetworks with scenarios already applied to them, we can’t use - // the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always - // apply scenarios every time. - scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork); - LOG.debug("Done applying scenario. Caching the resulting network."); - baseNetwork.scenarios.put(scenario.id, scenarioNetwork); - } else { - LOG.debug("Reusing cached TransportNetwork for scenario {}.", scenarioId); - } + private TransportNetwork loadScenario (BaseAndScenarioId ids) { + TransportNetwork baseNetwork = this.getNetwork(ids.baseNetworkId()); + LOG.debug("Scenario TransportNetwork not found. Applying scenario to base network and caching it."); + // Fetch the full scenario if an ID was specified. + Scenario scenario = resolveScenario(ids.baseNetworkId(), ids.scenarioId()); + // Apply any scenario modifications to the network before use, performing protective copies where necessary. + // We used to prepend a filter to the scenario, removing trips that are not running during the search time window. + // However, because we are caching transportNetworks with scenarios already applied to them, we can’t use + // the InactiveTripsFilter. The solution may be to cache linked point sets based on scenario ID but always + // apply scenarios every time. + TransportNetwork scenarioNetwork = scenario.applyToTransportNetwork(baseNetwork); + LOG.debug("Done applying scenario. Caching the resulting network."); return scenarioNetwork; } @@ -185,6 +207,7 @@ private TransportNetworkConfig loadNetworkConfig (String networkId) { // The switch to use JSON manifests instead of zips occurred in 32a1aebe in July 2016. // Over six years have passed, buildNetworkFromBundleZip is deprecated and could probably be removed. LOG.warn("No network config (aka manifest) found. Assuming old-format network inputs bundle stored as a single ZIP file."); + // FIXME Bundle ZIP building to reduce duplicate code. network = buildNetworkFromBundleZip(networkId); } else { network = buildNetworkFromConfig(networkConfig); @@ -355,12 +378,6 @@ private String getNetworkConfigFilename (String networkId) { return GTFSCache.cleanId(networkId) + ".json"; } - private LoadingCache createCache(int size) { - return Caffeine.newBuilder() - .maximumSize(size) - .build(this::loadNetwork); - } - /** * CacheLoader method, which should only be called by the LoadingCache. * Return the graph for the given unique identifier. Load pre-built serialized networks from local or remote @@ -392,28 +409,6 @@ private LoadingCache createCache(int size) { } } - /** - * This will eventually be used in WorkerStatus to report to the backend all loaded networks, to give it hints about - * what kind of tasks the worker is ready to work on immediately. This is made more complicated by the fact that - * workers are started up with no networks loaded, but with the intent for them to work on a particular job. So - * currently the workers just report which network they were started up for, and this method is not used. - * - * In the future, workers should just report an empty set of loaded networks, and the back end should strategically - * send them tasks when they come on line to assign them to networks as needed. But this will require a new - * mechanism to fairly allocate the workers to jobs. - */ - public Set getLoadedNetworkIds() { - return cache.asMap().keySet(); - } - - public Set getAppliedScenarios() { - return cache.asMap().values().stream() - .filter(network -> network.scenarios != null) - .map(network -> network.scenarios.keySet()) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - } - /** * Given a network and scenario ID, retrieve that scenario from the local disk cache (falling back on S3). */ diff --git a/src/main/java/com/conveyal/r5/transit/TripPattern.java b/src/main/java/com/conveyal/r5/transit/TripPattern.java index 7c7e08224..522fa8907 100644 --- a/src/main/java/com/conveyal/r5/transit/TripPattern.java +++ b/src/main/java/com/conveyal/r5/transit/TripPattern.java @@ -34,6 +34,7 @@ public class TripPattern implements Serializable, Cloneable { * ID in this transport network the ID would depend on the order of application of scenarios, and because this ID is * used to map results back to the original network. * TODO This concept of an "original" transport network may be obsolete, this field doesn't seem to be used anywhere. + * These are set to sequential integers: the index of the pattern in the TransitLayer's list of patterns. */ public int originalId;