diff --git a/pom.xml b/pom.xml index 80ffe72..5bde54e 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ streamflow streamflow - 0.11.0 + 0.12.0 pom StreamFlow diff --git a/streamflow-annotations/pom.xml b/streamflow-annotations/pom.xml index fe61fb2..10127c7 100644 --- a/streamflow-annotations/pom.xml +++ b/streamflow-annotations/pom.xml @@ -20,10 +20,10 @@ streamflow streamflow - 0.11.0 + 0.12.0 - framework-annotations + streamflow-annotations Streamflow Annotations Annotations for use with Streamflow Frameworks diff --git a/streamflow-core/pom.xml b/streamflow-core/pom.xml index 4a84afe..4e53041 100644 --- a/streamflow-core/pom.xml +++ b/streamflow-core/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow - 0.11.0 + 0.12.0 streamflow-core diff --git a/streamflow-core/streamflow-app/pom.xml b/streamflow-core/streamflow-app/pom.xml index fc32853..c30b548 100644 --- a/streamflow-core/streamflow-app/pom.xml +++ b/streamflow-core/streamflow-app/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-core - 0.11.0 + 0.12.0 streamflow-app diff --git a/streamflow-core/streamflow-app/streamflow-app-jar/pom.xml b/streamflow-core/streamflow-app/streamflow-app-jar/pom.xml index f7b444e..b5fe072 100644 --- a/streamflow-core/streamflow-app/streamflow-app-jar/pom.xml +++ b/streamflow-core/streamflow-app/streamflow-app-jar/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-app - 0.11.0 + 0.12.0 streamflow-app-jar diff --git a/streamflow-core/streamflow-app/streamflow-app-war/pom.xml b/streamflow-core/streamflow-app/streamflow-app-war/pom.xml index 9fd1aea..13b342c 100644 --- a/streamflow-core/streamflow-app/streamflow-app-war/pom.xml +++ b/streamflow-core/streamflow-app/streamflow-app-war/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-app - 0.11.0 + 0.12.0 streamflow-app-war diff --git a/streamflow-core/streamflow-datastore/pom.xml b/streamflow-core/streamflow-datastore/pom.xml index f005287..716e089 100644 --- a/streamflow-core/streamflow-datastore/pom.xml +++ b/streamflow-core/streamflow-datastore/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-core - 0.11.0 + 0.12.0 streamflow-datastore diff --git a/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml b/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml index 0b523f8..dba6b9b 100644 --- a/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml +++ b/streamflow-core/streamflow-datastore/streamflow-datastore-core/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-datastore - 0.11.0 + 0.12.0 streamflow-datastore-core diff --git a/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml b/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml index 6df55b1..0d695fe 100644 --- a/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml +++ b/streamflow-core/streamflow-datastore/streamflow-datastore-jdbc/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-datastore - 0.11.0 + 0.12.0 streamflow-datastore-jdbc diff --git a/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml b/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml index 9f4dfda..15ce2bf 100644 --- a/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml +++ b/streamflow-core/streamflow-datastore/streamflow-datastore-mongodb/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-datastore - 0.11.0 + 0.12.0 streamflow-datastore-mongodb diff --git a/streamflow-core/streamflow-engine/pom.xml b/streamflow-core/streamflow-engine/pom.xml index f8e366b..bf3f88d 100644 --- a/streamflow-core/streamflow-engine/pom.xml +++ b/streamflow-core/streamflow-engine/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-core - 0.11.0 + 0.12.0 streamflow-engine diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java index f2b9d86..123ab04 100644 --- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java +++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java @@ -21,6 +21,7 @@ import backtype.storm.generated.NotAliveException; import com.google.inject.Inject; import com.google.inject.Singleton; +import com.google.inject.name.Named; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -53,7 +54,7 @@ public class StormEngine { protected static final Logger LOG = LoggerFactory.getLogger(StormEngine.class); - private final LocalCluster stormCluster; + private LocalCluster localCluster; private final StreamflowConfig streamflowConfig; @@ -62,8 +63,7 @@ public class StormEngine { private static final int KILL_BUFFER_SECS = 60; @Inject - public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig) { - this.stormCluster = stormCluster; + public StormEngine(StreamflowConfig streamflowConfig) { this.streamflowConfig = streamflowConfig; // Add each of the clusters from the application configuration @@ -72,17 +72,22 @@ public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig) clusters.put(cluster.getId(), cluster); } } + } + + @Inject(optional=true) + public void setLocalCluster(@Named("LocalCluster") LocalCluster localCluster) { + this.localCluster = localCluster; // Manually add the local cluster and add it to the cluster map - Cluster localCluster = new Cluster( + Cluster localClusterEntry = new Cluster( Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null); - clusters.put(localCluster.getId(), localCluster); + clusters.put(localClusterEntry.getId(), localClusterEntry); } public Topology submitTopology(Topology topology, Cluster cluster) { // Execute topology submission in a thread to maintain separate context class loader for each topology TopologySubmitter submitter = new TopologySubmitter( - topology, cluster, stormCluster, streamflowConfig); + topology, cluster, localCluster, streamflowConfig); submitter.start(); try { @@ -107,7 +112,7 @@ public boolean killTopology(Topology topology, int waitTimeSecs, boolean async) if (isLocal(topology.getClusterId())) { // Kill the topology on the local cluster - stormCluster.killTopologyWithOpts(topology.getId(), killOptions); + localCluster.killTopologyWithOpts(topology.getId(), killOptions); } else { Cluster cluster = clusters.get(topology.getClusterId()); @@ -142,7 +147,7 @@ public ClusterSummary getClusterSummary(Cluster cluster) { if (cluster != null) { if (isLocal(cluster.getId())) { - summary = stormCluster.getClusterInfo(); + summary = localCluster.getClusterInfo(); } else { TSocket tsocket = new TSocket(cluster.getNimbusHost(), cluster.getNimbusPort()); TFramedTransport tTransport = new TFramedTransport(tsocket); @@ -228,8 +233,8 @@ public TopologyInfo getTopologyInfo(Topology topology) { } if (isLocal(topology.getClusterId())) { - info = stormCluster.getTopologyInfo(stormTopologyId); - topologyConf = stormCluster.getTopologyConf(stormTopologyId); + info = localCluster.getTopologyInfo(stormTopologyId); + topologyConf = localCluster.getTopologyConf(stormTopologyId); } else { Cluster cluster = clusters.get(topology.getClusterId()); diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java index d803034..26756ca 100644 --- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java +++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java @@ -17,7 +17,10 @@ import backtype.storm.LocalCluster; import com.google.inject.AbstractModule; +import com.google.inject.name.Names; import streamflow.engine.StormEngine; +import streamflow.model.config.LocalClusterConfig; +import streamflow.util.config.ConfigLoader; public class EngineModule extends AbstractModule { @@ -25,7 +28,9 @@ public class EngineModule extends AbstractModule { protected void configure() { bind(StormEngine.class); - // Would prefer to use provider, but newer interface does not provide eager initialization - bind(LocalCluster.class).toInstance(new LocalCluster()); + LocalClusterConfig localClusterConfig = ConfigLoader.getConfig().getLocalCluster(); + if (localClusterConfig != null && localClusterConfig.isEnabled()) { + bind(LocalCluster.class).annotatedWith(Names.named("LocalCluster")).toInstance(new LocalCluster()); + } } } diff --git a/streamflow-core/streamflow-model/pom.xml b/streamflow-core/streamflow-model/pom.xml index 3b97f6b..6740d3a 100644 --- a/streamflow-core/streamflow-model/pom.xml +++ b/streamflow-core/streamflow-model/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-core - 0.11.0 + 0.12.0 streamflow-model diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java new file mode 100644 index 0000000..a6819a7 --- /dev/null +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java @@ -0,0 +1,61 @@ +/** + * Copyright 2014 Lockheed Martin Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package streamflow.model.config; + +import java.io.Serializable; + +public class LocalClusterConfig implements Serializable { + + private boolean enabled = true; + + public LocalClusterConfig() { + } + + public boolean isEnabled() { + return Boolean.parseBoolean(System.getProperty("localCluster.enabled", Boolean.toString(enabled))); + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + @Override + public int hashCode() { + int hash = 3; + hash = 43 * hash + (this.enabled ? 1 : 0); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final LocalClusterConfig other = (LocalClusterConfig) obj; + if (this.enabled != other.enabled) { + return false; + } + return true; + } + + @Override + public String toString() { + return "LocalClusterConfig{" + "enabled=" + enabled + '}'; + } +} diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java index 4fd731f..9756c9d 100644 --- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java @@ -34,8 +34,10 @@ public class StreamflowConfig implements Serializable { private LoggerConfig logger = new LoggerConfig(); private AuthConfig auth = new AuthConfig(); + + private LocalClusterConfig localCluster = new LocalClusterConfig(); - private List clusters = new ArrayList(); + private List clusters = new ArrayList<>(); private Cluster selectedCluster; @@ -82,6 +84,14 @@ public void setAuth(AuthConfig auth) { this.auth = auth; } + public LocalClusterConfig getLocalCluster() { + return localCluster; + } + + public void setLocalCluster(LocalClusterConfig localCluster) { + this.localCluster = localCluster; + } + public List getClusters() { return clusters; } @@ -106,6 +116,7 @@ public int hashCode() { hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0); hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0); hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0); + hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0); hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0); return hash; } @@ -139,6 +150,10 @@ public boolean equals(Object obj) { || !this.auth.equals(other.auth))) { return false; } + if (this.localCluster != other.localCluster && (this.localCluster == null + || !this.localCluster.equals(other.localCluster))) { + return false; + } if (this.clusters != other.clusters && (this.clusters == null || !this.clusters.equals(other.clusters))) { return false; @@ -150,6 +165,7 @@ public boolean equals(Object obj) { public String toString() { return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy + ", datastore=" + datastore + ", logger=" + logger - + ", auth=" + auth + ", clusters=" + clusters + '}'; + + ", auth=" + auth + ", localCluster=" + localCluster + + ", clusters=" + clusters + '}'; } } diff --git a/streamflow-core/streamflow-server/pom.xml b/streamflow-core/streamflow-server/pom.xml index 2830e16..d4ffd18 100644 --- a/streamflow-core/streamflow-server/pom.xml +++ b/streamflow-core/streamflow-server/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-core - 0.11.0 + 0.12.0 streamflow-server diff --git a/streamflow-core/streamflow-service/pom.xml b/streamflow-core/streamflow-service/pom.xml index fb7274c..b15d6d5 100644 --- a/streamflow-core/streamflow-service/pom.xml +++ b/streamflow-core/streamflow-service/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-core - 0.11.0 + 0.12.0 streamflow-service diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java index 717f9fc..dc3aba6 100644 --- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java +++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java @@ -33,7 +33,7 @@ public class ClusterService { private final StormEngine stormEngine; - private final Map clusters = new HashMap(); + private final Map clusters = new HashMap<>(); @Inject public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig) { @@ -46,10 +46,12 @@ public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig } } - // Generate the local cluster and add it to the cluster map - Cluster localCluster = new Cluster( - Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null); - clusters.put(localCluster.getId(), localCluster); + if (streamflowConfig.getLocalCluster().isEnabled()) { + // Generate the local cluster and add it to the cluster map + Cluster localCluster = new Cluster( + Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null); + clusters.put(localCluster.getId(), localCluster); + } } public Collection listClusters() { diff --git a/streamflow-core/streamflow-util/pom.xml b/streamflow-core/streamflow-util/pom.xml index 7f09a85..ce0500c 100644 --- a/streamflow-core/streamflow-util/pom.xml +++ b/streamflow-core/streamflow-util/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-core - 0.11.0 + 0.12.0 streamflow-util diff --git a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java index b4f23bb..5feeb81 100644 --- a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java +++ b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java @@ -24,6 +24,7 @@ import streamflow.model.config.ServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import streamflow.model.config.LocalClusterConfig; public class ConfigModule extends AbstractModule { @@ -38,6 +39,7 @@ protected void configure() { bind(AuthConfig.class).toInstance(streamflowConfig.getAuth()); bind(ProxyConfig.class).toInstance(streamflowConfig.getProxy()); bind(LoggerConfig.class).toInstance(streamflowConfig.getLogger()); + bind(LocalClusterConfig.class).toInstance(streamflowConfig.getLocalCluster()); bind(DatastoreConfig.class).toInstance(streamflowConfig.getDatastore()); } } diff --git a/streamflow-core/streamflow-util/src/test/resources/streamflow.yml b/streamflow-core/streamflow-util/src/test/resources/streamflow.yml index f16ca06..dc824ab 100644 --- a/streamflow-core/streamflow-util/src/test/resources/streamflow.yml +++ b/streamflow-core/streamflow-util/src/test/resources/streamflow.yml @@ -20,6 +20,9 @@ datastore: proxy: host: test.classpath.proxy port: 80 + +localCluster: + enabled: true # Cluster Configuration clusters: diff --git a/streamflow-dist/pom.xml b/streamflow-dist/pom.xml index 7900fa2..8152909 100644 --- a/streamflow-dist/pom.xml +++ b/streamflow-dist/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow - 0.11.0 + 0.12.0 streamflow-dist diff --git a/streamflow-dist/src/main/assembly/common-bin.xml b/streamflow-dist/src/main/assembly/common-bin.xml index 9f43a2c..9e78bc4 100644 --- a/streamflow-dist/src/main/assembly/common-bin.xml +++ b/streamflow-dist/src/main/assembly/common-bin.xml @@ -26,14 +26,6 @@ streamflow:streamflow-app-jar - - /deploy - false - false - - streamflow:streamflow-app-war - - /sample false diff --git a/streamflow-dist/src/main/resources/bin/streamflow.bat b/streamflow-dist/src/main/resources/bin/streamflow.bat index 8240f9a..8926aee 100644 --- a/streamflow-dist/src/main/resources/bin/streamflow.bat +++ b/streamflow-dist/src/main/resources/bin/streamflow.bat @@ -3,7 +3,7 @@ SETLOCAL rem Change this value to modify any JAVA_OPTS provided to the Streamflow server -set STREAMFLOW_OPTS="-Xms512m -Xmx1g" +set STREAMFLOW_OPTS="-Xms256m -Xmx256g" if NOT DEFINED JAVA_HOME goto err diff --git a/streamflow-dist/src/main/resources/bin/streamflow.sh b/streamflow-dist/src/main/resources/bin/streamflow.sh index 09f29a4..3c262a2 100644 --- a/streamflow-dist/src/main/resources/bin/streamflow.sh +++ b/streamflow-dist/src/main/resources/bin/streamflow.sh @@ -1,7 +1,7 @@ #!/bin/bash # Change this value to modify any JAVA_OPTS provided to the Streamflow server -STREAMFLOW_OPTS="-Xms512m -Xmx1g" +STREAMFLOW_OPTS="-Xms256m -Xmx256m" SCRIPT="$0" diff --git a/streamflow-dist/src/main/resources/conf/streamflow.yml b/streamflow-dist/src/main/resources/conf/streamflow.yml index 7930117..f2b88ad 100644 --- a/streamflow-dist/src/main/resources/conf/streamflow.yml +++ b/streamflow-dist/src/main/resources/conf/streamflow.yml @@ -23,6 +23,10 @@ server: # Datastore configuration #datastore: # moduleClass: streamflow.datastore.jdbc.config.JDBCDatastoreModule + +# Local Cluster configuration +#localCluster: +# enabled: true # Cluster Configuration #clusters: diff --git a/streamflow-frameworks/core-framework/pom.xml b/streamflow-frameworks/core-framework/pom.xml index a5d140c..47def39 100644 --- a/streamflow-frameworks/core-framework/pom.xml +++ b/streamflow-frameworks/core-framework/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-frameworks - 0.11.0 + 0.12.0 core-framework @@ -86,7 +86,7 @@ streamflow - framework-annotations + streamflow-annotations ${project.version} diff --git a/streamflow-frameworks/pom.xml b/streamflow-frameworks/pom.xml index 519f0c7..6b7dda2 100644 --- a/streamflow-frameworks/pom.xml +++ b/streamflow-frameworks/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow - 0.11.0 + 0.12.0 streamflow-frameworks diff --git a/streamflow-frameworks/twitter-framework/pom.xml b/streamflow-frameworks/twitter-framework/pom.xml index 8f22afc..48ba905 100644 --- a/streamflow-frameworks/twitter-framework/pom.xml +++ b/streamflow-frameworks/twitter-framework/pom.xml @@ -22,7 +22,7 @@ streamflow streamflow-frameworks - 0.11.0 + 0.12.0 twitter-framework @@ -87,7 +87,7 @@ streamflow - framework-annotations + streamflow-annotations ${project.version}