eventTableList = new ArrayList<>();
- SiddhiQLParser.Execution_planContext ctx = (((SiddhiQLParser.ParseContext)tree).execution_plan());
-
- for (SiddhiQLParser.Definition_tableContext executionElementContext : ctx.definition_table()) {
- String query = (String) eval.visit(executionElementContext);
- eventTableList.add(query);
- }
-
- return eventTableList;
- }
-
- /**
- * {@inheritDoc}
- *
- * The default implementation returns the result of calling
- * {@link #visitChildren} on {@code ctx}.
- *
- * @param ctx
- */
- @Override
- public Object visitParse(@NotNull SiddhiQLParser.ParseContext ctx) {
- return visit(ctx.execution_plan());
- }
-
- /**
- * {@inheritDoc}
- *
- * The default implementation returns the result of calling
- * {@link #visitChildren} on {@code ctx}.
- *
- * @param ctx
- */
- @Override
- public List visitExecution_plan(@NotNull SiddhiQLParser.Execution_planContext ctx) {
- List stringQueryList = new ArrayList();
- for (SiddhiQLParser.Execution_elementContext executionElementContext : ctx.execution_element()) {
- String query = (String) visit(executionElementContext);
- stringQueryList.add(query);
- }
- return stringQueryList;
- }
-
- /**
- * {@inheritDoc}
- *
- * Returns the string rule related for this section.
- *
- * @param ctx
- */
- @Override
- public String visitQuery(@NotNull SiddhiQLParser.QueryContext ctx) {
- int a = ctx.start.getStartIndex();
- int b = ctx.stop.getStopIndex();
- Interval interval = new Interval(a,b);
- return ctx.start.getInputStream().getText(interval);
- }
-
- /**
- * {@inheritDoc}
- *
- * Returns the string rule related for this section.
- *
- * @param ctx
- */
- @Override
- public String visitPartition(@NotNull SiddhiQLParser.PartitionContext ctx) {
- int a = ctx.start.getStartIndex();
- int b = ctx.stop.getStopIndex();
- Interval interval = new Interval(a,b);
- return ctx.start.getInputStream().getText(interval);
- }
-
- @Override
- public String visitDefinition_table(@NotNull SiddhiQLParser.Definition_tableContext ctx){
- int a = ctx.start.getStartIndex();
- int b = ctx.stop.getStopIndex();
- Interval interval = new Interval(a,b);
- return ctx.start.getInputStream().getText(interval);
-
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/manager/StormManagerServer.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/manager/StormManagerServer.java
deleted file mode 100644
index c7ff33c99..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/manager/StormManagerServer.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.wso2.carbon.event.processor.core.internal.storm.manager;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.ILock;
-import com.hazelcast.core.IMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransportException;
-import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
-import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-public class StormManagerServer {
-
- private static final String STORM_HZ_MAP_ACTIVE_MANAGER_KEY = "storm.hazelcast.map.active.manager.key";
- private static final String STORM_ROLE_TO_MEMBERSHIP_HZ_MAP = "storm.role.membership.hazelcast.map";
-
- private static Log log = LogFactory.getLog(StormManagerServer.class);
- private TThreadPoolServer stormManagerServer;
- private StormManagerServiceImpl stormManagerService;
- private IMap roleToMembershipMap;
- HazelcastInstance hazelcastInstance;
- private String myHazelcastId;
- private Future stateChecker = null;
- private ScheduledExecutorService executorService;
-
- public StormManagerServer(String hostName, int port) {
-
- try {
- stormManagerService = new StormManagerServiceImpl(hostName + ":" + port);
- TServerSocket serverTransport = new TServerSocket(
- new InetSocketAddress(hostName, port));
- StormManagerService.Processor processor =
- new StormManagerService.Processor(stormManagerService);
- stormManagerServer = new TThreadPoolServer(
- new TThreadPoolServer.Args(serverTransport).processor(processor));
- Thread thread = new Thread(new ServerThread(stormManagerServer));
- thread.start();
-
- log.info("CEP Storm Management Thrift Server started on " + hostName + ":" + port);
- executorService = new ScheduledThreadPoolExecutor(3,new ThreadFactoryBuilder().
- setNameFormat("Thread pool- component - StormManagerServer.executorService").build());
- } catch (TTransportException e) {
- log.error("Cannot start Storm Manager Server on " + hostName + ":" + port, e);
- }
- }
-
- public void setHzaelCastInstance(HazelcastInstance hazelcastInstance){
- this.hazelcastInstance = hazelcastInstance;
- this.roleToMembershipMap = hazelcastInstance.getMap(STORM_ROLE_TO_MEMBERSHIP_HZ_MAP);
- myHazelcastId = hazelcastInstance.getCluster().getLocalMember().getUuid();
-
- }
-
- /**
- * To stop the server
- */
- public void stop() {
- stormManagerServer.stop();
- executorService.shutdown();
-
- if (stateChecker != null){
- stateChecker.cancel(false);
- }
- }
-
- public void onExecutionPlanRemove(String excPlanName, int tenantId){
- // Delete all end points of the removed execution plan from manager service.
- stormManagerService.deleteExecPlanEndpoints(tenantId, excPlanName);
- }
-
- public void setStormCoordinator(boolean isCoordinator) {
- stormManagerService.setStormCoordinator(isCoordinator);
-
- if (!isCoordinator){
- stateChecker = executorService.schedule(new PeriodicStateChanger(), 10000, TimeUnit.MILLISECONDS);
- }
- }
-
- public boolean isStormCoordinator() {
- return stormManagerService.isStormCoordinator();
- }
-
- static class ServerThread implements Runnable {
- private TServer server;
-
- ServerThread(TServer server) {
- this.server = server;
- }
-
- public void run() {
- this.server.serve();
- }
- }
-
- public void verifyState(){
- if (isStormCoordinator() && roleToMembershipMap != null &&
- roleToMembershipMap.get(STORM_HZ_MAP_ACTIVE_MANAGER_KEY) != null &&
- !roleToMembershipMap.get(STORM_HZ_MAP_ACTIVE_MANAGER_KEY).equals(myHazelcastId)){
-
- log.info("Resigning as storm coordinator as there's another storm coordinator available in the cluster with member id "
- + roleToMembershipMap.get(STORM_HZ_MAP_ACTIVE_MANAGER_KEY));
-
- setStormCoordinator(false);
- }
- }
-
- public synchronized void tryBecomeCoordinator() {
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null) {
- if(!isStormCoordinator()) {
- ILock lock = hazelcastInstance.getLock("StormCoordinator");
- boolean isCoordinator = lock.tryLock();
- setStormCoordinator(isCoordinator);
- if (isCoordinator) {
- log.info("Node became the Storm coordinator with member id " + myHazelcastId);
- if (roleToMembershipMap != null){
- roleToMembershipMap.put(STORM_HZ_MAP_ACTIVE_MANAGER_KEY, myHazelcastId);
- }
- }
- }
- }
- }
-
- class PeriodicStateChanger implements Runnable {
-
- @Override
- public void run() {
- tryBecomeCoordinator();
- }
- }
-
-
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/manager/StormManagerServiceImpl.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/manager/StormManagerServiceImpl.java
deleted file mode 100644
index 52e948e41..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/manager/StormManagerServiceImpl.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.wso2.carbon.event.processor.core.internal.storm.manager;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.thrift.TException;
-import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
-import org.wso2.carbon.event.processor.common.storm.manager.service.exception.EndpointNotFoundException;
-import org.wso2.carbon.event.processor.common.storm.manager.service.exception.NotStormCoordinatorException;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class StormManagerServiceImpl implements StormManagerService.Iface {
- private static Log log = LogFactory.getLog(StormManagerServiceImpl.class);
- public static final long MILLISECONDS_PER_MINUTE = 60000;
- private ConcurrentHashMap> stormReceivers = new ConcurrentHashMap>();
- private ConcurrentHashMap> cepPublishers = new ConcurrentHashMap>();
- private boolean isStormCoordinator;
- private String hostPort;
-
- public StormManagerServiceImpl(String hostPort) {
- this.hostPort = hostPort;
- }
-
- @Override
- public void registerStormReceiver(int tenantId, String executionPlanName, String hostName, int port) throws NotStormCoordinatorException, TException {
- if (!isStormCoordinator) {
- throw new NotStormCoordinatorException(hostPort + " not a storm coordinator");
- }
- insertToCollection(stormReceivers, constructKey(tenantId, executionPlanName), new Endpoint(port, hostName));
- }
-
- @Override
- public void registerCEPPublisher(int tenantId, String executionPlanName, String hostName, int port) throws NotStormCoordinatorException, TException {
- if (!isStormCoordinator) {
- throw new NotStormCoordinatorException(hostPort + " not a storm coordinator");
- }
- insertToCollection(cepPublishers, constructKey(tenantId, executionPlanName), new Endpoint(port, hostName));
- }
-
- @Override
- public synchronized String getStormReceiver(int tenantId, String executionPlanName, String cepReceiverHostName) throws NotStormCoordinatorException, EndpointNotFoundException, TException {
- if (!isStormCoordinator) {
- throw new NotStormCoordinatorException(hostPort + " not a storm coordinator");
- }
- Set endpointSet = stormReceivers.get(constructKey(tenantId, executionPlanName));
- Endpoint selectedEndpoint = getEndpoint(endpointSet, cepReceiverHostName);
-
- if (null != selectedEndpoint) {
- return selectedEndpoint.getHostName() + ":" + selectedEndpoint.getPort();
- } else {
- throw new EndpointNotFoundException("No Storm Receiver for executionPlanName: " + executionPlanName + " of tenantId:" + tenantId + " for CEP Receiver form:" + cepReceiverHostName);
- }
-
- }
-
- @Override
- public synchronized String getCEPPublisher(int tenantId, String executionPlanName, String stormPublisherHostName) throws NotStormCoordinatorException, EndpointNotFoundException, TException {
- if (!isStormCoordinator) {
- throw new NotStormCoordinatorException(hostPort + " not a storm coordinator");
- }
- Set endpointSet = cepPublishers.get(constructKey(tenantId, executionPlanName));
- Endpoint selectedEndpoint = getEndpoint(endpointSet, stormPublisherHostName);
-
- if (null != selectedEndpoint) {
- return selectedEndpoint.getHostName() + ":" + selectedEndpoint.getPort();
- } else {
- throw new EndpointNotFoundException("No CEP Publisher for executionPlanName: " + executionPlanName + " of tenantId:" + tenantId + " for Storm Publisher form:" + stormPublisherHostName);
- }
- }
-
- public synchronized void deleteExecPlanEndpoints(int tenantId, String executionPlanName){
- Set endpointSet = cepPublishers.get(constructKey(tenantId, executionPlanName));
- if (endpointSet != null){
- cepPublishers.remove(constructKey(tenantId, executionPlanName));
- }
-
- endpointSet = stormReceivers.get(constructKey(tenantId, executionPlanName));
- if (endpointSet != null){
- stormReceivers.remove(constructKey(tenantId, executionPlanName));
- }
-
- log.info("Removed all end point details related to '" + constructKey(tenantId, executionPlanName) + "' from Manager service.");
- }
-
- private synchronized Endpoint getEndpoint(Set endpointSet, String requesterIp) {
- Endpoint selectedEndpoint = null;
-
- Set sameHostEndpoints = new HashSet();
- if (endpointSet != null && !endpointSet.isEmpty()) {
-
- if (!"".equals(requesterIp)) {
- for (Endpoint endpoint : endpointSet) {
- if (endpoint.getHostName().equals(requesterIp)) {
- sameHostEndpoints.add(endpoint);
- }
- }
- }
-
- // If there's a storm receivers/cep publishers in the same host as requester IP select among them
- if (!sameHostEndpoints.isEmpty()) {
- selectedEndpoint = selectEndpoint(sameHostEndpoints);
- }else{
- selectedEndpoint = selectEndpoint(endpointSet);
- }
-
- if (selectedEndpoint != null) {
- selectedEndpoint.setConnectionCount(selectedEndpoint.getConnectionCount() + 1);
- }
- }
- return selectedEndpoint;
- }
-
- private synchronized Endpoint selectEndpoint(Set endpointSet) {
- Endpoint selectedEndpoint = null;
- int minConnectionCount = Integer.MAX_VALUE;
- for (Endpoint endpoint : endpointSet) {
- if (endpoint.getConnectionCount() < minConnectionCount){
- if (endpoint.getLastRegisterTimestamp() >= (System.currentTimeMillis() - MILLISECONDS_PER_MINUTE)){
- minConnectionCount = endpoint.getConnectionCount();
- selectedEndpoint = endpoint;
- }else{
-
- log.warn("Ignoring endpoint " + endpoint.getHostName() + ":" + endpoint.getPort() + " because it has not sent a heart beat for "
- + (int) Math.floor((System.currentTimeMillis() - endpoint.getLastRegisterTimestamp()) / MILLISECONDS_PER_MINUTE) + " min(s)");
- }
- }
- }
- return selectedEndpoint;
- }
-
- private static synchronized void insertToCollection(ConcurrentHashMap> collection, String key, Endpoint endpoint) {
- Set endpointSet = collection.get(key);
- boolean isHeartbeat = false;
-
- if (endpointSet == null) {
- endpointSet = new HashSet();
- collection.put(key, endpointSet);
- }else{
- for (Endpoint currentEndpoint : endpointSet){
- if (currentEndpoint.equals(endpoint)){
- isHeartbeat = true;
- currentEndpoint.updateLastRegisteredTimestamp();
- break;
- }
- }
- }
-
- if (!isHeartbeat){
- endpointSet.add(endpoint);
- }
- }
-
- private static String constructKey(int tenantId, String executionPlanName) {
- return tenantId + ":" + executionPlanName;
- }
-
- public void setStormCoordinator(boolean isStormCoordinator) {
- this.isStormCoordinator = isStormCoordinator;
- }
-
- private class Endpoint {
- private int port;
- private String hostName;
- private int connectionCount = 0;
- private long lastRegisterTimestamp;
-
- Endpoint(int port, String hostName) {
- this.port = port;
- this.hostName = hostName;
- this.lastRegisterTimestamp = System.currentTimeMillis();
- }
-
- public long getLastRegisterTimestamp(){return lastRegisterTimestamp;}
-
- public void updateLastRegisteredTimestamp(){
- lastRegisterTimestamp = System.currentTimeMillis();
- }
-
- public String getHostName() {
- return hostName;
- }
-
- public int getPort() {
- return port;
- }
-
- public void setConnectionCount(int connections) {
- connectionCount = connections;
- }
-
- public int getConnectionCount() {
- return connectionCount;
- }
-
- @Override
- public boolean equals(Object object){
- if (object == null || (this.getClass() != object.getClass())){
- return false;
- }
- final Endpoint argument = (Endpoint)object;
-
- return ((this.hostName.equals(argument.getHostName())) && (this.port == argument.getPort()));
- }
- }
-
- public boolean isStormCoordinator() {
- return isStormCoordinator;
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusHolderInitializer.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusHolderInitializer.java
deleted file mode 100644
index 1feb2c6f9..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusHolderInitializer.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.wso2.carbon.event.processor.core.internal.storm.status.monitor;
-
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
-import org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager;
-import org.wso2.carbon.event.processor.core.util.DistributedModeConstants;
-import org.wso2.carbon.event.processor.core.util.ExecutionPlanStatusHolder;
-
-/**
- * Utility to initialize the statusHolder.
- */
-public class StormStatusHolderInitializer {
- private static Log log = LogFactory.getLog(StormStatusHolderInitializer.class);
-
- public static void initializeStatusHolder(String executionPlanName, int tenantId,
- int parallel) {
- String stormTopologyName = StormTopologyManager.getTopologyName(executionPlanName, tenantId);
-
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null && hazelcastInstance.getLifecycleService().isRunning()) {
- IMap executionPlanStatusHolderIMap = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
- ExecutionPlanStatusHolder executionPlanStatusHolder = new ExecutionPlanStatusHolder(parallel);
- executionPlanStatusHolderIMap.put(stormTopologyName, executionPlanStatusHolder);
- } else {
- log.error("Couldn't initialize status info object for execution plan: " + executionPlanName +
- ", for tenant-ID: " + tenantId
- + " as the hazelcast instance is not active or not available.");
- }
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMapListener.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMapListener.java
deleted file mode 100644
index 779adb1d7..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMapListener.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.wso2.carbon.event.processor.core.internal.storm.status.monitor;
-
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.map.listener.EntryAddedListener;
-import com.hazelcast.map.listener.EntryUpdatedListener;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
-import org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager;
-import org.wso2.carbon.event.processor.core.internal.storm.status.monitor.exception.DeploymentStatusMonitorException;
-import org.wso2.carbon.event.processor.core.util.DistributedModeConstants;
-
-public class StormStatusMapListener {
-
- private static final Log log = LogFactory.getLog(StormStatusMapListener.class);
-
- private final String listenerId;
- private final HazelcastInstance hazelcastInstance;
- private final StormStatusMonitor stormStatusMonitor;
- private String executionPlanName;
- private int tenantId;
-
- public StormStatusMapListener(String executionPlanName, int tenantId,
- StormStatusMonitor stormStatusMonitor)
- throws DeploymentStatusMonitorException {
- hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance == null) {
- throw new DeploymentStatusMonitorException("Couldn't initialize Distributed Deployment Status monitor as" +
- " the hazelcast instance is not available. Enable clustering and restart the server"); //not giving context info, since this is not a per execution plan or tenant specific exception.
- } else if (!hazelcastInstance.getLifecycleService().isRunning()) {
- throw new DeploymentStatusMonitorException("Couldn't initialize Distributed Deployment Status monitor as" +
- " the hazelcast instance is not active."); //not giving context info, since this is not a per execution plan or tenant specific exception.
- }
- this.executionPlanName = executionPlanName;
- this.tenantId = tenantId;
- String stormTopologyName = StormTopologyManager.getTopologyName(executionPlanName, tenantId);
- listenerId = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP).
- addEntryListener(new MapListenerImpl(), stormTopologyName, true);
- this.stormStatusMonitor = stormStatusMonitor;
- }
-
- /**
- * Clean up method, removing the entry listener.
- */
- public void removeEntryListener() {
- if (hazelcastInstance == null) {
- log.error("Couldn't unregister entry listener for execution plan: " + executionPlanName +
- ", for tenant-ID: " + tenantId
- + " as the hazelcast instance is not available.");
- } else if (hazelcastInstance.getLifecycleService().isRunning()) {
- log.error("Couldn't unregister entry listener for execution plan: " + executionPlanName +
- ", for tenant-ID: " + tenantId
- + " as the hazelcast instance is not active.");
- } else {
- hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP).removeEntryListener(listenerId);
- }
- }
-
- private class MapListenerImpl implements EntryAddedListener, EntryUpdatedListener {
- @Override
- public void entryAdded(EntryEvent entryEvent) {
- if (!entryEvent.getMember().localMember()) {
- stormStatusMonitor.hazelcastListenerCallback();
- }
- }
-
- @Override
- public void entryUpdated(EntryEvent entryEvent) {
- if (!entryEvent.getMember().localMember()) {
- stormStatusMonitor.hazelcastListenerCallback();
- }
- }
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMonitor.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMonitor.java
deleted file mode 100644
index 9af4fcf50..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/StormStatusMonitor.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.wso2.carbon.event.processor.core.internal.storm.status.monitor;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.context.PrivilegedCarbonContext;
-import org.wso2.carbon.databridge.commons.thrift.utils.HostAddressFinder;
-import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
-import org.wso2.carbon.event.processor.core.internal.storm.StormTopologyManager;
-import org.wso2.carbon.event.processor.core.internal.storm.status.monitor.exception.DeploymentStatusMonitorException;
-import org.wso2.carbon.event.processor.core.util.DistributedModeConstants;
-import org.wso2.carbon.event.processor.core.util.ExecutionPlanStatusHolder;
-import org.wso2.carbon.event.processor.manager.commons.transport.server.ConnectionCallback;
-
-import java.net.SocketException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class StormStatusMonitor implements ConnectionCallback {
-
- private static final Log log = LogFactory.getLog(StormStatusMonitor.class);
-
- private final String stormTopologyName;
- private final String executionPlanName;
- private final String executionPlanStatusHolderKey;
- private final ExecutorService executorService;
- private final int lockTimeout;
- private final String tenantDomain;
- private String hostIp = null;
- private AtomicInteger connectedCepReceiversCount;
- private int importedStreamsCount = 0;
- private AtomicInteger connectedPublisherBoltsCount;
-
- public StormStatusMonitor(int tenantId, String executionPlanName, int importedStreamsCount)
- throws DeploymentStatusMonitorException {
- if (EventProcessorValueHolder.getHazelcastInstance() == null) {
- throw new DeploymentStatusMonitorException("Couldn't initialize Distributed Deployment Status monitor as" +
- " the hazelcast instance is null. Enable clustering and restart the server");
- }
- executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
- setNameFormat("Thread pool- component - StormStatusMonitor.executorService;tenantId - " +
- tenantId + ";executionPlanName - " + executionPlanName).build());
- tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
- connectedCepReceiversCount = new AtomicInteger(0);
- connectedPublisherBoltsCount = new AtomicInteger(0);
- try {
- hostIp = HostAddressFinder.findAddress("localhost");
- } catch (SocketException e) {
- //do nothing. Let this be retried in the callbacks.
- }
- this.importedStreamsCount = importedStreamsCount;
- this.executionPlanName = executionPlanName;
- this.stormTopologyName = StormTopologyManager.getTopologyName(executionPlanName, tenantId);
- this.executionPlanStatusHolderKey = DistributedModeConstants.STORM_STATUS_MAP + "." + stormTopologyName;
- lockTimeout = EventProcessorValueHolder.getStormDeploymentConfiguration().getStatusLockTimeout();
- executorService.execute(new GlobalStatUpdater());
- }
-
- @Override
- public void onCepReceiverConnect() {
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null && hazelcastInstance.getLifecycleService().isRunning()) {
- IMap executionPlanStatusHolderIMap = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
- try {
- if (hostIp == null) {
- hostIp = HostAddressFinder.findAddress("localhost");
- }
- if (executionPlanStatusHolderIMap.tryLock(executionPlanStatusHolderKey, lockTimeout, TimeUnit.MILLISECONDS)) {
- try {
- ExecutionPlanStatusHolder executionPlanStatusHolder =
- executionPlanStatusHolderIMap.get(stormTopologyName);
- if (executionPlanStatusHolder == null) {
- log.error("Couldn't increment connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as status object not initialized by manager.");
- } else {
- executionPlanStatusHolder.setCEPReceiverStatus(hostIp, connectedCepReceiversCount.incrementAndGet(), importedStreamsCount);
- executionPlanStatusHolderIMap.replace(stormTopologyName, executionPlanStatusHolder);
- if (log.isDebugEnabled()) {
- log.debug("Incremented connected CEP receiver count as " + connectedCepReceiversCount.get() +
- " for execution plan: " + executionPlanName + ", for tenant-domain: " + tenantDomain
- + ", for IP address: " + hostIp);
- }
- }
- } finally {
- executionPlanStatusHolderIMap.unlock(executionPlanStatusHolderKey);
- }
- } else {
- log.error("Couldn't increment connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition failed.");
- }
- } catch (InterruptedException e) {
- log.error("Couldn't increment connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition was interrupted.", e);
- Thread.currentThread().interrupt();
- } catch (SocketException e) {
- log.error("Couldn't increment connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the host IP couldn't be found for this node.", e);
- }
- } else {
- log.error("Couldn't increment connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast instance is not active or not available.");
- }
- }
-
- @Override
- public void onCepReceiverDisconnect() {
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null && hazelcastInstance.getLifecycleService().isRunning()) {
- IMap executionPlanStatusHolderIMap = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
- try {
- if (hostIp == null) {
- hostIp = HostAddressFinder.findAddress("localhost");
- }
- if (executionPlanStatusHolderIMap.tryLock(executionPlanStatusHolderKey, lockTimeout, TimeUnit.MILLISECONDS)) {
- try {
- ExecutionPlanStatusHolder executionPlanStatusHolder =
- executionPlanStatusHolderIMap.get(stormTopologyName);
- if (executionPlanStatusHolder == null) {
- log.error("Couldn't decrement connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as status object not initialized by manager.");
- } else {
- executionPlanStatusHolder.setCEPReceiverStatus(hostIp, connectedCepReceiversCount.decrementAndGet(), importedStreamsCount);
- executionPlanStatusHolderIMap.replace(stormTopologyName, executionPlanStatusHolder);
- if (log.isDebugEnabled()) {
- log.debug("Decremented connected CEP receiver count as " + connectedCepReceiversCount.get() +
- " for execution plan: " + executionPlanName + ", for tenant-domain: " + tenantDomain
- + ", for IP address: " + hostIp);
- }
- }
- } finally {
- executionPlanStatusHolderIMap.unlock(executionPlanStatusHolderKey);
- }
- } else {
- log.error("Couldn't decrement connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition failed.");
- }
- } catch (InterruptedException e) {
- log.error("Couldn't decrement connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition was interrupted.", e);
- Thread.currentThread().interrupt();
- } catch (SocketException e) {
- log.error("Couldn't decrement connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the host IP couldn't be found for this node.", e);
- }
- } else {
- log.error("Couldn't decrement connected CEP receivers count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast instance is not active or not available.");
- }
- }
-
- @Override
- public void onPublisherBoltConnect() {
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null && hazelcastInstance.getLifecycleService().isRunning()) {
- IMap executionPlanStatusHolderIMap = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
- try {
- if (hostIp == null) {
- hostIp = HostAddressFinder.findAddress("localhost");
- }
- if (executionPlanStatusHolderIMap.tryLock(executionPlanStatusHolderKey, lockTimeout, TimeUnit.MILLISECONDS)) {
- try {
- ExecutionPlanStatusHolder executionPlanStatusHolder =
- executionPlanStatusHolderIMap.get(stormTopologyName);
- if (executionPlanStatusHolder == null) {
- log.error("Couldn't increment connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as status object not initialized by manager.");
- } else {
- executionPlanStatusHolder.setConnectedPublisherBoltsCount(hostIp, connectedPublisherBoltsCount.incrementAndGet());
- executionPlanStatusHolderIMap.replace(stormTopologyName, executionPlanStatusHolder);
- if (log.isDebugEnabled()) {
- log.debug("Incremented connected publisher bolt count as " + connectedPublisherBoltsCount.get() +
- " for execution plan: " + executionPlanName + ", for tenant-domain: " + tenantDomain
- + ", for IP address: " + hostIp);
- }
- }
- } finally {
- executionPlanStatusHolderIMap.unlock(executionPlanStatusHolderKey);
- }
- } else {
- log.error("Couldn't increment connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition failed.");
- }
- } catch (InterruptedException e) {
- log.error("Couldn't increment connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition was interrupted.", e);
- Thread.currentThread().interrupt();
- } catch (SocketException e) {
- log.error("Couldn't increment connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the host IP couldn't be found for this node.", e);
- }
- } else {
- log.error("Couldn't increment connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast instance is not active or not available.");
- }
- }
-
- @Override
- public void onPublisherBoltDisconnect() {
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null && hazelcastInstance.getLifecycleService().isRunning()) {
- IMap executionPlanStatusHolderIMap = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
- try {
- if (hostIp == null) {
- hostIp = HostAddressFinder.findAddress("localhost");
- }
- if (executionPlanStatusHolderIMap.tryLock(executionPlanStatusHolderKey, lockTimeout, TimeUnit.MILLISECONDS)) {
- try {
- ExecutionPlanStatusHolder executionPlanStatusHolder =
- executionPlanStatusHolderIMap.get(stormTopologyName);
- if (executionPlanStatusHolder == null) {
- log.error("Couldn't decrement connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as status object not initialized by manager.");
- } else {
- executionPlanStatusHolder.setConnectedPublisherBoltsCount(hostIp, connectedPublisherBoltsCount.decrementAndGet());
- executionPlanStatusHolderIMap.replace(stormTopologyName, executionPlanStatusHolder);
- if (log.isDebugEnabled()) {
- log.debug("Decremented connected publisher bolt count as " + connectedPublisherBoltsCount.get() +
- " for execution plan: " + executionPlanName + ", for tenant-domain: " + tenantDomain
- + ", for IP address: " + hostIp);
- }
- }
- } finally {
- executionPlanStatusHolderIMap.unlock(executionPlanStatusHolderKey);
- }
- } else {
- log.error("Couldn't decrement connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition failed.");
- }
- } catch (InterruptedException e) {
- log.error("Couldn't decrement connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition was interrupted.", e);
- Thread.currentThread().interrupt();
- } catch (SocketException e) {
- log.error("Couldn't decrement connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the host IP couldn't be found for this node.", e);
- }
- } else {
- log.error("Couldn't decrement connected publisher bolts count for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast instance is not active or not available.");
- }
- }
-
- public void hazelcastListenerCallback() {
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null && hazelcastInstance.getLifecycleService().isRunning()) {
- IMap executionPlanStatusHolderIMap = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
- try {
- if (hostIp == null) {
- hostIp = HostAddressFinder.findAddress("localhost");
- }
- if (executionPlanStatusHolderIMap.tryLock(executionPlanStatusHolderKey, lockTimeout, TimeUnit.MILLISECONDS)) {
- try {
- ExecutionPlanStatusHolder executionPlanStatusHolder =
- executionPlanStatusHolderIMap.get(stormTopologyName);
- if (executionPlanStatusHolder == null) {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as status object not initialized by manager.");
- } else {
- executionPlanStatusHolder.setCEPReceiverStatus(hostIp, connectedCepReceiversCount.get(), importedStreamsCount);
- executionPlanStatusHolder.setConnectedPublisherBoltsCount(hostIp, connectedPublisherBoltsCount.get());
- executionPlanStatusHolderIMap.replace(stormTopologyName, executionPlanStatusHolder);
- if (log.isDebugEnabled()) {
- log.debug("Updated distributed deployment status as follows. " +
- "\nConnected CEP receivers count: " + connectedCepReceiversCount.get() +
- "\nConnected publisher bolts count: " + connectedPublisherBoltsCount.get() +
- "\nfor execution plan: " + executionPlanName + ", for tenant-domain: " + tenantDomain
- + ", for IP address: " + hostIp);
- }
- }
- } finally {
- executionPlanStatusHolderIMap.unlock(executionPlanStatusHolderKey);
- }
- } else {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition failed.");
- }
- } catch (InterruptedException e) {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition was interrupted.", e);
- Thread.currentThread().interrupt();
- } catch (SocketException e) {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the host IP couldn't be found for this node.", e);
- }
- } else {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast instance is not active or not available.");
- }
- }
-
- public void shutdown() {
- executorService.shutdownNow();
- }
-
- /**
- * Updates the ExecutionPlanStatusHolder periodically.
- */
- class GlobalStatUpdater implements Runnable {
-
- private final int updateRate;
-
- GlobalStatUpdater() {
- updateRate = EventProcessorValueHolder.getStormDeploymentConfiguration().getStatusUpdateInterval();
- }
-
- @Override
- public void run() {
- while (true) {
-
- /**
- * Update
- */
- HazelcastInstance hazelcastInstance = EventProcessorValueHolder.getHazelcastInstance();
- if (hazelcastInstance != null && hazelcastInstance.getLifecycleService().isRunning()) {
- IMap executionPlanStatusHolderIMap = hazelcastInstance.getMap(DistributedModeConstants.STORM_STATUS_MAP);
- try {
- if (hostIp == null) {
- hostIp = HostAddressFinder.findAddress("localhost");
- }
- if (executionPlanStatusHolderIMap.tryLock(executionPlanStatusHolderKey, lockTimeout, TimeUnit.MILLISECONDS)) {
- try {
- ExecutionPlanStatusHolder executionPlanStatusHolder =
- executionPlanStatusHolderIMap.get(stormTopologyName);
- if (executionPlanStatusHolder == null) {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as status object not initialized by manager.");
- } else {
- executionPlanStatusHolder.setCEPReceiverStatus(hostIp, connectedCepReceiversCount.get(), importedStreamsCount);
- executionPlanStatusHolder.setConnectedPublisherBoltsCount(hostIp, connectedPublisherBoltsCount.get());
- executionPlanStatusHolderIMap.replace(stormTopologyName, executionPlanStatusHolder);
- if (log.isDebugEnabled()) {
- log.debug("Updated distributed deployment status as follows. " +
- "\nConnected CEP receivers count: " + connectedCepReceiversCount.get() +
- "\nConnected publisher bolts count: " + connectedPublisherBoltsCount.get() +
- "\nfor execution plan: " + executionPlanName + ", for tenant-domain: " + tenantDomain
- + ", for IP address: " + hostIp);
- }
- }
- } finally {
- executionPlanStatusHolderIMap.unlock(executionPlanStatusHolderKey);
- }
- } else {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition failed.");
- }
- } catch (InterruptedException e) {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast lock acquisition was interrupted.", e);
- Thread.currentThread().interrupt();
- return;
- } catch (SocketException e) {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the host IP couldn't be found for this node.", e);
- }
- } else {
- log.error("Couldn't update distributed deployment status for execution plan: " + executionPlanName +
- ", for tenant-domain: " + tenantDomain
- + " as the hazelcast instance is not active or not available.");
- }
-
- /**
- * Sleep
- */
- try {
- Thread.sleep(updateRate);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- if (log.isDebugEnabled()) {
- log.debug("GlobalStatUpdater was interrupted, hence returning. " +
- "Details: execution plan name: " + executionPlanName + ", tenant domain: " + tenantDomain);
- }
- return;
- }
- }
- }
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/exception/DeploymentStatusMonitorException.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/exception/DeploymentStatusMonitorException.java
deleted file mode 100644
index 860d1a7bc..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/status/monitor/exception/DeploymentStatusMonitorException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.wso2.carbon.event.processor.core.internal.storm.status.monitor.exception;
-
-public class DeploymentStatusMonitorException extends Exception{
-
- public DeploymentStatusMonitorException(){
- }
-
- public DeploymentStatusMonitorException(String message){
- super(message);
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ComponentInfoHolder.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ComponentInfoHolder.java
deleted file mode 100644
index ad1663fe3..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ComponentInfoHolder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wso2.carbon.event.processor.core.internal.storm.util;
-
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class ComponentInfoHolder {
-
-
-
- public enum ComponentType {EVENT_RECEIVER_SPOUT, SIDDHI_BOLT, EVENT_PUBLISHER_BOLT, TRIGGER_SPOUT;}
-
- private ComponentType componentType;
- private String componentName = null;
- private Object declarer;
- private Map inputStreams = new HashMap();
- private Map inputStreamPartitoningFields = new HashMap();
- private Map outputStreams = new HashMap();
- private int parallelism = 1;
- private String query;
-
- public ComponentInfoHolder(String componentName, ComponentType componentType) {
- this.componentName = componentName;
- this.componentType = componentType;
- }
-
- public void addSiddhiQuery(String query){
- this.query = query;
- }
-
- public void setParallelism(int parallelism) {
- this.parallelism = parallelism;
- }
-
- public void addInputStream(String streamDefinition) {
- StreamDefinition siddhiStreamDefinition = SiddhiCompiler.parseStreamDefinition(streamDefinition);
- inputStreams.put(siddhiStreamDefinition.getId(), siddhiStreamDefinition);
- }
-
- public void addStreamPartitioningField(String streamId, String partitioningField){
- if (this.componentType != ComponentType.EVENT_RECEIVER_SPOUT){
- inputStreamPartitoningFields.put(streamId, partitioningField);
- }
- }
-
- public void addOutputStream(String streamDefinition) {
- StreamDefinition siddhiStreamDefinition = SiddhiCompiler.parseStreamDefinition(streamDefinition);
- outputStreams.put(siddhiStreamDefinition.getId(), siddhiStreamDefinition);
- }
-
- public String[] getInputStreamIds() {
- return inputStreams.keySet().toArray(new String[inputStreams.size()]);
- }
-
- public String[] getOutputStreamIds() {
- return outputStreams.keySet().toArray(new String[inputStreams.size()]);
- }
-
- public String getPartionenedField(String streamId){
- return inputStreamPartitoningFields.get(streamId);
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
- public String getComponentName() {
- return componentName;
- }
-
- public ComponentType getComponentType() {
- return componentType;
- }
-
- public void setDeclarer(Object declarer) {
- this.declarer = declarer;
- }
-
- public Object getDeclarer() {
- return declarer;
- }
-
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ExecutionElementInfoHolder.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ExecutionElementInfoHolder.java
deleted file mode 100644
index 7e2392aa8..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ExecutionElementInfoHolder.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wso2.carbon.event.processor.core.internal.storm.util;
-
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-
-/**
- * Class to store execution element info
- */
-public class ExecutionElementInfoHolder {
- private ExecutionElement executionElement;
- private ParallelismInfoHolder parallelismInfoHolder;
-
- public ExecutionElementInfoHolder(ExecutionElement executionElement, int parallelism, Boolean isParallelismEnforced) {
- this.executionElement = executionElement;
- this.parallelismInfoHolder = new ParallelismInfoHolder(parallelism, isParallelismEnforced);
- }
-
- public ExecutionElement getExecutionElement() {
- return executionElement;
- }
-
- public ParallelismInfoHolder getParallelismInfoHolder() {
- return parallelismInfoHolder;
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ParallelismInfoHolder.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ParallelismInfoHolder.java
deleted file mode 100644
index 010129263..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/ParallelismInfoHolder.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wso2.carbon.event.processor.core.internal.storm.util;
-
-/**
- * Class to store storm parallelism info
- */
-public class ParallelismInfoHolder {
- private int parallelism;
- private Boolean isEnforced;
-
- public ParallelismInfoHolder(int parallelism, Boolean isEnforced) {
- this.parallelism = parallelism;
- this.isEnforced = isEnforced;
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
- public Boolean getIsEnforced() {
- return isEnforced;
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/QueryGroupInfoHolder.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/QueryGroupInfoHolder.java
deleted file mode 100644
index 2ad56f9b1..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/QueryGroupInfoHolder.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wso2.carbon.event.processor.core.internal.storm.util;
-
-import org.wso2.carbon.event.processor.core.exception.StormQueryConstructionException;
-import org.wso2.siddhi.query.api.execution.partition.Partition;
-import org.wso2.siddhi.query.api.execution.partition.PartitionType;
-import org.wso2.siddhi.query.api.execution.partition.RangePartitionType;
-import org.wso2.siddhi.query.api.execution.partition.ValuePartitionType;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.expression.Variable;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-/**
- * Class to hold necessary information regarding query grouping which is needed
- * for Storm query plan generation. Here single query is treated as a query group
- * with group id = query name. Also a Partition is considered as a group.
- */
-public class QueryGroupInfoHolder {
- private String groupId;
- private List stringQueries;
- private List executionElements;
- private Set inputDefinitionIds;
- private Set outputDefinitionIds;
- private Map partitionFieldMap = null;
-
- public QueryGroupInfoHolder(String groupId) {
- this.groupId = groupId;
- stringQueries = new ArrayList();
- executionElements = new ArrayList();
- inputDefinitionIds = new HashSet();
- outputDefinitionIds = new HashSet();
- }
-
- public void addQueryString(String query) {
- stringQueries.add(query);
- }
-
- public void addExecutionElement(ExecutionElementInfoHolder infoHolder) throws StormQueryConstructionException {
- if (infoHolder.getExecutionElement() instanceof Query) {
- inputDefinitionIds.addAll(((Query) infoHolder.getExecutionElement()).getInputStream().getUniqueStreamIds());
- outputDefinitionIds.add(((Query) infoHolder.getExecutionElement()).getOutputStream().getId());
- } else {
- Partition partition = (Partition) infoHolder.getExecutionElement();
- for (Query query : partition.getQueryList()) {
- for (String id : query.getInputStream().getUniqueStreamIds()) {
- if (!id.contains("#")) { //if not an inner stream
- inputDefinitionIds.add(id);
- }
- }
- outputDefinitionIds.add(query.getOutputStream().getId());
- }
- for (PartitionType type : partition.getPartitionTypeMap().values()) {
- if (type instanceof RangePartitionType) {
- throw new StormQueryConstructionException("Error in deploying Partition:" + this.getStringQueries
- ().get(0) + " Range partitioning is not supported in distributed deployment");
- } else if (type instanceof ValuePartitionType) {
- if (partitionFieldMap == null) {
- partitionFieldMap = new HashMap();
- }
- if (((ValuePartitionType) type).getExpression() instanceof Variable) {
- Variable variable = (Variable) ((ValuePartitionType) type).getExpression();
- partitionFieldMap.put(type.getStreamId(), variable.getAttributeName());
- } else {
- throw new StormQueryConstructionException("Error in deploying partition:" + this
- .getStringQueries().get(0) + ". Only Expressions of Type Variable will be admitted" +
- " for distributed processing");
- }
- }
- }
- }
- executionElements.add(infoHolder);
- }
-
- public List getStringQueries() {
- return stringQueries;
- }
-
- public List getExecutionElements() {
- return executionElements;
- }
-
- public Set getInputDefinitionIds() {
- return inputDefinitionIds;
- }
-
- public Set getOutputDefinitionIds() {
- return outputDefinitionIds;
- }
-
- public Map getPartitionFieldMap() {
- return partitionFieldMap;
- }
-}
diff --git a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/StormQueryPlanBuilder.java b/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/StormQueryPlanBuilder.java
deleted file mode 100644
index 052494373..000000000
--- a/components/event-processor/org.wso2.carbon.event.processor.core/src/main/java/org/wso2/carbon/event/processor/core/internal/storm/util/StormQueryPlanBuilder.java
+++ /dev/null
@@ -1,575 +0,0 @@
-/*
- * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.wso2.carbon.event.processor.core.internal.storm.util;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
-import org.wso2.carbon.event.processor.core.exception.StormQueryConstructionException;
-import org.wso2.carbon.event.processor.core.internal.ds.EventProcessorValueHolder;
-import org.wso2.carbon.event.processor.core.internal.storm.compiler.SiddhiQLStormQuerySplitter;
-import org.wso2.carbon.event.processor.core.internal.util.EventProcessorConstants;
-import org.wso2.carbon.event.processor.core.internal.util.EventProcessorUtil;
-import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.query.api.ExecutionPlan;
-import org.wso2.siddhi.query.api.annotation.Annotation;
-import org.wso2.siddhi.query.api.definition.AbstractDefinition;
-import org.wso2.siddhi.query.api.definition.StreamDefinition;
-import org.wso2.siddhi.query.api.definition.TriggerDefinition;
-import org.wso2.siddhi.query.api.execution.ExecutionElement;
-import org.wso2.siddhi.query.api.execution.partition.Partition;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.input.stream.BasicSingleInputStream;
-import org.wso2.siddhi.query.compiler.SiddhiCompiler;
-import org.wso2.siddhi.query.compiler.exception.SiddhiParserException;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import java.util.*;
-
-public class StormQueryPlanBuilder {
-
- /**
- * Gets Siddhi queries and construct storm query plan which can be used to build a storm topology.
- * query plan essentially comprise of three main elements. Receiver element, event processor element and
- * publisher element. Each of that will be constructed in separate methods and integrated here.
- *
- * @param configuration Execution plan configuration
- * @return
- * @throws StormQueryConstructionException
- */
- public static Document constructStormQueryPlanXML(ExecutionPlanConfiguration configuration,
- List importStreams, List exportStreams)
- throws StormQueryConstructionException {
- Document document;
- try {
- DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
- DocumentBuilder documentBuilder = documentBuilderFactory.newDocumentBuilder();
- document = documentBuilder.newDocument();
- Element rootElement = document.createElement(EventProcessorConstants.STORM_QUERY_PLAN);
- document.appendChild(rootElement);
-
- Element receiverElement;
- List processorElements;
- List triggerProcessorElements;
- Element publisherElement;
-
- receiverElement = constructReceiverElement(document, configuration.getExecutionPlan(), importStreams);
- publisherElement = constructPublisherElement(document, configuration.getExecutionPlan(), exportStreams);
- triggerProcessorElements = constructTriggerElement(document, configuration.getExecutionPlan());
- processorElements = constructProcessorElement(document, configuration.getExecutionPlan(), importStreams,
- exportStreams);
-
-
- rootElement.appendChild(receiverElement);
- for (Element processorElement : processorElements) {
- rootElement.appendChild(processorElement);
- }
- for (Element triggerProcessorElement : triggerProcessorElements){
- rootElement.appendChild(triggerProcessorElement);
- }
- rootElement.appendChild(publisherElement);
-
- StormQueryPlanValidator.validateQueryPlan(document);
- } catch (ParserConfigurationException e) {
- throw new StormQueryConstructionException("Error when creating storm query configuration.", e);
- } catch (EventStreamConfigurationException e) {
- throw new StormQueryConstructionException("Error when retrieving stream definitions in order to create storm " +
- "query configuration", e);
- } catch (SiddhiParserException e) {
- throw new StormQueryConstructionException("Provided Siddhi query contains errors", e);
- }
- return document;
- }
-
- /**
- * Create receiver element. Assume that imported streams contains all the receiver elements.
- *
- * @param document
- * @param queryExpressions
- * @param importedStreams @return
- * @throws EventStreamConfigurationException
- */
- private static Element constructReceiverElement(Document document, String queryExpressions, List importedStreams)
- throws EventStreamConfigurationException {
- Element receiverElement = document.createElement(EventProcessorConstants.EVENT_RECEIVER);
- receiverElement.setAttribute(EventProcessorConstants.NAME, EventProcessorConstants.EVENT_RECEIVER_SPOUT);
- ExecutionPlan executionPlan = SiddhiCompiler.parse(queryExpressions);
- receiverElement.setAttribute(EventProcessorConstants.PARALLEL, String.valueOf(getParallelism(executionPlan.getAnnotations(),
- EventProcessorConstants.RECEIVER_PARALLELISM)));
- Element streams = document.createElement(EventProcessorConstants.STREAMS);
- for (String definition : importedStreams) {
- Element stream = getStreamElement(document, definition);
- streams.appendChild(stream);
- }
- receiverElement.appendChild(streams);
- return receiverElement;
- }
-
- /**
- * Create publisher element. Assumes that exported streams contains all publisher streams.
- *
- * @param exportedStreams
- * @return
- * @throws EventStreamConfigurationException
- */
- private static Element constructPublisherElement(Document document, String queryExpressions, List exportedStreams)
- throws EventStreamConfigurationException {
- Element publisherElement = document.createElement(EventProcessorConstants.EVENT_PUBLISHER);
- Element publisherInputStream = document.createElement(EventProcessorConstants.INPUT_STREAMS);
- Element publisherOutputStream = document.createElement(EventProcessorConstants.OUTPUT_STREAMS);
- publisherElement.setAttribute(EventProcessorConstants.NAME, EventProcessorConstants.EVENT_PUBLISHER_BOLT);
- ExecutionPlan executionPlan = SiddhiCompiler.parse(queryExpressions);
- publisherElement.setAttribute(EventProcessorConstants.PARALLEL, String.valueOf(getParallelism(executionPlan.getAnnotations(),
- EventProcessorConstants.PUBLISHER_PARALLELISM)));
- for (String definition : exportedStreams) {
- Element stream = getStreamElement(document, definition);
- publisherOutputStream.appendChild(stream);
- Element clonedStream = (Element) stream.cloneNode(true);
- publisherInputStream.appendChild(clonedStream);
- }
- publisherElement.appendChild(publisherInputStream);
- publisherElement.appendChild(publisherOutputStream);
- return publisherElement;
- }
-
- private static List constructTriggerElement(Document document, String queryExpression) throws StormQueryConstructionException {
- ExecutionPlanRuntime executionPlanRuntime = EventProcessorValueHolder.getSiddhiManager().createExecutionPlanRuntime(queryExpression);
- Map streamDefinitionMap = executionPlanRuntime.getStreamDefinitionMap();
- executionPlanRuntime.shutdown();
-
- ExecutionPlan executionPlan = SiddhiCompiler.parse(queryExpression);
- List