Skip to content

Commit

Permalink
Merge branch 'release/0.12.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
juliencruz committed May 27, 2015
2 parents d5a2838 + 0f0ab24 commit 9a69bcd
Show file tree
Hide file tree
Showing 30 changed files with 141 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

<groupId>streamflow</groupId>
<artifactId>streamflow</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
<packaging>pom</packaging>

<name>StreamFlow</name>
Expand Down
4 changes: 2 additions & 2 deletions streamflow-annotations/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>framework-annotations</artifactId>
<artifactId>streamflow-annotations</artifactId>
<name>Streamflow Annotations</name>
<description>Annotations for use with Streamflow Frameworks</description>

Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-core</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-core</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-app</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-app/streamflow-app-jar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-app</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-app-jar</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-app/streamflow-app-war/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-app</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-app-war</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-datastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-core</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-datastore</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-datastore</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-datastore-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-datastore</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-datastore-jdbc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-datastore</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-datastore-mongodb</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-core</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import backtype.storm.generated.NotAliveException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class StormEngine {

protected static final Logger LOG = LoggerFactory.getLogger(StormEngine.class);

private final LocalCluster stormCluster;
private LocalCluster localCluster;

private final StreamflowConfig streamflowConfig;

Expand All @@ -62,8 +63,7 @@ public class StormEngine {
private static final int KILL_BUFFER_SECS = 60;

@Inject
public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig) {
this.stormCluster = stormCluster;
public StormEngine(StreamflowConfig streamflowConfig) {
this.streamflowConfig = streamflowConfig;

// Add each of the clusters from the application configuration
Expand All @@ -72,17 +72,22 @@ public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig)
clusters.put(cluster.getId(), cluster);
}
}
}

@Inject(optional=true)
public void setLocalCluster(@Named("LocalCluster") LocalCluster localCluster) {
this.localCluster = localCluster;

// Manually add the local cluster and add it to the cluster map
Cluster localCluster = new Cluster(
Cluster localClusterEntry = new Cluster(
Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null);
clusters.put(localCluster.getId(), localCluster);
clusters.put(localClusterEntry.getId(), localClusterEntry);
}

public Topology submitTopology(Topology topology, Cluster cluster) {
// Execute topology submission in a thread to maintain separate context class loader for each topology
TopologySubmitter submitter = new TopologySubmitter(
topology, cluster, stormCluster, streamflowConfig);
topology, cluster, localCluster, streamflowConfig);
submitter.start();

try {
Expand All @@ -107,7 +112,7 @@ public boolean killTopology(Topology topology, int waitTimeSecs, boolean async)

if (isLocal(topology.getClusterId())) {
// Kill the topology on the local cluster
stormCluster.killTopologyWithOpts(topology.getId(), killOptions);
localCluster.killTopologyWithOpts(topology.getId(), killOptions);
} else {
Cluster cluster = clusters.get(topology.getClusterId());

Expand Down Expand Up @@ -142,7 +147,7 @@ public ClusterSummary getClusterSummary(Cluster cluster) {

if (cluster != null) {
if (isLocal(cluster.getId())) {
summary = stormCluster.getClusterInfo();
summary = localCluster.getClusterInfo();
} else {
TSocket tsocket = new TSocket(cluster.getNimbusHost(), cluster.getNimbusPort());
TFramedTransport tTransport = new TFramedTransport(tsocket);
Expand Down Expand Up @@ -228,8 +233,8 @@ public TopologyInfo getTopologyInfo(Topology topology) {
}

if (isLocal(topology.getClusterId())) {
info = stormCluster.getTopologyInfo(stormTopologyId);
topologyConf = stormCluster.getTopologyConf(stormTopologyId);
info = localCluster.getTopologyInfo(stormTopologyId);
topologyConf = localCluster.getTopologyConf(stormTopologyId);
} else {
Cluster cluster = clusters.get(topology.getClusterId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

import backtype.storm.LocalCluster;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import streamflow.engine.StormEngine;
import streamflow.model.config.LocalClusterConfig;
import streamflow.util.config.ConfigLoader;

public class EngineModule extends AbstractModule {

@Override
protected void configure() {
bind(StormEngine.class);

// Would prefer to use provider, but newer interface does not provide eager initialization
bind(LocalCluster.class).toInstance(new LocalCluster());
LocalClusterConfig localClusterConfig = ConfigLoader.getConfig().getLocalCluster();
if (localClusterConfig != null && localClusterConfig.isEnabled()) {
bind(LocalCluster.class).annotatedWith(Names.named("LocalCluster")).toInstance(new LocalCluster());
}
}
}
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-core</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-model</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2014 Lockheed Martin Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package streamflow.model.config;

import java.io.Serializable;

public class LocalClusterConfig implements Serializable {

private boolean enabled = true;

public LocalClusterConfig() {
}

public boolean isEnabled() {
return Boolean.parseBoolean(System.getProperty("localCluster.enabled", Boolean.toString(enabled)));
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

@Override
public int hashCode() {
int hash = 3;
hash = 43 * hash + (this.enabled ? 1 : 0);
return hash;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final LocalClusterConfig other = (LocalClusterConfig) obj;
if (this.enabled != other.enabled) {
return false;
}
return true;
}

@Override
public String toString() {
return "LocalClusterConfig{" + "enabled=" + enabled + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class StreamflowConfig implements Serializable {
private LoggerConfig logger = new LoggerConfig();

private AuthConfig auth = new AuthConfig();

private LocalClusterConfig localCluster = new LocalClusterConfig();

private List<Cluster> clusters = new ArrayList<Cluster>();
private List<Cluster> clusters = new ArrayList<>();

private Cluster selectedCluster;

Expand Down Expand Up @@ -82,6 +84,14 @@ public void setAuth(AuthConfig auth) {
this.auth = auth;
}

public LocalClusterConfig getLocalCluster() {
return localCluster;
}

public void setLocalCluster(LocalClusterConfig localCluster) {
this.localCluster = localCluster;
}

public List<Cluster> getClusters() {
return clusters;
}
Expand All @@ -106,6 +116,7 @@ public int hashCode() {
hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0);
hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0);
hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0);
hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0);
hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0);
return hash;
}
Expand Down Expand Up @@ -139,6 +150,10 @@ public boolean equals(Object obj) {
|| !this.auth.equals(other.auth))) {
return false;
}
if (this.localCluster != other.localCluster && (this.localCluster == null
|| !this.localCluster.equals(other.localCluster))) {
return false;
}
if (this.clusters != other.clusters && (this.clusters == null
|| !this.clusters.equals(other.clusters))) {
return false;
Expand All @@ -150,6 +165,7 @@ public boolean equals(Object obj) {
public String toString() {
return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy
+ ", datastore=" + datastore + ", logger=" + logger
+ ", auth=" + auth + ", clusters=" + clusters + '}';
+ ", auth=" + auth + ", localCluster=" + localCluster
+ ", clusters=" + clusters + '}';
}
}
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-core</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-server</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-core</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-service</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ClusterService {

private final StormEngine stormEngine;

private final Map<String, Cluster> clusters = new HashMap<String, Cluster>();
private final Map<String, Cluster> clusters = new HashMap<>();

@Inject
public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig) {
Expand All @@ -46,10 +46,12 @@ public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig
}
}

// Generate the local cluster and add it to the cluster map
Cluster localCluster = new Cluster(
Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null);
clusters.put(localCluster.getId(), localCluster);
if (streamflowConfig.getLocalCluster().isEnabled()) {
// Generate the local cluster and add it to the cluster map
Cluster localCluster = new Cluster(
Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null);
clusters.put(localCluster.getId(), localCluster);
}
}

public Collection<Cluster> listClusters() {
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>streamflow</groupId>
<artifactId>streamflow-core</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</parent>

<artifactId>streamflow-util</artifactId>
Expand Down
Loading

0 comments on commit 9a69bcd

Please sign in to comment.