From e74e082bb66455f779b07b0ad8b9eb6e420205a7 Mon Sep 17 00:00:00 2001 From: Julien Cruz Date: Mon, 20 Apr 2015 02:22:15 -0400 Subject: [PATCH 1/5] Fixed build issues with streamflow-annotations --- pom.xml | 2 ++ streamflow-annotations/pom.xml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1f500bf..fb085ba 100644 --- a/pom.xml +++ b/pom.xml @@ -590,6 +590,8 @@ jgitflow-maven-plugin 1.0-m4.3 + true + true diff --git a/streamflow-annotations/pom.xml b/streamflow-annotations/pom.xml index 553c3e3..bd7674b 100644 --- a/streamflow-annotations/pom.xml +++ b/streamflow-annotations/pom.xml @@ -20,7 +20,7 @@ streamflow streamflow - 0.9.1 + 0.10.0-SNAPSHOT framework-annotations From a7b100d4b2e665d26ad1e9d7175bfb785bd19789 Mon Sep 17 00:00:00 2001 From: Julien Cruz Date: Mon, 25 May 2015 14:54:24 -0400 Subject: [PATCH 2/5] Updated streamflow-annotations maven coordinates to match existing artifacts. --- streamflow-annotations/pom.xml | 2 +- streamflow-frameworks/core-framework/pom.xml | 2 +- streamflow-frameworks/twitter-framework/pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/streamflow-annotations/pom.xml b/streamflow-annotations/pom.xml index bc3dd17..4518859 100644 --- a/streamflow-annotations/pom.xml +++ b/streamflow-annotations/pom.xml @@ -23,7 +23,7 @@ 0.11.0-SNAPSHOT - framework-annotations + streamflow-annotations Streamflow Annotations Annotations for use with Streamflow Frameworks diff --git a/streamflow-frameworks/core-framework/pom.xml b/streamflow-frameworks/core-framework/pom.xml index 379625f..427dd66 100644 --- a/streamflow-frameworks/core-framework/pom.xml +++ b/streamflow-frameworks/core-framework/pom.xml @@ -86,7 +86,7 @@ streamflow - framework-annotations + streamflow-annotations ${project.version} diff --git a/streamflow-frameworks/twitter-framework/pom.xml b/streamflow-frameworks/twitter-framework/pom.xml index 7812691..64ae362 100644 --- a/streamflow-frameworks/twitter-framework/pom.xml +++ b/streamflow-frameworks/twitter-framework/pom.xml @@ -87,7 +87,7 @@ streamflow - framework-annotations + streamflow-annotations ${project.version} From 43713444c5d36af3057b3b3316fa9ca7f353905d Mon Sep 17 00:00:00 2001 From: Julien Cruz Date: Mon, 25 May 2015 14:55:04 -0400 Subject: [PATCH 3/5] Reduced memory requirements for streamflow and removed WAR from the distribution to reduce footprint. --- streamflow-dist/src/main/assembly/common-bin.xml | 8 -------- streamflow-dist/src/main/resources/bin/streamflow.bat | 2 +- streamflow-dist/src/main/resources/bin/streamflow.sh | 2 +- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/streamflow-dist/src/main/assembly/common-bin.xml b/streamflow-dist/src/main/assembly/common-bin.xml index 9f43a2c..9e78bc4 100644 --- a/streamflow-dist/src/main/assembly/common-bin.xml +++ b/streamflow-dist/src/main/assembly/common-bin.xml @@ -26,14 +26,6 @@ streamflow:streamflow-app-jar - - /deploy - false - false - - streamflow:streamflow-app-war - - /sample false diff --git a/streamflow-dist/src/main/resources/bin/streamflow.bat b/streamflow-dist/src/main/resources/bin/streamflow.bat index 8240f9a..8926aee 100644 --- a/streamflow-dist/src/main/resources/bin/streamflow.bat +++ b/streamflow-dist/src/main/resources/bin/streamflow.bat @@ -3,7 +3,7 @@ SETLOCAL rem Change this value to modify any JAVA_OPTS provided to the Streamflow server -set STREAMFLOW_OPTS="-Xms512m -Xmx1g" +set STREAMFLOW_OPTS="-Xms256m -Xmx256g" if NOT DEFINED JAVA_HOME goto err diff --git a/streamflow-dist/src/main/resources/bin/streamflow.sh b/streamflow-dist/src/main/resources/bin/streamflow.sh index 09f29a4..3c262a2 100644 --- a/streamflow-dist/src/main/resources/bin/streamflow.sh +++ b/streamflow-dist/src/main/resources/bin/streamflow.sh @@ -1,7 +1,7 @@ #!/bin/bash # Change this value to modify any JAVA_OPTS provided to the Streamflow server -STREAMFLOW_OPTS="-Xms512m -Xmx1g" +STREAMFLOW_OPTS="-Xms256m -Xmx256m" SCRIPT="$0" From 02f1ce7776c12d98b29e6204f7716b1efcbd0096 Mon Sep 17 00:00:00 2001 From: Julien Cruz Date: Mon, 25 May 2015 23:44:25 -0400 Subject: [PATCH 4/5] Added configuration option to disable the LocalCluster for Storm. This allows the cluster to be disabled in production environments using the system property -DlocalCluster.enabled=false. --- .../java/streamflow/engine/StormEngine.java | 25 +++++++----- .../engine/config/EngineModule.java | 9 ++++- .../model/config/LocalClusterConfig.java | 39 +++++++++++++++++++ .../model/config/StreamflowConfig.java | 20 +++++++++- .../streamflow/service/ClusterService.java | 12 +++--- .../streamflow/util/config/ConfigModule.java | 2 + 6 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java index f2b9d86..123ab04 100644 --- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java +++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java @@ -21,6 +21,7 @@ import backtype.storm.generated.NotAliveException; import com.google.inject.Inject; import com.google.inject.Singleton; +import com.google.inject.name.Named; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -53,7 +54,7 @@ public class StormEngine { protected static final Logger LOG = LoggerFactory.getLogger(StormEngine.class); - private final LocalCluster stormCluster; + private LocalCluster localCluster; private final StreamflowConfig streamflowConfig; @@ -62,8 +63,7 @@ public class StormEngine { private static final int KILL_BUFFER_SECS = 60; @Inject - public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig) { - this.stormCluster = stormCluster; + public StormEngine(StreamflowConfig streamflowConfig) { this.streamflowConfig = streamflowConfig; // Add each of the clusters from the application configuration @@ -72,17 +72,22 @@ public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig) clusters.put(cluster.getId(), cluster); } } + } + + @Inject(optional=true) + public void setLocalCluster(@Named("LocalCluster") LocalCluster localCluster) { + this.localCluster = localCluster; // Manually add the local cluster and add it to the cluster map - Cluster localCluster = new Cluster( + Cluster localClusterEntry = new Cluster( Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null); - clusters.put(localCluster.getId(), localCluster); + clusters.put(localClusterEntry.getId(), localClusterEntry); } public Topology submitTopology(Topology topology, Cluster cluster) { // Execute topology submission in a thread to maintain separate context class loader for each topology TopologySubmitter submitter = new TopologySubmitter( - topology, cluster, stormCluster, streamflowConfig); + topology, cluster, localCluster, streamflowConfig); submitter.start(); try { @@ -107,7 +112,7 @@ public boolean killTopology(Topology topology, int waitTimeSecs, boolean async) if (isLocal(topology.getClusterId())) { // Kill the topology on the local cluster - stormCluster.killTopologyWithOpts(topology.getId(), killOptions); + localCluster.killTopologyWithOpts(topology.getId(), killOptions); } else { Cluster cluster = clusters.get(topology.getClusterId()); @@ -142,7 +147,7 @@ public ClusterSummary getClusterSummary(Cluster cluster) { if (cluster != null) { if (isLocal(cluster.getId())) { - summary = stormCluster.getClusterInfo(); + summary = localCluster.getClusterInfo(); } else { TSocket tsocket = new TSocket(cluster.getNimbusHost(), cluster.getNimbusPort()); TFramedTransport tTransport = new TFramedTransport(tsocket); @@ -228,8 +233,8 @@ public TopologyInfo getTopologyInfo(Topology topology) { } if (isLocal(topology.getClusterId())) { - info = stormCluster.getTopologyInfo(stormTopologyId); - topologyConf = stormCluster.getTopologyConf(stormTopologyId); + info = localCluster.getTopologyInfo(stormTopologyId); + topologyConf = localCluster.getTopologyConf(stormTopologyId); } else { Cluster cluster = clusters.get(topology.getClusterId()); diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java index d803034..26756ca 100644 --- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java +++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/config/EngineModule.java @@ -17,7 +17,10 @@ import backtype.storm.LocalCluster; import com.google.inject.AbstractModule; +import com.google.inject.name.Names; import streamflow.engine.StormEngine; +import streamflow.model.config.LocalClusterConfig; +import streamflow.util.config.ConfigLoader; public class EngineModule extends AbstractModule { @@ -25,7 +28,9 @@ public class EngineModule extends AbstractModule { protected void configure() { bind(StormEngine.class); - // Would prefer to use provider, but newer interface does not provide eager initialization - bind(LocalCluster.class).toInstance(new LocalCluster()); + LocalClusterConfig localClusterConfig = ConfigLoader.getConfig().getLocalCluster(); + if (localClusterConfig != null && localClusterConfig.isEnabled()) { + bind(LocalCluster.class).annotatedWith(Names.named("LocalCluster")).toInstance(new LocalCluster()); + } } } diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java new file mode 100644 index 0000000..23491a2 --- /dev/null +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java @@ -0,0 +1,39 @@ +/** + * Copyright 2014 Lockheed Martin Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package streamflow.model.config; + +import java.io.Serializable; + +public class LocalClusterConfig implements Serializable { + + private boolean enabled = true; + + public LocalClusterConfig() { + } + + public boolean isEnabled() { + return Boolean.parseBoolean(System.getProperty("localCluster.enabled", Boolean.toString(enabled))); + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + @Override + public String toString() { + return "LocalClusterConfig{" + "enabled=" + enabled + '}'; + } +} diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java index 4fd731f..9756c9d 100644 --- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java @@ -34,8 +34,10 @@ public class StreamflowConfig implements Serializable { private LoggerConfig logger = new LoggerConfig(); private AuthConfig auth = new AuthConfig(); + + private LocalClusterConfig localCluster = new LocalClusterConfig(); - private List clusters = new ArrayList(); + private List clusters = new ArrayList<>(); private Cluster selectedCluster; @@ -82,6 +84,14 @@ public void setAuth(AuthConfig auth) { this.auth = auth; } + public LocalClusterConfig getLocalCluster() { + return localCluster; + } + + public void setLocalCluster(LocalClusterConfig localCluster) { + this.localCluster = localCluster; + } + public List getClusters() { return clusters; } @@ -106,6 +116,7 @@ public int hashCode() { hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0); hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0); hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0); + hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0); hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0); return hash; } @@ -139,6 +150,10 @@ public boolean equals(Object obj) { || !this.auth.equals(other.auth))) { return false; } + if (this.localCluster != other.localCluster && (this.localCluster == null + || !this.localCluster.equals(other.localCluster))) { + return false; + } if (this.clusters != other.clusters && (this.clusters == null || !this.clusters.equals(other.clusters))) { return false; @@ -150,6 +165,7 @@ public boolean equals(Object obj) { public String toString() { return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy + ", datastore=" + datastore + ", logger=" + logger - + ", auth=" + auth + ", clusters=" + clusters + '}'; + + ", auth=" + auth + ", localCluster=" + localCluster + + ", clusters=" + clusters + '}'; } } diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java index 717f9fc..dc3aba6 100644 --- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java +++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/ClusterService.java @@ -33,7 +33,7 @@ public class ClusterService { private final StormEngine stormEngine; - private final Map clusters = new HashMap(); + private final Map clusters = new HashMap<>(); @Inject public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig) { @@ -46,10 +46,12 @@ public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig } } - // Generate the local cluster and add it to the cluster map - Cluster localCluster = new Cluster( - Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null); - clusters.put(localCluster.getId(), localCluster); + if (streamflowConfig.getLocalCluster().isEnabled()) { + // Generate the local cluster and add it to the cluster map + Cluster localCluster = new Cluster( + Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null); + clusters.put(localCluster.getId(), localCluster); + } } public Collection listClusters() { diff --git a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java index b4f23bb..5feeb81 100644 --- a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java +++ b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java @@ -24,6 +24,7 @@ import streamflow.model.config.ServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import streamflow.model.config.LocalClusterConfig; public class ConfigModule extends AbstractModule { @@ -38,6 +39,7 @@ protected void configure() { bind(AuthConfig.class).toInstance(streamflowConfig.getAuth()); bind(ProxyConfig.class).toInstance(streamflowConfig.getProxy()); bind(LoggerConfig.class).toInstance(streamflowConfig.getLogger()); + bind(LocalClusterConfig.class).toInstance(streamflowConfig.getLocalCluster()); bind(DatastoreConfig.class).toInstance(streamflowConfig.getDatastore()); } } From 3867917802794d97de24f028363088b547dda062 Mon Sep 17 00:00:00 2001 From: Julien Cruz Date: Tue, 26 May 2015 01:32:18 -0400 Subject: [PATCH 5/5] Updated streamflow.yml configurations and added hashcode/equals methods to new model classes. --- .../model/config/LocalClusterConfig.java | 22 +++++++++++++++++++ .../src/test/resources/streamflow.yml | 3 +++ .../src/main/resources/conf/streamflow.yml | 4 ++++ 3 files changed, 29 insertions(+) diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java index 23491a2..a6819a7 100644 --- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/LocalClusterConfig.java @@ -31,6 +31,28 @@ public boolean isEnabled() { public void setEnabled(boolean enabled) { this.enabled = enabled; } + + @Override + public int hashCode() { + int hash = 3; + hash = 43 * hash + (this.enabled ? 1 : 0); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final LocalClusterConfig other = (LocalClusterConfig) obj; + if (this.enabled != other.enabled) { + return false; + } + return true; + } @Override public String toString() { diff --git a/streamflow-core/streamflow-util/src/test/resources/streamflow.yml b/streamflow-core/streamflow-util/src/test/resources/streamflow.yml index f16ca06..dc824ab 100644 --- a/streamflow-core/streamflow-util/src/test/resources/streamflow.yml +++ b/streamflow-core/streamflow-util/src/test/resources/streamflow.yml @@ -20,6 +20,9 @@ datastore: proxy: host: test.classpath.proxy port: 80 + +localCluster: + enabled: true # Cluster Configuration clusters: diff --git a/streamflow-dist/src/main/resources/conf/streamflow.yml b/streamflow-dist/src/main/resources/conf/streamflow.yml index 7930117..f2b88ad 100644 --- a/streamflow-dist/src/main/resources/conf/streamflow.yml +++ b/streamflow-dist/src/main/resources/conf/streamflow.yml @@ -23,6 +23,10 @@ server: # Datastore configuration #datastore: # moduleClass: streamflow.datastore.jdbc.config.JDBCDatastoreModule + +# Local Cluster configuration +#localCluster: +# enabled: true # Cluster Configuration #clusters: