Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added topology status monitor and upgraded Storm version to 0.9.5 #41

Merged
merged 5 commits into from
Aug 30, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ nbactions.xml
.DS_Store
*.DS_Store
*.iml
.idea
.idea/
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<!-- Dependency Versions -->
<storm.version>0.9.3</storm.version>
<storm.version>0.9.5</storm.version>
<jetty.version>9.0.0.RC2</jetty.version>
<jackson.version>2.4.1</jackson.version>
<jersey.version>1.17.1</jersey.version>
Expand Down
2 changes: 1 addition & 1 deletion streamflow-core/streamflow-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>13.0</version>
<version>18.0</version>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ public void run() {
localCluster.submitTopology(topology.getId(), stormConfig, stormTopology);
}
} catch (AlreadyAliveException ex) {
LOG.error("The specified topology is already running on the cluster: {}", ex);
LOG.error("The specified topology is already running on the cluster:", ex);
} catch (InvalidTopologyException ex) {
LOG.error("The specified topology is invalid: " + ex);
} catch (FrameworkException ex) {
LOG.error("The topology was unable to load a dependent framework: {}", ex);
LOG.error("The topology was unable to load a dependent framework:", ex);
} catch (Exception ex) {
LOG.error("The topology threw an uncaught exception: {}", ex);
LOG.error("The topology threw an uncaught exception:", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class Cluster implements Serializable {

private Integer nimbusPort = 6627;

private String version = "0.9.1";
private String version = "0.9.5";

private String logServerHost = "localhost";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Copyright 2014 Lockheed Martin Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package streamflow.model.config;

import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class MonitorConfig implements Serializable {

static Logger LOG = LoggerFactory.getLogger(MonitorConfig.class);

private boolean enabled = false;

private int pollingInterval = 60;

public MonitorConfig() {
}

public boolean isEnabled() {
return Boolean.parseBoolean(
System.getProperty("monitor.enabled", Boolean.toString(enabled)));
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public int getPollingInterval() {
if (System.getProperty("monitor.pollingInterval") != null) {
try {
pollingInterval = Integer.parseInt(System.getProperty("monitor.pollingInterval"));
} catch (Exception ex) {
}
}
return pollingInterval;
}

public void setPollingInterval(int pollingInterval) {
this.pollingInterval = pollingInterval;
}

@Override
public int hashCode() {
int hash = 3;
hash = 43 * hash + (this.enabled ? 1 : 0);
hash = 43 * hash + this.pollingInterval;
return hash;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final MonitorConfig other = (MonitorConfig) obj;
if (this.enabled != other.enabled) {
return false;
}
if (this.pollingInterval != other.pollingInterval) {
return false;
}
return true;
}

@Override
public String toString() {
return "MonitorConfig{" + "enabled=" + enabled + ", pollingInterval=" + pollingInterval + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public class StreamflowConfig implements Serializable {
private LoggerConfig logger = new LoggerConfig();

private AuthConfig auth = new AuthConfig();


private MonitorConfig monitor = new MonitorConfig();

private LocalClusterConfig localCluster = new LocalClusterConfig();

private List<Cluster> clusters = new ArrayList<>();
Expand Down Expand Up @@ -84,6 +86,14 @@ public void setAuth(AuthConfig auth) {
this.auth = auth;
}

public MonitorConfig getMonitor() {
return monitor;
}

public void setMonitor(MonitorConfig monitor) {
this.monitor = monitor;
}

public LocalClusterConfig getLocalCluster() {
return localCluster;
}
Expand Down Expand Up @@ -116,6 +126,7 @@ public int hashCode() {
hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0);
hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0);
hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0);
hash = 29 * hash + (this.monitor != null ? this.monitor.hashCode() : 0);
hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0);
hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0);
return hash;
Expand Down Expand Up @@ -150,6 +161,10 @@ public boolean equals(Object obj) {
|| !this.auth.equals(other.auth))) {
return false;
}
if (this.monitor != other.monitor && (this.monitor == null
|| !this.monitor.equals(other.monitor))) {
return false;
}
if (this.localCluster != other.localCluster && (this.localCluster == null
|| !this.localCluster.equals(other.localCluster))) {
return false;
Expand All @@ -165,7 +180,7 @@ public boolean equals(Object obj) {
public String toString() {
return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy
+ ", datastore=" + datastore + ", logger=" + logger
+ ", auth=" + auth + ", localCluster=" + localCluster
+ ", auth=" + auth + ", monitor=" + monitor + ", localCluster=" + localCluster
+ ", clusters=" + clusters + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package streamflow.server.config;

import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import streamflow.model.config.MonitorConfig;
import streamflow.model.config.StreamflowConfig;
import streamflow.server.service.TopologyMonitorService;
import streamflow.util.config.ConfigLoader;

import java.util.Set;

public class GuavaServiceModule extends AbstractModule {

@Override
protected void configure() {
MonitorConfig monitorConfig = ConfigLoader.getConfig().getMonitor();
if (monitorConfig.isEnabled()) {
bind(TopologyMonitorService.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
*/
package streamflow.server.config;

//import com.google.common.util.concurrent.ServiceManager;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceServletContextListener;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.annotation.WebListener;

import org.apache.storm.guava.util.concurrent.ServiceManager;
import streamflow.datastore.config.DatastoreModule;
import streamflow.engine.config.EngineModule;
import streamflow.server.service.TopologyMonitorService;
import streamflow.service.config.ServiceModule;
import streamflow.util.config.ConfigModule;
import org.apache.shiro.guice.web.ShiroWebModule;
Expand All @@ -39,9 +43,17 @@ protected Injector getInjector() {
StreamflowEnvironment.setStreamflowHome(System.getenv("STREAMFLOW_HOME"));
StreamflowEnvironment.initialize();

return Guice.createInjector(new ConfigModule(), new DatastoreModule(),
new ServiceModule(), new EngineModule(), new JerseyModule(),
Injector injector = Guice.createInjector(new ConfigModule(), new DatastoreModule(),
new ServiceModule(), new GuavaServiceModule(), new EngineModule(), new JerseyModule(),
new SecurityModule(servletContext), ShiroWebModule.guiceFilterModule());

// Initialize the service manager to manage daemon services
//ServiceManager manager = injector.getInstance(ServiceManager.class);
//manager.startAsync().awaitHealthy();
TopologyMonitorService topologyMonitorService = injector.getInstance(TopologyMonitorService.class);
topologyMonitorService.startAsync().awaitRunning();

return injector;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package streamflow.server.service;

import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import streamflow.model.Topology;
import streamflow.model.config.MonitorConfig;
import streamflow.service.TopologyService;

import java.util.concurrent.TimeUnit;

@Singleton
public class TopologyMonitorService extends AbstractScheduledService {

public static final Logger LOG = LoggerFactory.getLogger(TopologyMonitorService.class);

private TopologyService topologyService;
private MonitorConfig monitorConfig;

@Inject
public TopologyMonitorService(TopologyService topologyService, MonitorConfig monitorConfig) {
this.topologyService = topologyService;
this.monitorConfig = monitorConfig;
}

@Override
protected void startUp() throws Exception {
LOG.info("Topology Status Monitor Started...");
}

@Override
protected void runOneIteration() throws Exception {
// Iterate over all of the topologies for each user to check live status
for (Topology topology : topologyService.listAllTopologies()) {
try {
// Get the current live status of the topology
String topologyStatusDesired = topology.getStatus();
String topologyStatusActual = topologyService.getTopology(topology.getId(), topology.getUserId()).getStatus();

if (topologyStatusDesired.equalsIgnoreCase("ACTIVE")) {
// Topology should be submitted, but isn't active so resubmit to desired state
if (!topologyStatusActual.equalsIgnoreCase("ACTIVE")) {
LOG.warn("Topology has a desired state of ACTIVE but is not currently deployed. "
+ "Redeploying topology... ID = " + topology.getId() + ", Name = " + topology.getName());

// Resubmit the topology using the same settings as originally submitted
Topology submittedTopology = topologyService.submitTopology(
topology.getId(), topology.getUserId(), topology.getClusterId(),
topology.getLogLevel(), topology.getClassLoaderPolicy());

if (topology != null && topology.getStatus().equalsIgnoreCase("ACTIVE")) {
LOG.info("Topology redeploy succeeded: ID = " + topology.getId() + ", Name = " + topology.getName());
} else {
LOG.error("Topology redeploy failed: ID = " + topology.getId() + ", Name = " + topology.getName());
}
}
}
} catch (Exception ex) {
LOG.error("An exception occurred while checking topology status: ID = "
+ topology.getId() + ", Name = " + topology.getName(), ex);
}
}
}

@Override
protected Scheduler scheduler() {
return Scheduler.newFixedRateSchedule(monitorConfig.getPollingInterval(), monitorConfig.getPollingInterval(), TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,9 @@ topologyModule.controller('TopologyPropertiesNewController', [
'storm.messaging.netty.max_wait_ms',
'storm.messaging.netty.min_wait_ms',
'storm.messaging.netty.transfer.batch.size',
'storm.messaging.netty.socket.backlog',
'storm.messaging.netty.flush.check.interval.ms',
'storm.messaging.netty.authentication',
'topology.enable.message.timeouts',
'topology.debug',
'topology.workers',
Expand Down Expand Up @@ -584,8 +586,10 @@ topologyModule.controller('TopologyPropertiesNewController', [
'topology.kryo.factory',
'topology.tuple.serializer',
'topology.trident.batch.emit.interval.millis',
'storm.group.mapping.service.cache.duration.secs',
'topology.classpath',
'topology.environment'
'topology.environment',
'topology.bolts.outgoing.overflow.buffer.enable'
];

$scope.add = function() {
Expand Down
Loading