diff --git a/.gitignore b/.gitignore index 6a7917c..561e014 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ nbactions.xml .DS_Store *.DS_Store *.iml -.idea +.idea/ diff --git a/pom.xml b/pom.xml index ec5189f..5ce491b 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ UTF-8 - 0.9.3 + 0.9.5 9.0.0.RC2 2.4.1 1.17.1 diff --git a/streamflow-core/streamflow-engine/pom.xml b/streamflow-core/streamflow-engine/pom.xml index dacf9a3..7eb27cd 100644 --- a/streamflow-core/streamflow-engine/pom.xml +++ b/streamflow-core/streamflow-engine/pom.xml @@ -50,7 +50,7 @@ com.google.guava guava - 13.0 + 18.0 com.google.inject diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java index 123ab04..09d9149 100644 --- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java +++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/StormEngine.java @@ -130,10 +130,13 @@ public boolean killTopology(Topology topology, int waitTimeSecs, boolean async) killed = waitForTopologyRemoval(topology, waitTimeSecs + KILL_BUFFER_SECS); } + } catch (NotAliveException ex) { + // Topology is not running on the cluster so just ignore + killed = true; } catch (Exception ex) { LOG.error("Exception occurred while killing the remote topology: ID = " + topology.getId() + ", Reason = " + ex.getMessage()); - + ex.printStackTrace(); killed = false; } } @@ -252,7 +255,7 @@ public TopologyInfo getTopologyInfo(Topology topology) { } catch (NotAliveException ex) { LOG.error("The requested topology was not found in the cluster: ID = " + stormTopologyId); } catch (TException ex) { - LOG.error("Exception while retrieving the remote topology info: ", ex); + LOG.error("Exception while retrieving the remote topology info: ", ex.getMessage()); } finally { tTransport.close(); } @@ -267,167 +270,169 @@ public TopologyInfo getTopologyInfo(Topology topology) { */ TopologyInfo topologyInfo = new TopologyInfo(); - topologyInfo.setId(info.get_id()); - topologyInfo.setName(info.get_name()); - topologyInfo.setStatus(info.get_status()); - topologyInfo.setUptimeSecs(info.get_uptime_secs()); - topologyInfo.setTopologyConf(topologyConf); - - for (Map.Entry> error - : info.get_errors().entrySet()) { - List errorInfoList = new ArrayList<>(); - for (backtype.storm.generated.ErrorInfo ei : error.getValue()) { - ErrorInfo errorInfo = new ErrorInfo(); - errorInfo.setError(ei.get_error()); - errorInfo.setErrorTimeSecs(ei.get_error_time_secs()); - errorInfo.setHost(ei.get_host()); - errorInfo.setPort(ei.get_port()); - - errorInfoList.add(errorInfo); + if (info != null) { + topologyInfo.setId(info.get_id()); + topologyInfo.setName(info.get_name()); + topologyInfo.setStatus(info.get_status()); + topologyInfo.setUptimeSecs(info.get_uptime_secs()); + topologyInfo.setTopologyConf(topologyConf); + + for (Map.Entry> error + : info.get_errors().entrySet()) { + List errorInfoList = new ArrayList<>(); + for (backtype.storm.generated.ErrorInfo ei : error.getValue()) { + ErrorInfo errorInfo = new ErrorInfo(); + errorInfo.setError(ei.get_error()); + errorInfo.setErrorTimeSecs(ei.get_error_time_secs()); + errorInfo.setHost(ei.get_host()); + errorInfo.setPort(ei.get_port()); + + errorInfoList.add(errorInfo); + } + + topologyInfo.getErrors().put(error.getKey(), errorInfoList); } - topologyInfo.getErrors().put(error.getKey(), errorInfoList); - } + List executorSummaries = new ArrayList<>(); + for (backtype.storm.generated.ExecutorSummary es : info.get_executors()) { + ExecutorSummary executor = new ExecutorSummary(); + executor.setComponentId(es.get_component_id()); + executor.setHost(es.get_host()); + executor.setPort(es.get_port()); + executor.setUptimeSecs(es.get_uptime_secs()); + + backtype.storm.generated.ExecutorInfo ei = es.get_executor_info(); + if (ei != null) { + ExecutorInfo executorInfo = new ExecutorInfo(); + executorInfo.setTaskStart(ei.get_task_start()); + executorInfo.setTaskEnd(ei.get_task_end()); + + executor.setExecutorInfo(executorInfo); + } - List executorSummaries = new ArrayList<>(); - for (backtype.storm.generated.ExecutorSummary es : info.get_executors()) { - ExecutorSummary executor = new ExecutorSummary(); - executor.setComponentId(es.get_component_id()); - executor.setHost(es.get_host()); - executor.setPort(es.get_port()); - executor.setUptimeSecs(es.get_uptime_secs()); - - backtype.storm.generated.ExecutorInfo ei = es.get_executor_info(); - if (ei != null) { - ExecutorInfo executorInfo = new ExecutorInfo(); - executorInfo.setTaskStart(ei.get_task_start()); - executorInfo.setTaskEnd(ei.get_task_end()); - - executor.setExecutorInfo(executorInfo); - } + backtype.storm.generated.ExecutorStats eStats = es.get_stats(); + if (eStats != null) { + ExecutorStats stats = new ExecutorStats(); + stats.setEmitted(eStats.get_emitted()); + stats.setTransferred(eStats.get_transferred()); - backtype.storm.generated.ExecutorStats eStats = es.get_stats(); - if (eStats != null) { - ExecutorStats stats = new ExecutorStats(); - stats.setEmitted(eStats.get_emitted()); - stats.setTransferred(eStats.get_transferred()); + backtype.storm.generated.ExecutorSpecificStats ess = eStats.get_specific(); + if (ess != null) { + ExecutorSpecificStats specific = new ExecutorSpecificStats(); - backtype.storm.generated.ExecutorSpecificStats ess = eStats.get_specific(); - if (ess != null) { - ExecutorSpecificStats specific = new ExecutorSpecificStats(); + if (ess.is_set_bolt()) { + backtype.storm.generated.BoltStats bs = ess.get_bolt(); + if (bs != null) { + BoltStats boltStats = new BoltStats(); - if (ess.is_set_bolt()) { - backtype.storm.generated.BoltStats bs = ess.get_bolt(); - if (bs != null) { - BoltStats boltStats = new BoltStats(); + for (Map.Entry> ae + : bs.get_acked().entrySet()) { + Map ackedMap = new HashMap<>(); - for (Map.Entry> ae - : bs.get_acked().entrySet()) { - Map ackedMap = new HashMap<>(); + for (Map.Entry aem + : ae.getValue().entrySet()) { + backtype.storm.generated.GlobalStreamId gsi = aem.getKey(); - for (Map.Entry aem - : ae.getValue().entrySet()) { - backtype.storm.generated.GlobalStreamId gsi = aem.getKey(); + String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); - String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); + ackedMap.put(globalStreamId, aem.getValue()); + } - ackedMap.put(globalStreamId, aem.getValue()); + boltStats.getAcked().put(ae.getKey(), ackedMap); } - boltStats.getAcked().put(ae.getKey(), ackedMap); - } + for (Map.Entry> fe + : bs.get_failed().entrySet()) { + Map failedMap = new HashMap<>(); - for (Map.Entry> fe - : bs.get_failed().entrySet()) { - Map failedMap = new HashMap<>(); + for (Map.Entry fem + : fe.getValue().entrySet()) { + backtype.storm.generated.GlobalStreamId gsi = fem.getKey(); - for (Map.Entry fem - : fe.getValue().entrySet()) { - backtype.storm.generated.GlobalStreamId gsi = fem.getKey(); + String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); - String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); + failedMap.put(globalStreamId, fem.getValue()); + } - failedMap.put(globalStreamId, fem.getValue()); + boltStats.getFailed().put(fe.getKey(), failedMap); } - boltStats.getFailed().put(fe.getKey(), failedMap); - } + for (Map.Entry> ee + : bs.get_executed().entrySet()) { + Map executedMap = new HashMap<>(); - for (Map.Entry> ee - : bs.get_executed().entrySet()) { - Map executedMap = new HashMap<>(); + for (Map.Entry eem + : ee.getValue().entrySet()) { + backtype.storm.generated.GlobalStreamId gsi = eem.getKey(); - for (Map.Entry eem - : ee.getValue().entrySet()) { - backtype.storm.generated.GlobalStreamId gsi = eem.getKey(); + String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); - String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); + executedMap.put(globalStreamId, eem.getValue()); + } - executedMap.put(globalStreamId, eem.getValue()); + boltStats.getExecuted().put(ee.getKey(), executedMap); } - boltStats.getExecuted().put(ee.getKey(), executedMap); - } + for (Map.Entry> ema + : bs.get_execute_ms_avg().entrySet()) { + Map executedMap = new HashMap<>(); - for (Map.Entry> ema - : bs.get_execute_ms_avg().entrySet()) { - Map executedMap = new HashMap<>(); + for (Map.Entry emam + : ema.getValue().entrySet()) { + backtype.storm.generated.GlobalStreamId gsi = emam.getKey(); - for (Map.Entry emam - : ema.getValue().entrySet()) { - backtype.storm.generated.GlobalStreamId gsi = emam.getKey(); + String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); - String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); + executedMap.put(globalStreamId, emam.getValue()); + } - executedMap.put(globalStreamId, emam.getValue()); + boltStats.getExecuteMsAvg().put(ema.getKey(), executedMap); } - boltStats.getExecuteMsAvg().put(ema.getKey(), executedMap); - } + for (Map.Entry> pma + : bs.get_process_ms_avg().entrySet()) { + Map processMap = new HashMap<>(); - for (Map.Entry> pma - : bs.get_process_ms_avg().entrySet()) { - Map processMap = new HashMap<>(); + for (Map.Entry pmam + : pma.getValue().entrySet()) { + backtype.storm.generated.GlobalStreamId gsi = pmam.getKey(); - for (Map.Entry pmam - : pma.getValue().entrySet()) { - backtype.storm.generated.GlobalStreamId gsi = pmam.getKey(); + String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); - String globalStreamId = gsi.get_componentId() + ":" + gsi.get_streamId(); + processMap.put(globalStreamId, pmam.getValue()); + } - processMap.put(globalStreamId, pmam.getValue()); + boltStats.getProcessMsAvg().put(pma.getKey(), processMap); } - boltStats.getProcessMsAvg().put(pma.getKey(), processMap); + specific.setBolt(boltStats); } - - specific.setBolt(boltStats); } - } - if (ess.is_set_spout()) { - backtype.storm.generated.SpoutStats ss = ess.get_spout(); - if (ss != null) { - SpoutStats spoutStats = new SpoutStats(); - spoutStats.setAcked(ss.get_acked()); - spoutStats.setFailed(ss.get_failed()); - spoutStats.setCompleteMsAvg(ss.get_complete_ms_avg()); + if (ess.is_set_spout()) { + backtype.storm.generated.SpoutStats ss = ess.get_spout(); + if (ss != null) { + SpoutStats spoutStats = new SpoutStats(); + spoutStats.setAcked(ss.get_acked()); + spoutStats.setFailed(ss.get_failed()); + spoutStats.setCompleteMsAvg(ss.get_complete_ms_avg()); - specific.setSpout(spoutStats); + specific.setSpout(spoutStats); + } } + + stats.setSpecific(specific); } - stats.setSpecific(specific); + executor.setStats(stats); } - executor.setStats(stats); + executorSummaries.add(executor); } - executorSummaries.add(executor); + topologyInfo.setExecutors(executorSummaries); } - topologyInfo.setExecutors(executorSummaries); - return topologyInfo; } diff --git a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java index 8ba4ea5..b8f242c 100644 --- a/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java +++ b/streamflow-core/streamflow-engine/src/main/java/streamflow/engine/topology/TopologySubmitter.java @@ -102,13 +102,13 @@ public void run() { localCluster.submitTopology(topology.getId(), stormConfig, stormTopology); } } catch (AlreadyAliveException ex) { - LOG.error("The specified topology is already running on the cluster: {}", ex); + LOG.error("The specified topology is already running on the cluster:", ex); } catch (InvalidTopologyException ex) { LOG.error("The specified topology is invalid: " + ex); } catch (FrameworkException ex) { - LOG.error("The topology was unable to load a dependent framework: {}", ex); + LOG.error("The topology was unable to load a dependent framework:", ex); } catch (Exception ex) { - LOG.error("The topology threw an uncaught exception: {}", ex); + LOG.error("The topology threw an uncaught exception:", ex); } } diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java index d0f2a95..ec98a5c 100644 --- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/Cluster.java @@ -30,7 +30,7 @@ public class Cluster implements Serializable { private Integer nimbusPort = 6627; - private String version = "0.9.1"; + private String version = "0.9.5"; private String logServerHost = "localhost"; diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/MonitorConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/MonitorConfig.java new file mode 100644 index 0000000..0c911d2 --- /dev/null +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/MonitorConfig.java @@ -0,0 +1,91 @@ +/** + * Copyright 2014 Lockheed Martin Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package streamflow.model.config; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class MonitorConfig implements Serializable { + + static Logger LOG = LoggerFactory.getLogger(MonitorConfig.class); + + private boolean enabled = false; + + private int pollingInterval = 60; + + public MonitorConfig() { + } + + public boolean isEnabled() { + return Boolean.parseBoolean( + System.getProperty("monitor.enabled", Boolean.toString(enabled))); + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public int getPollingInterval() { + if (System.getProperty("monitor.pollingInterval") != null) { + try { + pollingInterval = Integer.parseInt(System.getProperty("monitor.pollingInterval")); + } catch (Exception ex) { + } + } + return pollingInterval; + } + + public void setPollingInterval(int pollingInterval) { + this.pollingInterval = pollingInterval; + } + + @Override + public int hashCode() { + int hash = 3; + hash = 43 * hash + (this.enabled ? 1 : 0); + hash = 43 * hash + this.pollingInterval; + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final MonitorConfig other = (MonitorConfig) obj; + if (this.enabled != other.enabled) { + return false; + } + if (this.pollingInterval != other.pollingInterval) { + return false; + } + return true; + } + + @Override + public String toString() { + return "MonitorConfig{" + "enabled=" + enabled + ", pollingInterval=" + pollingInterval + '}'; + } +} diff --git a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java index 9756c9d..76dc7d0 100644 --- a/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java +++ b/streamflow-core/streamflow-model/src/main/java/streamflow/model/config/StreamflowConfig.java @@ -34,7 +34,9 @@ public class StreamflowConfig implements Serializable { private LoggerConfig logger = new LoggerConfig(); private AuthConfig auth = new AuthConfig(); - + + private MonitorConfig monitor = new MonitorConfig(); + private LocalClusterConfig localCluster = new LocalClusterConfig(); private List clusters = new ArrayList<>(); @@ -84,6 +86,14 @@ public void setAuth(AuthConfig auth) { this.auth = auth; } + public MonitorConfig getMonitor() { + return monitor; + } + + public void setMonitor(MonitorConfig monitor) { + this.monitor = monitor; + } + public LocalClusterConfig getLocalCluster() { return localCluster; } @@ -116,6 +126,7 @@ public int hashCode() { hash = 29 * hash + (this.datastore != null ? this.datastore.hashCode() : 0); hash = 29 * hash + (this.logger != null ? this.logger.hashCode() : 0); hash = 29 * hash + (this.auth != null ? this.auth.hashCode() : 0); + hash = 29 * hash + (this.monitor != null ? this.monitor.hashCode() : 0); hash = 29 * hash + (this.localCluster != null ? this.localCluster.hashCode() : 0); hash = 29 * hash + (this.clusters != null ? this.clusters.hashCode() : 0); return hash; @@ -150,6 +161,10 @@ public boolean equals(Object obj) { || !this.auth.equals(other.auth))) { return false; } + if (this.monitor != other.monitor && (this.monitor == null + || !this.monitor.equals(other.monitor))) { + return false; + } if (this.localCluster != other.localCluster && (this.localCluster == null || !this.localCluster.equals(other.localCluster))) { return false; @@ -165,7 +180,7 @@ public boolean equals(Object obj) { public String toString() { return "StreamFlowConfig{" + "server=" + server + ", proxy=" + proxy + ", datastore=" + datastore + ", logger=" + logger - + ", auth=" + auth + ", localCluster=" + localCluster + + ", auth=" + auth + ", monitor=" + monitor + ", localCluster=" + localCluster + ", clusters=" + clusters + '}'; } } diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/GuavaServiceModule.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/GuavaServiceModule.java new file mode 100644 index 0000000..e78f1cb --- /dev/null +++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/GuavaServiceModule.java @@ -0,0 +1,23 @@ +package streamflow.server.config; + +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Service; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import streamflow.model.config.MonitorConfig; +import streamflow.model.config.StreamflowConfig; +import streamflow.server.service.TopologyMonitorService; +import streamflow.util.config.ConfigLoader; + +import java.util.Set; + +public class GuavaServiceModule extends AbstractModule { + + @Override + protected void configure() { + MonitorConfig monitorConfig = ConfigLoader.getConfig().getMonitor(); + if (monitorConfig.isEnabled()) { + bind(TopologyMonitorService.class); + } + } +} diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java index f283d20..cd75871 100644 --- a/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java +++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/config/WebConfig.java @@ -15,14 +15,18 @@ */ package streamflow.server.config; +//import com.google.common.util.concurrent.ServiceManager; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.servlet.GuiceServletContextListener; import javax.servlet.ServletContext; import javax.servlet.ServletContextEvent; import javax.servlet.annotation.WebListener; + +import org.apache.storm.guava.util.concurrent.ServiceManager; import streamflow.datastore.config.DatastoreModule; import streamflow.engine.config.EngineModule; +import streamflow.server.service.TopologyMonitorService; import streamflow.service.config.ServiceModule; import streamflow.util.config.ConfigModule; import org.apache.shiro.guice.web.ShiroWebModule; @@ -39,9 +43,17 @@ protected Injector getInjector() { StreamflowEnvironment.setStreamflowHome(System.getenv("STREAMFLOW_HOME")); StreamflowEnvironment.initialize(); - return Guice.createInjector(new ConfigModule(), new DatastoreModule(), - new ServiceModule(), new EngineModule(), new JerseyModule(), + Injector injector = Guice.createInjector(new ConfigModule(), new DatastoreModule(), + new ServiceModule(), new GuavaServiceModule(), new EngineModule(), new JerseyModule(), new SecurityModule(servletContext), ShiroWebModule.guiceFilterModule()); + + // Initialize the service manager to manage daemon services + //ServiceManager manager = injector.getInstance(ServiceManager.class); + //manager.startAsync().awaitHealthy(); + TopologyMonitorService topologyMonitorService = injector.getInstance(TopologyMonitorService.class); + topologyMonitorService.startAsync().awaitRunning(); + + return injector; } @Override diff --git a/streamflow-core/streamflow-server/src/main/java/streamflow/server/service/TopologyMonitorService.java b/streamflow-core/streamflow-server/src/main/java/streamflow/server/service/TopologyMonitorService.java new file mode 100644 index 0000000..cf4670a --- /dev/null +++ b/streamflow-core/streamflow-server/src/main/java/streamflow/server/service/TopologyMonitorService.java @@ -0,0 +1,71 @@ +package streamflow.server.service; + +import com.google.common.util.concurrent.AbstractScheduledService; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import streamflow.model.Topology; +import streamflow.model.config.MonitorConfig; +import streamflow.service.TopologyService; + +import java.util.concurrent.TimeUnit; + +@Singleton +public class TopologyMonitorService extends AbstractScheduledService { + + public static final Logger LOG = LoggerFactory.getLogger(TopologyMonitorService.class); + + private TopologyService topologyService; + private MonitorConfig monitorConfig; + + @Inject + public TopologyMonitorService(TopologyService topologyService, MonitorConfig monitorConfig) { + this.topologyService = topologyService; + this.monitorConfig = monitorConfig; + } + + @Override + protected void startUp() throws Exception { + LOG.info("Topology Status Monitor Started..."); + } + + @Override + protected void runOneIteration() throws Exception { + // Iterate over all of the topologies for each user to check live status + for (Topology topology : topologyService.listAllTopologies()) { + try { + // Get the current live status of the topology + String topologyStatusDesired = topology.getStatus(); + String topologyStatusActual = topologyService.getTopology(topology.getId(), topology.getUserId()).getStatus(); + + if (topologyStatusDesired.equalsIgnoreCase("ACTIVE")) { + // Topology should be submitted, but isn't active so resubmit to desired state + if (!topologyStatusActual.equalsIgnoreCase("ACTIVE")) { + LOG.warn("Topology has a desired state of ACTIVE but is not currently deployed. " + + "Redeploying topology... ID = " + topology.getId() + ", Name = " + topology.getName()); + + // Resubmit the topology using the same settings as originally submitted + Topology submittedTopology = topologyService.submitTopology( + topology.getId(), topology.getUserId(), topology.getClusterId(), + topology.getLogLevel(), topology.getClassLoaderPolicy()); + + if (topology != null && topology.getStatus().equalsIgnoreCase("ACTIVE")) { + LOG.info("Topology redeploy succeeded: ID = " + topology.getId() + ", Name = " + topology.getName()); + } else { + LOG.error("Topology redeploy failed: ID = " + topology.getId() + ", Name = " + topology.getName()); + } + } + } + } catch (Exception ex) { + LOG.error("An exception occurred while checking topology status: ID = " + + topology.getId() + ", Name = " + topology.getName(), ex); + } + } + } + + @Override + protected Scheduler scheduler() { + return Scheduler.newFixedRateSchedule(monitorConfig.getPollingInterval(), monitorConfig.getPollingInterval(), TimeUnit.SECONDS); + } +} diff --git a/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js b/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js index 969eb3b..1fa95c3 100644 --- a/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js +++ b/streamflow-core/streamflow-server/src/main/webapp/app/topology/topology.js @@ -554,7 +554,9 @@ topologyModule.controller('TopologyPropertiesNewController', [ 'storm.messaging.netty.max_wait_ms', 'storm.messaging.netty.min_wait_ms', 'storm.messaging.netty.transfer.batch.size', + 'storm.messaging.netty.socket.backlog', 'storm.messaging.netty.flush.check.interval.ms', + 'storm.messaging.netty.authentication', 'topology.enable.message.timeouts', 'topology.debug', 'topology.workers', @@ -584,8 +586,10 @@ topologyModule.controller('TopologyPropertiesNewController', [ 'topology.kryo.factory', 'topology.tuple.serializer', 'topology.trident.batch.emit.interval.millis', + 'storm.group.mapping.service.cache.duration.secs', 'topology.classpath', - 'topology.environment' + 'topology.environment', + 'topology.bolts.outgoing.overflow.buffer.enable' ]; $scope.add = function() { diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java index 1333051..af854a6 100644 --- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java +++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/FrameworkService.java @@ -1,17 +1,15 @@ /** * Copyright 2014 Lockheed Martin Corporation * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. */ package streamflow.service; @@ -23,7 +21,6 @@ import java.util.Date; import java.util.Enumeration; import java.util.List; -import java.util.UUID; import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.zip.ZipEntry; @@ -156,182 +153,233 @@ public byte[] getFrameworkJar(String frameworkId) { return frameworkJarContent; } - + /** * Process the annotations found in a framework jar + * * @param jarFile * @return a FrameworkConfig or null if no annotations were found */ - public FrameworkConfig processFrameworkAnnotations(File jarFile){ - FrameworkConfig config = new FrameworkConfig(); - ArrayList components = new ArrayList(); - String frameworkLevel = null; - boolean foundFrameworkAnnotations = false; - ZipFile zipFile = null; - - try { - zipFile = new ZipFile(jarFile); - - Enumeration entries = zipFile.entries(); - while(entries.hasMoreElements()){ - ZipEntry entry = entries.nextElement(); - String entryName = entry.getName(); - if(entry.isDirectory()){ - if(frameworkLevel != null){ - if(entryName.startsWith(frameworkLevel) == false) frameworkLevel = null; - } - ZipEntry packageInfoEntry = zipFile.getEntry(entryName+"package-info.class"); - if(packageInfoEntry != null){ - InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry); - DataInputStream dstream = new DataInputStream(fileInputStream); - ClassFile cf = new ClassFile(dstream); - String cfName = cf.getName(); - AnnotationsAttribute attr = (AnnotationsAttribute)cf.getAttribute(AnnotationsAttribute.visibleTag); - Annotation annotation = attr.getAnnotation("streamflow.annotations.Framework"); - if(annotation == null) continue; - - frameworkLevel = cfName; - foundFrameworkAnnotations = true; - StringMemberValue frameworkLabel = (StringMemberValue) annotation.getMemberValue("label"); - if(frameworkLabel != null) config.setLabel(frameworkLabel.getValue()); - StringMemberValue frameworkName = (StringMemberValue) annotation.getMemberValue("name"); - if(frameworkName != null) config.setName(frameworkName.getValue()); - StringMemberValue frameworkVersion = (StringMemberValue) annotation.getMemberValue("version"); - if(frameworkVersion != null) config.setVersion(frameworkVersion.getValue()); - - Annotation descriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description"); - if(descriptionAnnotation != null){ - StringMemberValue frameworkDescription = (StringMemberValue) descriptionAnnotation.getMemberValue("value"); - if(frameworkDescription != null) config.setDescription(frameworkDescription.getValue()); - } - - - } - } else if(frameworkLevel != null && entryName.endsWith(".class") && entryName.endsWith("package-info.class") == false){ - ZipEntry packageInfoEntry = zipFile.getEntry(entryName); - InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry); - DataInputStream dstream = new DataInputStream(fileInputStream); - ClassFile cf = new ClassFile(dstream); - String cfName = cf.getName(); - AnnotationsAttribute attr = (AnnotationsAttribute)cf.getAttribute(AnnotationsAttribute.visibleTag); - if(attr == null) continue; - Annotation componentAnnotation = attr.getAnnotation("streamflow.annotations.Component"); - - if(componentAnnotation == null) continue; - - ComponentConfig component = new ComponentConfig(); - component.setMainClass(cf.getName()); - StringMemberValue componentLabel = (StringMemberValue) componentAnnotation.getMemberValue("label"); - if(componentLabel != null) component.setLabel(componentLabel.getValue()); - StringMemberValue componentName = (StringMemberValue) componentAnnotation.getMemberValue("name"); - if(componentName != null) component.setName(componentName.getValue()); - StringMemberValue componentType = (StringMemberValue) componentAnnotation.getMemberValue("type"); - if(componentType != null) component.setType(componentType.getValue()); - StringMemberValue componentIcon = (StringMemberValue) componentAnnotation.getMemberValue("icon"); - if(componentIcon != null) component.setIcon(componentIcon.getValue()); - - Annotation componentDescriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description"); - if(componentDescriptionAnnotation != null){ - StringMemberValue componentDescription = (StringMemberValue) componentDescriptionAnnotation.getMemberValue("value"); - if(componentDescription != null) component.setDescription(componentDescription.getValue()); - } - - Annotation componentInputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentInputs"); - if(componentInputsAnnotation != null){ - ArrayList inputs = new ArrayList(); - ArrayMemberValue componentInputs = (ArrayMemberValue) componentInputsAnnotation.getMemberValue("value"); - for(MemberValue value : componentInputs.getValue()){ - AnnotationMemberValue annotationMember = (AnnotationMemberValue)value; - Annotation annotationValue = annotationMember.getValue(); - StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key"); - StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description"); - ComponentInterface inputInterface = new ComponentInterface(); - if(keyAnnotationValue != null) inputInterface.setKey(keyAnnotationValue.getValue()); - if(descriptionAnnotationValue != null) inputInterface.setDescription(descriptionAnnotationValue.getValue()); - inputs.add(inputInterface); - } - - component.setInputs(inputs); - } - - Annotation componentOutputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentOutputs"); - if(componentOutputsAnnotation != null){ - ArrayList outputs = new ArrayList(); - ArrayMemberValue componentOutputs = (ArrayMemberValue) componentOutputsAnnotation.getMemberValue("value"); - for(MemberValue value : componentOutputs.getValue()){ - AnnotationMemberValue annotationMember = (AnnotationMemberValue)value; - Annotation annotationValue = annotationMember.getValue(); - StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key"); - StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description"); - ComponentInterface outputInterface = new ComponentInterface(); - if(keyAnnotationValue != null) outputInterface.setKey(keyAnnotationValue.getValue()); - if(descriptionAnnotationValue != null) outputInterface.setDescription(descriptionAnnotationValue.getValue()); - outputs.add(outputInterface); - } - - component.setOutputs(outputs); - } - - - List memberMethods = cf.getMethods(); - if(memberMethods != null){ - ArrayList properties = new ArrayList(); - - for(MethodInfo method : memberMethods){ - AnnotationsAttribute methodAttr = (AnnotationsAttribute) method.getAttribute(AnnotationsAttribute.visibleTag); - if(methodAttr == null) continue; - Annotation propertyAnnotation = methodAttr.getAnnotation("streamflow.annotations.ComponentProperty"); - if(propertyAnnotation == null) continue; - - ComponentProperty property = new ComponentProperty(); - - StringMemberValue propertyName = (StringMemberValue) propertyAnnotation.getMemberValue("name"); - if(propertyName != null) property.setName(propertyName.getValue()); - StringMemberValue propertylabel = (StringMemberValue) propertyAnnotation.getMemberValue("label"); - if(propertylabel != null) property.setLabel(propertylabel.getValue()); - StringMemberValue propertyType = (StringMemberValue) propertyAnnotation.getMemberValue("type"); - if(propertyType != null) property.setType(propertyType.getValue()); - StringMemberValue propertyDefaultValue = (StringMemberValue) propertyAnnotation.getMemberValue("defaultValue"); - if(propertyDefaultValue != null) property.setDefaultValue(propertyDefaultValue.getValue()); - BooleanMemberValue propertyRequired = (BooleanMemberValue)propertyAnnotation.getMemberValue("required"); - if(propertyRequired != null) property.setRequired(propertyRequired.getValue()); - - Annotation methodDescriptionAnnotation = methodAttr.getAnnotation("streamflow.annotations.Description"); - if(methodDescriptionAnnotation != null){ - StringMemberValue methodDescription = (StringMemberValue) methodDescriptionAnnotation.getMemberValue("value"); - if(methodDescription != null) property.setDescription(methodDescription.getValue()); - } - properties.add(property); - } - component.setProperties(properties); - - } - - components.add(component); - } - } - - config.setComponents(components); - - // return null if no framework annotations were located - if(foundFrameworkAnnotations == false) return null; - - return config; - } catch (IOException ex){ + public FrameworkConfig processFrameworkAnnotations(File jarFile) { + FrameworkConfig config = new FrameworkConfig(); + ArrayList components = new ArrayList(); + String frameworkLevel = null; + boolean foundFrameworkAnnotations = false; + ZipFile zipFile = null; + + try { + zipFile = new ZipFile(jarFile); + + Enumeration entries = zipFile.entries(); + while (entries.hasMoreElements()) { + ZipEntry entry = entries.nextElement(); + String entryName = entry.getName(); + if (entry.isDirectory()) { + if (frameworkLevel != null) { + if (entryName.startsWith(frameworkLevel) == false) { + frameworkLevel = null; + } + } + ZipEntry packageInfoEntry = zipFile.getEntry(entryName + "package-info.class"); + if (packageInfoEntry != null) { + InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry); + DataInputStream dstream = new DataInputStream(fileInputStream); + ClassFile cf = new ClassFile(dstream); + String cfName = cf.getName(); + AnnotationsAttribute attr = (AnnotationsAttribute) cf.getAttribute(AnnotationsAttribute.visibleTag); + Annotation annotation = attr.getAnnotation("streamflow.annotations.Framework"); + if (annotation == null) { + continue; + } + + frameworkLevel = cfName; + foundFrameworkAnnotations = true; + StringMemberValue frameworkLabel = (StringMemberValue) annotation.getMemberValue("label"); + if (frameworkLabel != null) { + config.setLabel(frameworkLabel.getValue()); + } + StringMemberValue frameworkName = (StringMemberValue) annotation.getMemberValue("name"); + if (frameworkName != null) { + config.setName(frameworkName.getValue()); + } + StringMemberValue frameworkVersion = (StringMemberValue) annotation.getMemberValue("version"); + if (frameworkVersion != null) { + config.setVersion(frameworkVersion.getValue()); + } + + Annotation descriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description"); + if (descriptionAnnotation != null) { + StringMemberValue frameworkDescription = (StringMemberValue) descriptionAnnotation.getMemberValue("value"); + if (frameworkDescription != null) { + config.setDescription(frameworkDescription.getValue()); + } + } + + } + } else if (frameworkLevel != null && entryName.endsWith(".class") && entryName.endsWith("package-info.class") == false) { + ZipEntry packageInfoEntry = zipFile.getEntry(entryName); + InputStream fileInputStream = zipFile.getInputStream(packageInfoEntry); + DataInputStream dstream = new DataInputStream(fileInputStream); + ClassFile cf = new ClassFile(dstream); + String cfName = cf.getName(); + AnnotationsAttribute attr = (AnnotationsAttribute) cf.getAttribute(AnnotationsAttribute.visibleTag); + if (attr == null) { + continue; + } + Annotation componentAnnotation = attr.getAnnotation("streamflow.annotations.Component"); + + if (componentAnnotation == null) { + continue; + } + + ComponentConfig component = new ComponentConfig(); + component.setMainClass(cf.getName()); + StringMemberValue componentLabel = (StringMemberValue) componentAnnotation.getMemberValue("label"); + if (componentLabel != null) { + component.setLabel(componentLabel.getValue()); + } + StringMemberValue componentName = (StringMemberValue) componentAnnotation.getMemberValue("name"); + if (componentName != null) { + component.setName(componentName.getValue()); + } + StringMemberValue componentType = (StringMemberValue) componentAnnotation.getMemberValue("type"); + if (componentType != null) { + component.setType(componentType.getValue()); + } + StringMemberValue componentIcon = (StringMemberValue) componentAnnotation.getMemberValue("icon"); + if (componentIcon != null) { + component.setIcon(componentIcon.getValue()); + } + + Annotation componentDescriptionAnnotation = attr.getAnnotation("streamflow.annotations.Description"); + if (componentDescriptionAnnotation != null) { + StringMemberValue componentDescription = (StringMemberValue) componentDescriptionAnnotation.getMemberValue("value"); + if (componentDescription != null) { + component.setDescription(componentDescription.getValue()); + } + } + + Annotation componentInputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentInputs"); + if (componentInputsAnnotation != null) { + ArrayList inputs = new ArrayList(); + ArrayMemberValue componentInputs = (ArrayMemberValue) componentInputsAnnotation.getMemberValue("value"); + for (MemberValue value : componentInputs.getValue()) { + AnnotationMemberValue annotationMember = (AnnotationMemberValue) value; + Annotation annotationValue = annotationMember.getValue(); + StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key"); + StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description"); + ComponentInterface inputInterface = new ComponentInterface(); + if (keyAnnotationValue != null) { + inputInterface.setKey(keyAnnotationValue.getValue()); + } + if (descriptionAnnotationValue != null) { + inputInterface.setDescription(descriptionAnnotationValue.getValue()); + } + inputs.add(inputInterface); + } + + component.setInputs(inputs); + } + + Annotation componentOutputsAnnotation = attr.getAnnotation("streamflow.annotations.ComponentOutputs"); + if (componentOutputsAnnotation != null) { + ArrayList outputs = new ArrayList(); + ArrayMemberValue componentOutputs = (ArrayMemberValue) componentOutputsAnnotation.getMemberValue("value"); + for (MemberValue value : componentOutputs.getValue()) { + AnnotationMemberValue annotationMember = (AnnotationMemberValue) value; + Annotation annotationValue = annotationMember.getValue(); + StringMemberValue keyAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("key"); + StringMemberValue descriptionAnnotationValue = (StringMemberValue) annotationValue.getMemberValue("description"); + ComponentInterface outputInterface = new ComponentInterface(); + if (keyAnnotationValue != null) { + outputInterface.setKey(keyAnnotationValue.getValue()); + } + if (descriptionAnnotationValue != null) { + outputInterface.setDescription(descriptionAnnotationValue.getValue()); + } + outputs.add(outputInterface); + } + + component.setOutputs(outputs); + } + + List memberMethods = cf.getMethods(); + if (memberMethods != null) { + ArrayList properties = new ArrayList(); + + for (MethodInfo method : memberMethods) { + AnnotationsAttribute methodAttr = (AnnotationsAttribute) method.getAttribute(AnnotationsAttribute.visibleTag); + if (methodAttr == null) { + continue; + } + Annotation propertyAnnotation = methodAttr.getAnnotation("streamflow.annotations.ComponentProperty"); + if (propertyAnnotation == null) { + continue; + } + + ComponentProperty property = new ComponentProperty(); + + StringMemberValue propertyName = (StringMemberValue) propertyAnnotation.getMemberValue("name"); + if (propertyName != null) { + property.setName(propertyName.getValue()); + } + StringMemberValue propertylabel = (StringMemberValue) propertyAnnotation.getMemberValue("label"); + if (propertylabel != null) { + property.setLabel(propertylabel.getValue()); + } + StringMemberValue propertyType = (StringMemberValue) propertyAnnotation.getMemberValue("type"); + if (propertyType != null) { + property.setType(propertyType.getValue()); + } + StringMemberValue propertyDefaultValue = (StringMemberValue) propertyAnnotation.getMemberValue("defaultValue"); + if (propertyDefaultValue != null) { + property.setDefaultValue(propertyDefaultValue.getValue()); + } + BooleanMemberValue propertyRequired = (BooleanMemberValue) propertyAnnotation.getMemberValue("required"); + if (propertyRequired != null) { + property.setRequired(propertyRequired.getValue()); + } + + Annotation methodDescriptionAnnotation = methodAttr.getAnnotation("streamflow.annotations.Description"); + if (methodDescriptionAnnotation != null) { + StringMemberValue methodDescription = (StringMemberValue) methodDescriptionAnnotation.getMemberValue("value"); + if (methodDescription != null) { + property.setDescription(methodDescription.getValue()); + } + } + properties.add(property); + } + component.setProperties(properties); + + } + + components.add(component); + } + } + + config.setComponents(components); + + // return null if no framework annotations were located + if (foundFrameworkAnnotations == false) { + return null; + } + + return config; + } catch (IOException ex) { LOG.error("Error while parsing framework annotations: ", ex); - + throw new EntityInvalidException("Error while parsing framework annotations: " - + ex.getMessage()); - } finally { - if(zipFile != null){ - try { - zipFile.close(); - } catch (IOException e) { - LOG.error("Error while closing framework zip"); - } - } - } - + + ex.getMessage()); + } finally { + if (zipFile != null) { + try { + zipFile.close(); + } catch (IOException e) { + LOG.error("Error while closing framework zip"); + } + } + } + } public FileInfo getFrameworkFileInfo(String frameworkId) { @@ -348,22 +396,19 @@ public FileInfo getFrameworkFileInfo(String frameworkId) { public Framework processFrameworkJar(byte[] frameworkJar, boolean isPublic) { Framework framework = null; - + try { String frameworkHash = DigestUtils.md5Hex(frameworkJar); - + // Write out a temporary file for the jar so it can be processed File tempFrameworkFile = new File(StreamflowEnvironment.getFrameworksDir(), frameworkHash + ".jar"); FileUtils.writeByteArrayToFile(tempFrameworkFile, frameworkJar); - + FrameworkConfig frameworkConfig = processFrameworkConfig(tempFrameworkFile); if (frameworkConfig != null) { - - - String frameworkId = frameworkConfig.getName(); // If the framework already exists, delete it first to clear out children @@ -400,13 +445,13 @@ public Framework processFrameworkJar(byte[] frameworkJar, boolean isPublic) { LOG.error("Exception while processing the framework jar", ex); throw new EntityInvalidException( - "Exception while processing the framework framework: Exception = " - + ex.getMessage()); + "Exception while processing the framework framework: Exception = " + + ex.getMessage()); } return framework; } - + public String storeFrameworkJar(byte[] frameworkJar) { FileInfo frameworkFile = new FileInfo(); frameworkFile.setFileName(IDUtils.randomUUID()); @@ -420,18 +465,18 @@ public String storeFrameworkJar(byte[] frameworkJar) { if (frameworkFile == null) { throw new ServiceException("Unable to save framework jar file"); } - + return frameworkFile.getId(); } - + public FrameworkConfig processFrameworkConfig(File tempFrameworkFile) { FrameworkConfig frameworkConfig = null; try { JarFile frameworkJarFile = new JarFile(tempFrameworkFile.getAbsoluteFile()); - + JarEntry frameworkYamlEntry = frameworkJarFile.getJarEntry("STREAMFLOW-INF/framework.yml"); - + JarEntry frameworkJsonEntry = frameworkJarFile.getJarEntry("STREAMFLOW-INF/framework.json"); if (frameworkYamlEntry != null) { @@ -444,27 +489,27 @@ public FrameworkConfig processFrameworkConfig(File tempFrameworkFile) { } else if (frameworkJsonEntry != null) { String frameworkJson = IOUtils.toString( frameworkJarFile.getInputStream(frameworkJsonEntry)); - + // Attempt to deserialize the inbuilt streams-framework.json frameworkConfig = jsonMapper.readValue( frameworkJson, FrameworkConfig.class); } else { - frameworkConfig = processFrameworkAnnotations(tempFrameworkFile); - if(frameworkConfig == null){ - throw new EntityInvalidException( - "The framework configuration file was not found in the framework jar"); - } + frameworkConfig = processFrameworkAnnotations(tempFrameworkFile); + if (frameworkConfig == null) { + throw new EntityInvalidException( + "The framework configuration file was not found in the framework jar"); + } } } catch (IOException ex) { LOG.error("Error while loaded the framework configuration: ", ex); - + throw new EntityInvalidException("Error while loading the framework configuration: " - + ex.getMessage()); + + ex.getMessage()); } - + return frameworkConfig; } - + public void processFrameworkComponents(Framework framework, FrameworkConfig frameworkConfig, File frameworkFile) { for (ComponentConfig componentConfig : frameworkConfig.getComponents()) { Component component = new Component(); @@ -480,7 +525,7 @@ public void processFrameworkComponents(Framework framework, FrameworkConfig fram componentService.addComponent(component); } } - + public void processFrameworkResources(Framework framework, FrameworkConfig frameworkConfig) { for (ResourceConfig resourceConfig : frameworkConfig.getResources()) { Resource resource = new Resource(); @@ -495,7 +540,7 @@ public void processFrameworkResources(Framework framework, FrameworkConfig frame resourceService.addResource(resource); } } - + public void processFrameworkSerializations(Framework framework, FrameworkConfig frameworkConfig) { // Keep track of the order or the serializations specified in the config int serializationPriority = 0; @@ -513,7 +558,7 @@ public void processFrameworkSerializations(Framework framework, FrameworkConfig serializationService.addSerialization(serialization); } } - + public String loadFrameworkComponentIcon(ComponentConfig componentConfig, File frameworkFile) { String iconId = null; byte[] iconData = null; @@ -521,7 +566,7 @@ public String loadFrameworkComponentIcon(ComponentConfig componentConfig, File f if (componentConfig.getIcon() != null) { try { JarFile frameworkJarFile = new JarFile(frameworkFile); - + JarEntry iconEntry = frameworkJarFile.getJarEntry(componentConfig.getIcon()); if (iconEntry != null) { iconData = IOUtils.toByteArray(frameworkJarFile.getInputStream(iconEntry)); @@ -535,31 +580,31 @@ public String loadFrameworkComponentIcon(ComponentConfig componentConfig, File f try { if (componentConfig.getType().equalsIgnoreCase(Component.STORM_SPOUT_TYPE)) { iconData = IOUtils.toByteArray(Thread.currentThread() - .getContextClassLoader().getResourceAsStream("icons/storm-spout.png")); + .getContextClassLoader().getResourceAsStream("icons/storm-spout.png")); } else if (componentConfig.getType().equalsIgnoreCase(Component.STORM_BOLT_TYPE)) { iconData = IOUtils.toByteArray(Thread.currentThread() - .getContextClassLoader().getResourceAsStream("icons/storm-bolt.png")); + .getContextClassLoader().getResourceAsStream("icons/storm-bolt.png")); } else { iconData = IOUtils.toByteArray(Thread.currentThread() - .getContextClassLoader().getResourceAsStream("icons/storm-trident.png")); + .getContextClassLoader().getResourceAsStream("icons/storm-trident.png")); } } catch (IOException ex) { LOG.error("Error occurred while loading the default component icon: ", ex); } } - + if (iconData != null) { FileInfo iconFile = new FileInfo(); iconFile.setFileName(iconFile.getFileName()); iconFile.setFileType("image/png"); iconFile.setFileSize(iconData.length); iconFile.setContentHash(DigestUtils.md5Hex(iconData)); - + iconFile = fileService.saveFile(iconFile, iconData); iconId = iconFile.getId(); } - + return iconId; } } diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java index 5744b7a..95a1cda 100644 --- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java +++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/LogService.java @@ -166,7 +166,9 @@ public void clearTopologyLog(Topology topology, Cluster cluster) { try { // Delete the local log file from the server - FileUtils.forceDelete(logFile); + if (logFile.exists()) { + FileUtils.forceDelete(logFile); + } } catch (IOException ex) { LOG.error("Error deleting local topology log file: " + logFile.getAbsolutePath()); } diff --git a/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java b/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java index 1f7eaa7..77e0ef4 100644 --- a/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java +++ b/streamflow-core/streamflow-service/src/main/java/streamflow/service/TopologyService.java @@ -112,6 +112,10 @@ public TopologyService(TopologyDao topologyDao, ComponentService componentServic this.streamflowConfig = streamflowConfig; } + public List listAllTopologies() { + return topologyDao.findAll(); + } + public List listTopologies(String userId) { List topologies = topologyDao.findAll(userId); @@ -398,7 +402,7 @@ public Topology initializeTopologyObject(Topology topology) { } catch (ServiceException ex) { LOG.error("Exception while initializing the topology config object", ex); - throw new ServiceException("Exception while intializing the " + throw new ServiceException("Exception while initializing the " + "Topology config object: " + ex.getMessage()); } } @@ -418,9 +422,9 @@ public String generateTopologyJar(Topology topology, Cluster cluster) { jarBuilder.open(); // Keep track of already added dependencies - HashSet frameworkDependencies = new HashSet(); + HashSet frameworkDependencies = new HashSet<>(); - HashSet processedSerializations = new HashSet(); + HashSet processedSerializations = new HashSet<>(); TopologyConfig topologyConfig = topology.getDeployedConfig(); @@ -663,7 +667,7 @@ private void clearTopologyProject(String projectId) { FileUtils.forceDelete(new File(StreamflowEnvironment.getTopologiesDir(), projectId + ".jar")); } catch (IOException ex) { - LOG.error("Exception while clearing the topology project: ", ex); + //LOG.error("Exception while clearing the topology project: ", ex); } } } diff --git a/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png b/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png index 57e410e..7f2e842 100644 Binary files a/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png and b/streamflow-core/streamflow-service/src/main/resources/icons/storm-bolt.png differ diff --git a/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png b/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png index 11d8d57..7ce1401 100644 Binary files a/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png and b/streamflow-core/streamflow-service/src/main/resources/icons/storm-spout.png differ diff --git a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java index 5feeb81..5927608 100644 --- a/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java +++ b/streamflow-core/streamflow-util/src/main/java/streamflow/util/config/ConfigModule.java @@ -16,15 +16,9 @@ package streamflow.util.config; import com.google.inject.AbstractModule; -import streamflow.model.config.AuthConfig; -import streamflow.model.config.DatastoreConfig; -import streamflow.model.config.StreamflowConfig; -import streamflow.model.config.LoggerConfig; -import streamflow.model.config.ProxyConfig; -import streamflow.model.config.ServerConfig; +import streamflow.model.config.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import streamflow.model.config.LocalClusterConfig; public class ConfigModule extends AbstractModule { @@ -37,6 +31,7 @@ protected void configure() { bind(StreamflowConfig.class).toInstance(streamflowConfig); bind(ServerConfig.class).toInstance(streamflowConfig.getServer()); bind(AuthConfig.class).toInstance(streamflowConfig.getAuth()); + bind(MonitorConfig.class).toInstance(streamflowConfig.getMonitor()); bind(ProxyConfig.class).toInstance(streamflowConfig.getProxy()); bind(LoggerConfig.class).toInstance(streamflowConfig.getLogger()); bind(LocalClusterConfig.class).toInstance(streamflowConfig.getLocalCluster()); diff --git a/streamflow-frameworks/twitter-framework/src/main/resources/STREAMFLOW-INF/framework.yml b/streamflow-frameworks/twitter-framework/src/main/resources/STREAMFLOW-INF/framework.yml new file mode 100644 index 0000000..df29380 --- /dev/null +++ b/streamflow-frameworks/twitter-framework/src/main/resources/STREAMFLOW-INF/framework.yml @@ -0,0 +1,44 @@ +name: twitter-framework +label: Twitter Framework +version: ${project.version} +description: Spouts and Bolts supporting Twitter functionality + +components: + - name: twitter-sample-spout + label: Twitter Sample Spout + type: storm-spout + description: Utilizes Twitter Streaming API to stream of 1% Twitter data for analysis. Twitter OAuth credentials for you application are required for use. + mainClass: streamflow.spout.twitter.TwitterSampleSpout + icon: icons/twitter.png + properties: + - name: oauth-consumer-key + label: OAuth Consumer Key + type: text + description: Twitter OAuth Consumer Key + defaultValue: + required: true + - name: oauth-consumer-secret + label: OAuth Consumer Secret + type: text + description: Twitter OAuth Consumer Secret + defaultValue: + required: true + - name: oauth-access-token + label: OAuth Access Token + type: text + description: Twitter OAuth Access Token + defaultValue: + required: true + - name: oauth-access-token-secret + label: OAuth Access Token Secret + type: text + description: Twitter OAuth Access Token Secret + defaultValue: + required: true + outputs: + - key: default + description: Twitter Status + + +#serializations: +# - typeClass: twitter4j.Status