feat: sparse horizon handler for vdf #276

Merged (5 commits) on Nov 26, 2024
core/src/main/java/org/eqasim/core/simulation/vdf/VDFConfigGroup.java
@@ -49,7 +49,7 @@ public class VDFConfigGroup extends ReflectiveConfigGroup {
private int writeFlowInterval = 0;

public enum HandlerType {
Horizon, Interpolation
Horizon, Interpolation, SparseHorizon
}

private HandlerType handler = HandlerType.Horizon;
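The new enum constant is selected like the existing handler types. A minimal sketch of opting into it programmatically (the config path is a placeholder; getOrCreate and setHandler are the existing VDFConfigGroup accessors, and the AdaptConfigForVDF change further down in this diff does the same thing):

// Sketch only: select the sparse handler on a loaded MATSim config.
Config config = ConfigUtils.loadConfig("config.xml"); // placeholder path
VDFConfigGroup vdfConfig = VDFConfigGroup.getOrCreate(config);
vdfConfig.setHandler(VDFConfigGroup.HandlerType.SparseHorizon);

Equivalently, the handler can be overridden on the command line via --config:eqasim:vdf.handler, as the test below does for the legacy Horizon handler.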
11 changes: 11 additions & 0 deletions core/src/main/java/org/eqasim/core/simulation/vdf/VDFModule.java
@@ -11,6 +11,7 @@
import org.eqasim.core.simulation.mode_choice.AbstractEqasimExtension;
import org.eqasim.core.simulation.vdf.handlers.VDFHorizonHandler;
import org.eqasim.core.simulation.vdf.handlers.VDFInterpolationHandler;
import org.eqasim.core.simulation.vdf.handlers.VDFSparseHorizonHandler;
import org.eqasim.core.simulation.vdf.handlers.VDFTrafficHandler;
import org.eqasim.core.simulation.vdf.travel_time.VDFTravelTime;
import org.eqasim.core.simulation.vdf.travel_time.function.BPRFunction;
@@ -41,6 +42,10 @@ protected void installEqasimExtension() {
bind(VDFTrafficHandler.class).to(VDFHorizonHandler.class);
addEventHandlerBinding().to(VDFHorizonHandler.class);
break;
case SparseHorizon:
bind(VDFTrafficHandler.class).to(VDFSparseHorizonHandler.class);
addEventHandlerBinding().to(VDFSparseHorizonHandler.class);
break;
case Interpolation:
bind(VDFTrafficHandler.class).to(VDFInterpolationHandler.class);
addEventHandlerBinding().to(VDFInterpolationHandler.class);
@@ -85,6 +90,12 @@ public VDFHorizonHandler provideVDFHorizonHandler(VDFConfigGroup config, Network
return new VDFHorizonHandler(network, scope, config.getHorizon(), getConfig().global().getNumberOfThreads());
}

@Provides
@Singleton
public VDFSparseHorizonHandler provideVDFSparseHorizonHandler(VDFConfigGroup config, Network network, VDFScope scope) {
return new VDFSparseHorizonHandler(network, scope, config.getHorizon(), getConfig().global().getNumberOfThreads());
}

@Provides
@Singleton
public VDFInterpolationHandler provideVDFInterpolationHandler(VDFConfigGroup config, Network network,
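With these bindings, any component that injects the generic VDFTrafficHandler transparently receives the sparse implementation once HandlerType.SparseHorizon is configured. A hedged sketch of such a consumer (ExampleFlowConsumer is hypothetical and not part of eqasim; aggregate(boolean) is the interface method implemented in the new handler below):

import java.util.List;

import org.eqasim.core.simulation.vdf.handlers.VDFTrafficHandler;
import org.matsim.api.core.v01.IdMap;
import org.matsim.api.core.v01.network.Link;

import com.google.inject.Inject;

// Hypothetical consumer, for illustration only.
public class ExampleFlowConsumer {
    private final VDFTrafficHandler handler;

    @Inject
    public ExampleFlowConsumer(VDFTrafficHandler handler) {
        // Bound to VDFSparseHorizonHandler when HandlerType.SparseHorizon is active
        this.handler = handler;
    }

    public IdMap<Link, List<Double>> currentFlows() {
        // Dense per-interval flows, averaged over the retained horizon slices
        return handler.aggregate(false);
    }
}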
core/src/main/java/org/eqasim/core/simulation/vdf/handlers/VDFSparseHorizonHandler.java (new file)
@@ -0,0 +1,292 @@
package org.eqasim.core.simulation.vdf.handlers;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eqasim.core.simulation.vdf.VDFScope;
import org.eqasim.core.simulation.vdf.io.VDFReaderInterface;
import org.eqasim.core.simulation.vdf.io.VDFWriterInterface;
import org.matsim.api.core.v01.Id;
import org.matsim.api.core.v01.IdMap;
import org.matsim.api.core.v01.events.LinkEnterEvent;
import org.matsim.api.core.v01.events.handler.LinkEnterEventHandler;
import org.matsim.api.core.v01.network.Link;
import org.matsim.api.core.v01.network.Network;
import org.matsim.core.utils.io.IOUtils;

import com.google.common.base.Verify;

public class VDFSparseHorizonHandler implements VDFTrafficHandler, LinkEnterEventHandler {
private final VDFScope scope;

private final Network network;
private final int horizon;
private final int numberOfThreads;

private final IdMap<Link, List<Double>> counts = new IdMap<>(Link.class);

private final static Logger logger = LogManager.getLogger(VDFSparseHorizonHandler.class);

private record LinkState(List<Integer> time, List<Double> count) {
}

private List<IdMap<Link, LinkState>> state = new LinkedList<>();

public VDFSparseHorizonHandler(Network network, VDFScope scope, int horizon, int numberOfThreads) {
this.scope = scope;
this.network = network;
this.horizon = horizon;
this.numberOfThreads = numberOfThreads;

for (Id<Link> linkId : network.getLinks().keySet()) {
counts.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0)));
}
}

@Override
public synchronized void handleEvent(LinkEnterEvent event) {
processEnterLink(event.getTime(), event.getLinkId());
}

public void processEnterLink(double time, Id<Link> linkId) {
int i = scope.getIntervalIndex(time);
double currentValue = counts.get(linkId).get(i);
counts.get(linkId).set(i, currentValue + 1);
}

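/**
 * Expands the retained sparse per-iteration slices back into dense per-interval
 * counts and averages them: every stored count is divided by the number of
 * slices. The per-iteration counters are reset afterwards; when ignoreIteration
 * is true, the counts of the current iteration are not turned into a new slice.
 */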
@Override
public IdMap<Link, List<Double>> aggregate(boolean ignoreIteration) {
while (state.size() > horizon) {
state.remove(0);
}

logger.info(String.format("Starting aggregation of %d slices", state.size()));

// Transform counts into state object
if (!ignoreIteration) {
IdMap<Link, LinkState> newState = new IdMap<>(Link.class);
state.add(newState);

for (Map.Entry<Id<Link>, List<Double>> entry : counts.entrySet()) {
double total = 0.0;

for (double value : entry.getValue()) {
total += value;
}

if (total > 0.0) {
LinkState linkState = new LinkState(new ArrayList<>(), new ArrayList<>());
newState.put(entry.getKey(), linkState);

int timeIndex = 0;
for (double count : entry.getValue()) {
if (count > 0.0) {
linkState.time.add(timeIndex);
linkState.count.add(count);
}

timeIndex++;
}
}
}
}

IdMap<Link, List<Double>> aggregated = new IdMap<>(Link.class);

for (Id<Link> linkId : network.getLinks().keySet()) {
// Reset current counts
counts.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0)));

// Initialize aggregated counts
aggregated.put(linkId, new ArrayList<>(Collections.nCopies(scope.getIntervals(), 0.0)));
}

// Aggregate
Iterator<Id<Link>> linkIterator = network.getLinks().keySet().iterator();

Runnable worker = () -> {
Id<Link> currentLinkId = null;

while (true) {
// Fetch new link in queue
synchronized (linkIterator) {
if (linkIterator.hasNext()) {
currentLinkId = linkIterator.next();
} else {
break; // Done
}
}

// Go through history for this link and aggregate by time slot
for (int k = 0; k < state.size(); k++) {
LinkState historyItem = state.get(k).get(currentLinkId);
List<Double> linkAggregator = aggregated.get(currentLinkId);

if (historyItem != null) {
for (int i = 0; i < historyItem.count.size(); i++) {
int timeIndex = historyItem.time.get(i);
linkAggregator.set(timeIndex,
linkAggregator.get(timeIndex) + historyItem.count.get(i) / (double) state.size());
}
}
}
}
};

if (numberOfThreads < 2) {
worker.run();
} else {
List<Thread> threads = new ArrayList<>(numberOfThreads);

for (int k = 0; k < numberOfThreads; k++) {
threads.add(new Thread(worker));
}

for (int k = 0; k < numberOfThreads; k++) {
threads.get(k).start();
}

try {
for (int k = 0; k < numberOfThreads; k++) {
threads.get(k).join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

logger.info(String.format(" Finished aggregation"));

return aggregated;
}

@Override
public VDFReaderInterface getReader() {
return new Reader();
}

@Override
public VDFWriterInterface getWriter() {
return new Writer();
}

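// Binary snapshot layout shared by Writer and Reader below: a header with start
// time, end time and interval time (doubles), interval count and horizon (ints),
// the number of slices and links, and the link id strings (UTF); then, per slice,
// the number of stored link states and, for each of them, the link index, the
// number of (interval index, count) entries, and the entries themselves.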
public class Reader implements VDFReaderInterface {
@Override
public void readFile(URL inputFile) {
state.clear();

try {
DataInputStream inputStream = new DataInputStream(IOUtils.getInputStream(inputFile));

Verify.verify(inputStream.readDouble() == scope.getStartTime());
Verify.verify(inputStream.readDouble() == scope.getEndTime());
Verify.verify(inputStream.readDouble() == scope.getIntervalTime());
Verify.verify(inputStream.readInt() == scope.getIntervals());
Verify.verify(inputStream.readInt() == horizon);

int slices = (int) inputStream.readInt();
int links = (int) inputStream.readInt();

List<Id<Link>> linkIds = new ArrayList<>(links);
for (int linkIndex = 0; linkIndex < links; linkIndex++) {
linkIds.add(Id.createLinkId(inputStream.readUTF()));
}

logger.info(String.format("Loading %d slices with %d links", slices, links));

for (int sliceIndex = 0; sliceIndex < slices; sliceIndex++) {
IdMap<Link, LinkState> slice = new IdMap<>(Link.class);
state.add(slice);

int sliceLinkCount = inputStream.readInt();

logger.info(String.format("Slice %d/%d, Reading %d link states", sliceIndex+1, slices, sliceLinkCount));

for (int sliceLinkIndex = 0; sliceLinkIndex < sliceLinkCount; sliceLinkIndex++) {
int linkIndex = inputStream.readInt();
int linkStateSize = inputStream.readInt();

LinkState linkState = new LinkState(new ArrayList<>(linkStateSize),
new ArrayList<>(linkStateSize));
slice.put(linkIds.get(linkIndex), linkState);

for (int i = 0; i < linkStateSize; i++) {
linkState.time.add(inputStream.readInt());
linkState.count.add(inputStream.readDouble());
}
}

logger.info(String.format(" Slice %d: %d obs", sliceIndex,
sliceLinkCount));
}

Verify.verify(inputStream.available() == 0);
inputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

public class Writer implements VDFWriterInterface {
@Override
public void writeFile(File outputFile) {
try {
DataOutputStream outputStream = new DataOutputStream(new FileOutputStream(outputFile.toString()));

outputStream.writeDouble(scope.getStartTime());
outputStream.writeDouble(scope.getEndTime());
outputStream.writeDouble(scope.getIntervalTime());
outputStream.writeInt(scope.getIntervals());
outputStream.writeInt(horizon);
outputStream.writeInt(state.size());
outputStream.writeInt(counts.size());

List<Id<Link>> linkIds = new ArrayList<>(counts.keySet());
for (int linkIndex = 0; linkIndex < linkIds.size(); linkIndex++) {
outputStream.writeUTF(linkIds.get(linkIndex).toString());
}

logger.info(String.format("About to write %d slices", state.size()));

for (int sliceIndex = 0; sliceIndex < state.size(); sliceIndex++) {
IdMap<Link, LinkState> slice = state.get(sliceIndex);
outputStream.writeInt(slice.size());

int sliceLinkIndex = 0;
for (Id<Link> linkId : linkIds) {
LinkState linkState = slice.get(linkId);
if(linkState == null) {
continue;
}
outputStream.writeInt(linkIds.indexOf(linkId));
outputStream.writeInt(linkState.count.size());

for (int i = 0; i < linkState.count.size(); i++) {
outputStream.writeInt(linkState.time.get(i));
outputStream.writeDouble(linkState.count.get(i));
}
sliceLinkIndex += 1;
}
assert sliceLinkIndex == slice.size();
}

outputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
core/src/main/java/org/eqasim/core/simulation/vdf/utils/AdaptConfigForVDF.java
@@ -1,6 +1,5 @@
package org.eqasim.core.simulation.vdf.utils;

import org.eqasim.core.components.config.EqasimConfigGroup;
import org.eqasim.core.simulation.EqasimConfigurator;
import org.eqasim.core.simulation.vdf.VDFConfigGroup;
import org.eqasim.core.simulation.vdf.engine.VDFEngineConfigGroup;
@@ -25,9 +24,8 @@ public static void adaptConfigForVDF(Config config, boolean engine) {
VDFConfigGroup.getOrCreate(config).setWriteInterval(1);
VDFConfigGroup.getOrCreate(config).setWriteFlowInterval(1);

// VDF: Set capacity factor instead (We retrieve it form the Eqasim config group)
EqasimConfigGroup eqasimConfigGroup = (EqasimConfigGroup) config.getModules().get(EqasimConfigGroup.GROUP_NAME);
VDFConfigGroup.getOrCreate(config).setCapacityFactor(eqasimConfigGroup.getSampleSize());
VDFConfigGroup vdfConfigGroup = VDFConfigGroup.getOrCreate(config);
vdfConfigGroup.setHandler(VDFConfigGroup.HandlerType.SparseHorizon);

if(engine) {
// VDF Engine: Add config group
18 changes: 17 additions & 1 deletion core/src/test/java/org/eqasim/TestSimulationPipeline.java
@@ -55,6 +55,8 @@

import com.google.inject.Inject;
import com.google.inject.Provider;
import org.matsim.utils.eventsfilecomparison.ComparisonResult;
import org.matsim.utils.eventsfilecomparison.EventsFileComparator;

public class TestSimulationPipeline {

@@ -377,6 +379,7 @@ public void testTransitWithAbstractAccess() throws CommandLine.ConfigurationException
}

public void runVdf() throws CommandLine.ConfigurationException, IOException, InterruptedException {
// This one will use the SparseHorizon handler
AdaptConfigForVDF.main(new String[] {
"--input-config-path", "melun_test/input/config.xml",
"--output-config-path", "melun_test/input/config_vdf.xml",
@@ -387,6 +390,20 @@ public void runVdf() throws CommandLine.ConfigurationException, IOException, InterruptedException {

runMelunSimulation("melun_test/input/config_vdf.xml", "melun_test/output_vdf");


// We force this one to use the legacy horizon handler
AdaptConfigForVDF.main(new String[] {
"--input-config-path", "melun_test/input/config.xml",
"--output-config-path", "melun_test/input/config_vdf_horizon.xml",
"--engine", "true",
"--config:eqasim:vdf_engine.generateNetworkEvents", "true",
"--config:eqasim:vdf.handler", "Horizon"
});

runMelunSimulation("melun_test/input/config_vdf_horizon.xml", "melun_test/output_vdf_horizon");

assert CRCChecksum.getCRCFromFile("melun_test/output_vdf_horizon/output_plans.xml.gz") == CRCChecksum.getCRCFromFile("melun_test/output_vdf/output_plans.xml.gz");

RunStandaloneModeChoice.main(new String[]{
"--config-path", "melun_test/input/config_vdf.xml",
"--config:standaloneModeChoice.outputDirectory", "melun_test/output_mode_choice_vdf",
@@ -425,7 +442,6 @@ public void testPipeline() throws Exception {

@Test
public void testBaseDeterminism() throws Exception {
Logger logger = LogManager.getLogger(TestSimulationPipeline.class);
Config config = ConfigUtils.loadConfig("melun_test/input/config.xml");
runMelunSimulation(config, "melun_test/output_determinism_1", null, 2);

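The newly imported ComparisonResult and EventsFileComparator are not used in the hunks shown here, which suggests the test compares events files elsewhere in the class. A hedged sketch of such a check, assuming MATSim's static EventsFileComparator.compare(String, String) API and the standard output_events.xml.gz output names (neither appears in this diff):

// Assumption: compare(String, String) returning a ComparisonResult exists in
// the MATSim version used here; the file names are the usual MATSim outputs.
ComparisonResult result = EventsFileComparator.compare(
        "melun_test/output_vdf/output_events.xml.gz",
        "melun_test/output_vdf_horizon/output_events.xml.gz");
assert result == ComparisonResult.FILES_ARE_EQUAL;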