Skip to content

Commit

Permalink
refactored Merger
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian Wulf committed May 9, 2015
1 parent e1b774c commit 458c4d7
Showing 1 changed file with 2 additions and 34 deletions.
36 changes: 2 additions & 34 deletions src/main/java/teetime/stage/basic/merger/Merger.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@
public final class Merger<T> extends AbstractStage {

private final OutputPort<T> outputPort = this.createOutputPort();
private final Map<Class<? extends ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<? extends ISignal>, Set<InputPort<?>>>();

private IMergerStrategy strategy;

private final Map<Class<ISignal>, Set<InputPort<?>>> signalMap = new HashMap<Class<ISignal>, Set<InputPort<?>>>();

public Merger() {
this(new RoundRobinStrategy());
}
Expand Down Expand Up @@ -73,7 +72,6 @@ public void executeStage() {
* @param inputPort
* The port which the signal was sent to
*/
@SuppressWarnings("unchecked")
@Override
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (logger.isTraceEnabled()) {
Expand All @@ -87,7 +85,7 @@ public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
inputPorts = signalMap.get(signalClass);
} else {
inputPorts = new HashSet<InputPort<?>>();
signalMap.put((Class<ISignal>) signalClass, inputPorts);
signalMap.put(signalClass, inputPorts);
}

if (!inputPorts.add(inputPort)) {
Expand All @@ -97,38 +95,8 @@ public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (signal.mayBeTriggered(inputPorts, getInputPorts())) {
super.onSignal(signal, inputPort);
}

// if (signalMap.containsKey(signalClass)) {
// Set<InputPort<?>> set = signalMap.get(signalClass);
// if (!set.add(inputPort)) {
// this.logger.warn("Received more than one signal - " + signal + " - from input port: " + inputPort);
// }
// if (signalMap.get(signalClass).size() == this.getInputPorts().length && signalClass == TerminatingSignal.class) {
// triggerAndPassOn(signal);
// // signalMap.remove(signalClass);
// }
// } else {
// Set<InputPort<?>> tempSet = new HashSet<InputPort<?>>();
// signalMap.put((Class<ISignal>) signalClass, tempSet);
// tempSet.add(inputPort);
// if (signalClass == InitializingSignal.class || signalClass == StartingSignal.class) {
// triggerAndPassOn(signal);
// }
// }

}

// private void triggerAndPassOn(final ISignal signal) {
// signal.trigger(this);
// sendSignalToOutputPorts(signal);
// }

// private void sendSignalToOutputPorts(final ISignal signal) {
// for (OutputPort<?> outputPort : getOutputPorts()) {
// outputPort.sendSignal(signal);
// }
// }

public IMergerStrategy getMergerStrategy() {
return this.strategy;
}
Expand Down

0 comments on commit 458c4d7

Please sign in to comment.