Skip to content

Commit

Permalink
Merge branch 'InitializingSignal' into 'master'
Browse files Browse the repository at this point in the history
Initializing signal

new InitializingSignal
RunnableConsumer takes now 2 signals (Start and Init) instead of a single one

See merge request !35
  • Loading branch information
Christian Wulf committed May 6, 2015
2 parents 63aebed + 9f21618 commit 44bba84
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/main/java/teetime/framework/AbstractStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public StageState getCurrentState() {
public void onSignal(final ISignal signal, final InputPort<?> inputPort) {
if (!this.signalAlreadyReceived(signal, inputPort)) {
signal.trigger(this);

for (OutputPort<?> outputPort : outputPorts) {
outputPort.sendSignal(signal);
}

}
}

Expand Down
11 changes: 5 additions & 6 deletions src/main/java/teetime/framework/Analysis.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import teetime.framework.exceptionHandling.AbstractExceptionListener;
import teetime.framework.exceptionHandling.IExceptionListenerFactory;
import teetime.framework.exceptionHandling.IgnoringExceptionListenerFactory;
import teetime.framework.signal.InitializingSignal;
import teetime.framework.signal.ValidatingSignal;
import teetime.framework.validation.AnalysisNotValidException;
import teetime.util.Pair;
Expand Down Expand Up @@ -140,18 +141,21 @@ private final void init() {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
thread = createThread(runnable, stage.getId());
this.finiteProducerThreads.add(thread);
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
break;
}
case BY_INTERRUPT: {
final RunnableProducerStage runnable = new RunnableProducerStage(stage);
thread = createThread(runnable, stage.getId());
InitializingSignal initializingSignal = new InitializingSignal();
stage.onSignal(initializingSignal, null);
this.infiniteProducerThreads.add(thread);
break;
}
default:
throw new IllegalStateException("Unhandled termination strategy: " + terminationStrategy);
}

final Set<Stage> intraStages = traverseIntraStages(stage);
final AbstractExceptionListener newListener = factory.createInstance();
initializeIntraStages(intraStages, thread, newListener);
Expand All @@ -170,11 +174,6 @@ private void initializeIntraStages(final Set<Stage> intraStages, final Thread th
for (Stage intraStage : intraStages) {
intraStage.setOwningThread(thread);
intraStage.setExceptionHandler(newListener);
try {
intraStage.onInitializing();
} catch (Exception e) { // NOPMD(generic framework catch)
throw new IllegalStateException("The following exception occurs within initializing the analysis:", e);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/main/java/teetime/framework/RunnableConsumerStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ protected void beforeStageExecution(final Stage stage) throws InterruptedExcepti
for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal();
}
for (InputPort<?> inputPort : inputPorts) {
inputPort.waitForStartSignal();
}
logger.trace("Starting..." + stage);
}

Expand Down
47 changes: 47 additions & 0 deletions src/main/java/teetime/framework/signal/InitializingSignal.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright (C) 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime.sourceforge.net)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.signal;

import java.util.LinkedList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import teetime.framework.Stage;

public final class InitializingSignal implements ISignal {

private static final Logger LOGGER = LoggerFactory.getLogger(StartingSignal.class);
private final List<Exception> catchedExceptions = new LinkedList<Exception>();

public InitializingSignal() {}

@Override
public void trigger(final Stage stage) {
try {
stage.onInitializing();
} catch (Exception e) { // NOCS (Stages can throw any arbitrary Exception)
this.catchedExceptions.add(e);
LOGGER.error("Exception while sending the start signal", e);
}
}

public List<Exception> getCatchedExceptions() {
return this.catchedExceptions;
}

}
1 change: 0 additions & 1 deletion src/main/java/teetime/stage/MultipleInstanceOfFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public <T extends I> OutputPort<T> getOutputPortForType(final Class<T> clazz) {
@SuppressWarnings("unchecked")
public void onStarting() throws Exception {
super.onStarting();

// We cache the map to avoid the creating of iterators during runtime
cachedOutputPortsMap = (Entry<Class<? extends I>, OutputPort<? super I>>[]) outputPortsMap.entrySet().toArray(new Entry<?, ?>[outputPortsMap.size()]);
}
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/teetime/stage/MultipleInstanceOfFilterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package teetime.stage;

import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
Expand All @@ -42,6 +44,7 @@ public void filteringForSingleTypeShouldWork() {

StageTester.test(filter).and().send(input).to(filter.getInputPort()).and().receive(result).from(filter.getOutputPortForType(String.class)).start();

assertThat(result, is(not(empty())));
assertThat(result, contains("1", "2", "3"));
}

Expand Down

0 comments on commit 44bba84

Please sign in to comment.