Skip to content

Commit

Permalink
Merge pull request #34 from juliencruz/develop
Browse files Browse the repository at this point in the history
Storm LocalCluster configuration and streamflow-annotation fixes
  • Loading branch information
juliencruz committed May 26, 2015
2 parents df0f15d + 3867917 commit 1b58893
Show file tree
Hide file tree
Showing 14 changed files with 122 additions and 32 deletions.
2 changes: 1 addition & 1 deletion streamflow-annotations/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<version>0.12.0-SNAPSHOT</version>
</parent>

<artifactId>framework-annotations</artifactId>
<artifactId>streamflow-annotations</artifactId>
<name>Streamflow Annotations</name>
<description>Annotations for use with Streamflow Frameworks</description>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import backtype.storm.generated.NotAliveException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class StormEngine {

protected static final Logger LOG = LoggerFactory.getLogger(StormEngine.class);

private final LocalCluster stormCluster;
private LocalCluster localCluster;

private final StreamflowConfig streamflowConfig;

Expand All @@ -62,8 +63,7 @@ public class StormEngine {
private static final int KILL_BUFFER_SECS = 60;

@Inject
public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig) {
this.stormCluster = stormCluster;
public StormEngine(StreamflowConfig streamflowConfig) {
this.streamflowConfig = streamflowConfig;

// Add each of the clusters from the application configuration
Expand All @@ -72,17 +72,22 @@ public StormEngine(LocalCluster stormCluster, StreamflowConfig streamflowConfig)
clusters.put(cluster.getId(), cluster);
}
}
}

@Inject(optional=true)
public void setLocalCluster(@Named("LocalCluster") LocalCluster localCluster) {
this.localCluster = localCluster;

// Manually add the local cluster and add it to the cluster map
Cluster localCluster = new Cluster(
Cluster localClusterEntry = new Cluster(
Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null);
clusters.put(localCluster.getId(), localCluster);
clusters.put(localClusterEntry.getId(), localClusterEntry);
}

public Topology submitTopology(Topology topology, Cluster cluster) {
// Execute topology submission in a thread to maintain separate context class loader for each topology
TopologySubmitter submitter = new TopologySubmitter(
topology, cluster, stormCluster, streamflowConfig);
topology, cluster, localCluster, streamflowConfig);
submitter.start();

try {
Expand All @@ -107,7 +112,7 @@ public boolean killTopology(Topology topology, int waitTimeSecs, boolean async)

if (isLocal(topology.getClusterId())) {
// Kill the topology on the local cluster
stormCluster.killTopologyWithOpts(topology.getId(), killOptions);
localCluster.killTopologyWithOpts(topology.getId(), killOptions);
} else {
Cluster cluster = clusters.get(topology.getClusterId());

Expand Down Expand Up @@ -142,7 +147,7 @@ public ClusterSummary getClusterSummary(Cluster cluster) {

if (cluster != null) {
if (isLocal(cluster.getId())) {
summary = stormCluster.getClusterInfo();
summary = localCluster.getClusterInfo();
} else {
TSocket tsocket = new TSocket(cluster.getNimbusHost(), cluster.getNimbusPort());
TFramedTransport tTransport = new TFramedTransport(tsocket);
Expand Down Expand Up @@ -228,8 +233,8 @@ public TopologyInfo getTopologyInfo(Topology topology) {
}

if (isLocal(topology.getClusterId())) {
info = stormCluster.getTopologyInfo(stormTopologyId);
topologyConf = stormCluster.getTopologyConf(stormTopologyId);
info = localCluster.getTopologyInfo(stormTopologyId);
topologyConf = localCluster.getTopologyConf(stormTopologyId);
} else {
Cluster cluster = clusters.get(topology.getClusterId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@

import backtype.storm.LocalCluster;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import streamflow.engine.StormEngine;
import streamflow.model.config.LocalClusterConfig;
import streamflow.util.config.ConfigLoader;

public class EngineModule extends AbstractModule {

@Override
protected void configure() {
bind(StormEngine.class);

// Would prefer to use provider, but newer interface does not provide eager initialization
bind(LocalCluster.class).toInstance(new LocalCluster());
LocalClusterConfig localClusterConfig = ConfigLoader.getConfig().getLocalCluster();
if (localClusterConfig != null && localClusterConfig.isEnabled()) {
bind(LocalCluster.class).annotatedWith(Names.named("LocalCluster")).toInstance(new LocalCluster());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2014 Lockheed Martin Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package streamflow.model.config;

import java.io.Serializable;

public class LocalClusterConfig implements Serializable {

private boolean enabled = true;

public LocalClusterConfig() {
}

public boolean isEnabled() {
return Boolean.parseBoolean(System.getProperty("localCluster.enabled", Boolean.toString(enabled)));
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

@Override
public int hashCode() {
int hash = 3;
hash = 43 * hash + (this.enabled ? 1 : 0);
return hash;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final LocalClusterConfig other = (LocalClusterConfig) obj;
if (this.enabled != other.enabled) {
return false;
}
return true;
}

@Override
public String toString() {
return "LocalClusterConfig{" + "enabled=" + enabled + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class StreamflowConfig implements Serializable {
private LoggerConfig logger = new LoggerConfig();

private AuthConfig auth = new AuthConfig();

private LocalClusterConfig localCluster = new LocalClusterConfig();

private List<Cluster> clusters = new ArrayList<Cluster>();
private List<Cluster> clusters = new ArrayList<>();

private Cluster selectedCluster;

Expand Down Expand Up @@ -82,6 +84,14 @@ public void setAuth(AuthConfig auth) {
this.auth = auth;
}

public LocalClusterConfig getLocalCluster() {
return localCluster;
}

public void setLocalCluster(LocalClusterConfig localCluster) {
this.localCluster = localCluster;
}

public List<Cluster> getClusters() {
return clusters;
}
Expand All @@ -106,6 +116,7 @@ public int hashCode() {
hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0);
hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0);
hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0);
hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0);
hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0);
return hash;
}
Expand Down Expand Up @@ -139,6 +150,10 @@ public boolean equals(Object obj) {
|| !this.auth.equals(other.auth))) {
return false;
}
if (this.localCluster != other.localCluster && (this.localCluster == null
|| !this.localCluster.equals(other.localCluster))) {
return false;
}
if (this.clusters != other.clusters && (this.clusters == null
|| !this.clusters.equals(other.clusters))) {
return false;
Expand All @@ -150,6 +165,7 @@ public boolean equals(Object obj) {
public String toString() {
return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy
+ ", datastore=" + datastore + ", logger=" + logger
+ ", auth=" + auth + ", clusters=" + clusters + '}';
+ ", auth=" + auth + ", localCluster=" + localCluster
+ ", clusters=" + clusters + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ClusterService {

private final StormEngine stormEngine;

private final Map<String, Cluster> clusters = new HashMap<String, Cluster>();
private final Map<String, Cluster> clusters = new HashMap<>();

@Inject
public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig) {
Expand All @@ -46,10 +46,12 @@ public ClusterService(StormEngine stormEngine, StreamflowConfig streamflowConfig
}
}

// Generate the local cluster and add it to the cluster map
Cluster localCluster = new Cluster(
Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null);
clusters.put(localCluster.getId(), localCluster);
if (streamflowConfig.getLocalCluster().isEnabled()) {
// Generate the local cluster and add it to the cluster map
Cluster localCluster = new Cluster(
Cluster.LOCAL, "Local", "localhost", 6627, "localhost", 9300, null);
clusters.put(localCluster.getId(), localCluster);
}
}

public Collection<Cluster> listClusters() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import streamflow.model.config.ServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import streamflow.model.config.LocalClusterConfig;

public class ConfigModule extends AbstractModule {

Expand All @@ -38,6 +39,7 @@ protected void configure() {
bind(AuthConfig.class).toInstance(streamflowConfig.getAuth());
bind(ProxyConfig.class).toInstance(streamflowConfig.getProxy());
bind(LoggerConfig.class).toInstance(streamflowConfig.getLogger());
bind(LocalClusterConfig.class).toInstance(streamflowConfig.getLocalCluster());
bind(DatastoreConfig.class).toInstance(streamflowConfig.getDatastore());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ datastore:
proxy:
host: test.classpath.proxy
port: 80

localCluster:
enabled: true

# Cluster Configuration
clusters:
Expand Down
8 changes: 0 additions & 8 deletions streamflow-dist/src/main/assembly/common-bin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@
<include>streamflow:streamflow-app-jar</include>
</includes>
</dependencySet>
<dependencySet>
<outputDirectory>/deploy</outputDirectory>
<useProjectArtifact>false</useProjectArtifact>
<useTransitiveDependencies>false</useTransitiveDependencies>
<includes>
<include>streamflow:streamflow-app-war</include>
</includes>
</dependencySet>
<dependencySet>
<outputDirectory>/sample</outputDirectory>
<useProjectArtifact>false</useProjectArtifact>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-dist/src/main/resources/bin/streamflow.bat
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
SETLOCAL

rem Change this value to modify any JAVA_OPTS provided to the Streamflow server
set STREAMFLOW_OPTS="-Xms512m -Xmx1g"
set STREAMFLOW_OPTS="-Xms256m -Xmx256g"

if NOT DEFINED JAVA_HOME goto err

Expand Down
2 changes: 1 addition & 1 deletion streamflow-dist/src/main/resources/bin/streamflow.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

# Change this value to modify any JAVA_OPTS provided to the Streamflow server
STREAMFLOW_OPTS="-Xms512m -Xmx1g"
STREAMFLOW_OPTS="-Xms256m -Xmx256m"

SCRIPT="$0"

Expand Down
4 changes: 4 additions & 0 deletions streamflow-dist/src/main/resources/conf/streamflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ server:
# Datastore configuration
#datastore:
# moduleClass: streamflow.datastore.jdbc.config.JDBCDatastoreModule

# Local Cluster configuration
#localCluster:
# enabled: true

# Cluster Configuration
#clusters:
Expand Down
2 changes: 1 addition & 1 deletion streamflow-frameworks/core-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
<dependencies>
<dependency>
<groupId>streamflow</groupId>
<artifactId>framework-annotations</artifactId>
<artifactId>streamflow-annotations</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down
2 changes: 1 addition & 1 deletion streamflow-frameworks/twitter-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@

<dependency>
<groupId>streamflow</groupId>
<artifactId>framework-annotations</artifactId>
<artifactId>streamflow-annotations</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down

0 comments on commit 1b58893

Please sign in to comment.