Skip to content

Commit

Permalink
add e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Sep 15, 2024
1 parent 54d687f commit 73a9ef1
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
* under the License.
*/

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
Expand Down Expand Up @@ -110,7 +112,6 @@ protected void fetchUpdates() {
boolean prevLiveness =
_livenessResults.get(clusterName) != null && _livenessResults.get(clusterName).get(instanceName);
boolean liveness = fetchInstanceLivenessStatus(clusterName, instanceName);

if (prevLiveness && !liveness) { // previously connected, now disconnected
logger.warn("Host {} is not healthy, sending event to gateway manager", instanceName);
pushClientEventToGatewayManager(_manager,
Expand Down Expand Up @@ -172,7 +173,6 @@ public void run() {
public void stop() {
logger.info("Stopping Helix Gateway Service Poll Mode Channel...");
// Shutdown the scheduler gracefully when done (e.g., on app termination)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
_scheduler.shutdown();
try {
if (!_scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
Expand All @@ -181,7 +181,12 @@ public void stop() {
} catch (InterruptedException e) {
_scheduler.shutdownNow();
}
}));
// remove files
if (_shardStateChannelType == GatewayServiceChannelConfig.ChannelType.FILE) {
File file = new File(_targetStateFilePath);
boolean res = file.delete();
logger.info("Delete target state file: " + file + " res :" + res);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import org.apache.helix.gateway.channel.HelixGatewayServicePollModeChannel;
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
import org.apache.helix.manager.zk.HelixManagerStateListener;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateTransitionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -43,6 +46,7 @@
* transitions signaled by remote participant.
*/
public class HelixGatewayParticipant implements HelixManagerStateListener {
private static final Logger logger = LoggerFactory.getLogger(HelixGatewayParticipant.class);
public static final String UNASSIGNED_STATE = "UNASSIGNED";
private final HelixGatewayServiceChannel _gatewayServiceChannel;
private final HelixManager _helixManager;
Expand Down Expand Up @@ -113,6 +117,7 @@ public String getInstanceName() {
* Completes the state transition with the given transitionId.
*/
public void completeStateTransition(String resourceId, String shardId, String currentState) {
logger.info("Completing state transition for shard: {}{} to state: {}", resourceId, shardId, currentState);
String concatenatedShardName = resourceId + shardId;
CompletableFuture<String> future = _stateTransitionResultMap.get(concatenatedShardName);
if (future != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew
*/
public void setGatewayServiceChannel(HelixGatewayServiceChannel channel) {
if (_gatewayServiceChannel != null) {
_gatewayServiceChannel.stop();
return;
throw new IllegalStateException(
"Gateway service channel is already set, it can only be set once.");
}
throw new IllegalStateException(
"Gateway service channel is already set, it can only be set once.");
_gatewayServiceChannel = channel;
}

/**
Expand Down Expand Up @@ -163,6 +162,7 @@ private ShardStateUpdator(GatewayServiceEvent event) {

@Override
public void run() {
System.out.println("Processing state transition result " + _event.getInstanceName());
HelixGatewayParticipant participant =
getHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName());
if (participant == null) {
Expand Down Expand Up @@ -201,7 +201,11 @@ public void run() {
public void stopManager() {
_connectionEventProcessor.shutdown();
_participantStateTransitionResultUpdator.shutdown();
_helixGatewayParticipantMap.clear();
_helixGatewayParticipantMap.forEach((clusterName, participantMap) -> {
participantMap.forEach((instanceName, participant) -> {
participant.disconnect();
});
});
}

public void startService() throws IOException {
Expand Down Expand Up @@ -231,9 +235,13 @@ private void removeHelixGatewayParticipant(String clusterName, String instanceNa
HelixGatewayParticipant participant = getHelixGatewayParticipant(clusterName, instanceName);
if (participant != null) {
participant.disconnect();
_helixGatewayParticipantMap.get(clusterName).remove(instanceName);
if (_helixGatewayParticipantMap.containsKey(clusterName)) {
_helixGatewayParticipantMap.get(clusterName).remove(instanceName);
}
}
if (_currentStateCacheMap.containsKey(clusterName)) {
_currentStateCacheMap.get(clusterName).removeInstanceTargetDataFromCache(instanceName);
}
_currentStateCacheMap.get(clusterName).removeInstanceTargetDataFromCache(instanceName);
}

private HelixGatewayParticipant getHelixGatewayParticipant(String clusterName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ private void updateShardStateMapWithDiff(Map<String, ShardStateMap> stateMap, St
* example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}}
*/
public synchronized ObjectNode serializeTargetAssignmentsToJSONNode() {
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, ShardStateMap> entry : _targetStateMap.entrySet()) {
root.set(entry.getKey(), entry.getValue().toJSONNode());
}
Expand Down Expand Up @@ -183,6 +184,7 @@ private Map<String, Map<String, String>> getDiff(Map<String, Map<String, String>
* @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"}
*/
public synchronized ObjectNode toJSONNode() {
ObjectNode root = mapper.createObjectNode();
for (Map.Entry<String, Map<String, String>> entry : _stateMap.entrySet()) {
String resource = entry.getKey();
ObjectNode resourceNode = mapper.createObjectNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public static boolean fetchLivenessStatusFromGrpcService(String service, HealthG
public static void flushAssignmentToFile(String targetAssignment, String filePath) {
try (FileWriter fileWriter = new FileWriter(filePath)) {
fileWriter.write(targetAssignment);
fileWriter.close();
} catch (IOException e) {
logger.warn("Failed to write to file: " + filePath, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public static GatewayServiceEvent translateShardStateMessageToEventAndUpdateCach
* @param clusterName the cluster name
* @return GatewayServiceEvent
*/
public static GatewayServiceEvent translateClientCloseToEvent(String instanceName, String clusterName) {
public static GatewayServiceEvent translateClientCloseToEvent( String clusterName, String instanceName) {
GatewayServiceEvent.GateWayServiceEventBuilder builder =
new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName(
clusterName).setParticipantName(instanceName);
Expand All @@ -144,7 +144,7 @@ public static GatewayServiceEvent translateClientCloseToEvent(String instanceNam
* @param shardStateMap
* @return
*/
public static GatewayServiceEvent translateCurrentStateChangeToEvent(String instanceName, String clusterName,
public static GatewayServiceEvent translateCurrentStateChangeToEvent( String clusterName, String instanceName,
Map<String, Map<String, String>> shardStateMap) {
List<GatewayServiceEvent.StateTransitionResult> stResult = new ArrayList<>();
shardStateMap.forEach((resourceName, value) -> value.forEach((key, value1) -> {
Expand All @@ -165,7 +165,7 @@ public static GatewayServiceEvent translateCurrentStateChangeToEvent(String inst
* @param shardStateMap the initial state of shards on the participant. Could be empty map
* @return
*/
public static GatewayServiceEvent translateCurrentStateDiffToInitConnectEvent(String instanceName, String clusterName,
public static GatewayServiceEvent translateCurrentStateDiffToInitConnectEvent( String clusterName, String instanceName,
Map<String, Map<String, String>> shardStateMap) {
GatewayServiceEvent.GateWayServiceEventBuilder builder =
new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
Expand Down
Loading

0 comments on commit 73a9ef1

Please sign in to comment.