From d09d40b94b61373dfb833badf1de49e35d4d9ca2 Mon Sep 17 00:00:00 2001 From: xyuanlu Date: Tue, 24 Sep 2024 07:33:05 +0800 Subject: [PATCH] Add an end to end test for helix gateway (#2922) Add an end to end test for helix gateway --- .../HelixGatewayServicePollModeChannel.java | 11 +- .../participant/HelixGatewayParticipant.java | 5 + .../service/GatewayServiceManager.java | 22 +- .../util/GatewayCurrentStateCache.java | 2 + .../helix/gateway/util/PollChannelUtil.java | 1 + .../StateTransitionMessageTranslateUtil.java | 12 +- .../integration/TestFilePullChannelE2E.java | 255 ++++++++++++++++++ 7 files changed, 292 insertions(+), 16 deletions(-) create mode 100644 helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java index 3acd9e83bd..77caf0c17f 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java @@ -19,7 +19,9 @@ * under the License. */ +import java.io.File; import java.io.IOException; +import java.nio.file.Files; import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; @@ -110,7 +112,6 @@ protected void fetchUpdates() { boolean prevLiveness = _livenessResults.get(clusterName) != null && _livenessResults.get(clusterName).get(instanceName); boolean liveness = fetchInstanceLivenessStatus(clusterName, instanceName); - if (prevLiveness && !liveness) { // previously connected, now disconnected logger.warn("Host {} is not healthy, sending event to gateway manager", instanceName); pushClientEventToGatewayManager(_manager, @@ -172,7 +173,6 @@ public void run() { public void stop() { logger.info("Stopping Helix Gateway Service Poll Mode Channel..."); // Shutdown the scheduler gracefully when done (e.g., on app termination) - Runtime.getRuntime().addShutdownHook(new Thread(() -> { _scheduler.shutdown(); try { if (!_scheduler.awaitTermination(1, TimeUnit.MINUTES)) { @@ -181,7 +181,12 @@ public void stop() { } catch (InterruptedException e) { _scheduler.shutdownNow(); } - })); + // remove files + if (_shardStateChannelType == GatewayServiceChannelConfig.ChannelType.FILE) { + File file = new File(_targetStateFilePath); + boolean res = file.delete(); + logger.info("Delete target state file: " + file + " res :" + res); + } } @Override diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java index 8dd04644b4..4d3b975c0e 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java @@ -27,6 +27,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; +import org.apache.helix.gateway.channel.HelixGatewayServicePollModeChannel; import org.apache.helix.gateway.service.GatewayServiceManager; import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; @@ -34,6 +35,8 @@ import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateTransitionError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -43,6 +46,7 @@ * transitions signaled by remote participant. */ public class HelixGatewayParticipant implements HelixManagerStateListener { + private static final Logger logger = LoggerFactory.getLogger(HelixGatewayParticipant.class); public static final String UNASSIGNED_STATE = "UNASSIGNED"; private final HelixGatewayServiceChannel _gatewayServiceChannel; private final HelixManager _helixManager; @@ -113,6 +117,7 @@ public String getInstanceName() { * Completes the state transition with the given transitionId. */ public void completeStateTransition(String resourceId, String shardId, String currentState) { + logger.info("Completing state transition for shard: {}{} to state: {}", resourceId, shardId, currentState); String concatenatedShardName = resourceId + shardId; CompletableFuture future = _stateTransitionResultMap.get(concatenatedShardName); if (future != null) { diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index e4d207fd78..9d4430ba1a 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -88,11 +88,10 @@ public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatew */ public void setGatewayServiceChannel(HelixGatewayServiceChannel channel) { if (_gatewayServiceChannel != null) { - _gatewayServiceChannel.stop(); - return; + throw new IllegalStateException( + "Gateway service channel is already set, it can only be set once."); } - throw new IllegalStateException( - "Gateway service channel is already set, it can only be set once."); + _gatewayServiceChannel = channel; } /** @@ -163,6 +162,7 @@ private ShardStateUpdator(GatewayServiceEvent event) { @Override public void run() { + System.out.println("Processing state transition result " + _event.getInstanceName()); HelixGatewayParticipant participant = getHelixGatewayParticipant(_event.getClusterName(), _event.getInstanceName()); if (participant == null) { @@ -201,7 +201,11 @@ public void run() { public void stopManager() { _connectionEventProcessor.shutdown(); _participantStateTransitionResultUpdator.shutdown(); - _helixGatewayParticipantMap.clear(); + _helixGatewayParticipantMap.forEach((clusterName, participantMap) -> { + participantMap.forEach((instanceName, participant) -> { + participant.disconnect(); + }); + }); } public void startService() throws IOException { @@ -231,9 +235,13 @@ private void removeHelixGatewayParticipant(String clusterName, String instanceNa HelixGatewayParticipant participant = getHelixGatewayParticipant(clusterName, instanceName); if (participant != null) { participant.disconnect(); - _helixGatewayParticipantMap.get(clusterName).remove(instanceName); + if (_helixGatewayParticipantMap.containsKey(clusterName)) { + _helixGatewayParticipantMap.get(clusterName).remove(instanceName); + } + } + if (_currentStateCacheMap.containsKey(clusterName)) { + _currentStateCacheMap.get(clusterName).removeInstanceTargetDataFromCache(instanceName); } - _currentStateCacheMap.get(clusterName).removeInstanceTargetDataFromCache(instanceName); } private HelixGatewayParticipant getHelixGatewayParticipant(String clusterName, diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java index 2b8b1c978e..bbec7f3e4d 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/GatewayCurrentStateCache.java @@ -116,6 +116,7 @@ private void updateShardStateMapWithDiff(Map stateMap, St * example : {"instance1":{"resource1":{"shard1":"ONLINE","shard2":"OFFLINE"}}}} */ public synchronized ObjectNode serializeTargetAssignmentsToJSONNode() { + ObjectNode root = mapper.createObjectNode(); for (Map.Entry entry : _targetStateMap.entrySet()) { root.set(entry.getKey(), entry.getValue().toJSONNode()); } @@ -183,6 +184,7 @@ private Map> getDiff(Map * @return a JSON object representing the shard state map. Example: {"shard1":"ONLINE","shard2":"OFFLINE"} */ public synchronized ObjectNode toJSONNode() { + ObjectNode root = mapper.createObjectNode(); for (Map.Entry> entry : _stateMap.entrySet()) { String resource = entry.getKey(); ObjectNode resourceNode = mapper.createObjectNode(); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java index 26de6db0fe..2e8d277a80 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java @@ -69,6 +69,7 @@ public static boolean fetchLivenessStatusFromGrpcService(String service, HealthG public static void flushAssignmentToFile(String targetAssignment, String filePath) { try (FileWriter fileWriter = new FileWriter(filePath)) { fileWriter.write(targetAssignment); + fileWriter.close(); } catch (IOException e) { logger.warn("Failed to write to file: " + filePath, e); } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java index a2d07085bb..5fc319737b 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -130,7 +130,7 @@ public static GatewayServiceEvent translateShardStateMessageToEventAndUpdateCach * @param clusterName the cluster name * @return GatewayServiceEvent */ - public static GatewayServiceEvent translateClientCloseToEvent(String instanceName, String clusterName) { + public static GatewayServiceEvent translateClientCloseToEvent(String clusterName, String instanceName) { GatewayServiceEvent.GateWayServiceEventBuilder builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName( clusterName).setParticipantName(instanceName); @@ -144,7 +144,7 @@ public static GatewayServiceEvent translateClientCloseToEvent(String instanceNam * @param shardStateMap * @return */ - public static GatewayServiceEvent translateCurrentStateChangeToEvent(String instanceName, String clusterName, + public static GatewayServiceEvent translateCurrentStateChangeToEvent(String clusterName, String instanceName, Map> shardStateMap) { List stResult = new ArrayList<>(); shardStateMap.forEach((resourceName, value) -> value.forEach((key, value1) -> { @@ -165,12 +165,12 @@ public static GatewayServiceEvent translateCurrentStateChangeToEvent(String inst * @param shardStateMap the initial state of shards on the participant. Could be empty map * @return */ - public static GatewayServiceEvent translateCurrentStateDiffToInitConnectEvent(String instanceName, String clusterName, + public static GatewayServiceEvent translateCurrentStateDiffToInitConnectEvent(String clusterName, String instanceName, Map> shardStateMap) { GatewayServiceEvent.GateWayServiceEventBuilder builder = - new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName( - clusterName).setParticipantName(instanceName).setShardStateMap(shardStateMap); + new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(clusterName) + .setParticipantName(instanceName) + .setShardStateMap(shardStateMap); return builder.build(); } - } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java new file mode 100644 index 0000000000..3066dcc91e --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/integration/TestFilePullChannelE2E.java @@ -0,0 +1,255 @@ +package org.apache.helix.gateway.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.gateway.base.HelixGatewayTestBase; +import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; +import org.apache.helix.gateway.service.GatewayServiceManager; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.helix.gateway.channel.GatewayServiceChannelConfig.ChannelMode.*; + + +public class TestFilePullChannelE2E extends HelixGatewayTestBase { + + private static final String CLUSTER_NAME = "CLUSTER_" + TestFilePullChannelE2E.class.getSimpleName(); + private static final int START_NUM_NODE = 3; + private static final String TEST_DB = "TestDB"; + private static final String TEST_STATE_MODEL = "OnlineOffline"; + private static final String CONTROLLER_PREFIX = "controller"; + private static final String currentStatePath = "tmpcurrentState"; + private static final String targetStatePath = "tmptargetState"; + GatewayServiceManager manager1, manager2, manager0; + ArrayList csPaths = new ArrayList(); + ArrayList targetPaths = new ArrayList(); + ArrayList healthPaths = new ArrayList(); + private ClusterControllerManager _controller; + + @BeforeClass + public void beforeClass() { + super.beforeClass(); + + // Set up the Helix cluster + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + _gSetupTool.addCluster(CLUSTER_NAME, true); + + ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.getRecord().setSimpleField(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true"); + configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + + // Start the controller + String controllerName = CONTROLLER_PREFIX + '_' + CLUSTER_NAME; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // Enable best possible assignment persistence + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + } + + @Test + public void testE2E() throws Exception { + // create files for health state + try { + for (int i = 0; i < START_NUM_NODE; i++) { + csPaths.add(createTempFile(currentStatePath + i, ".txt", "")); + targetPaths.add(createTempFile(targetStatePath + i, ".txt", "")); + String currentTime = String.valueOf(System.currentTimeMillis()); + String content = "{\"IsAlive\":" + true + ",\"LastUpdateTime\":" + currentTime + "}"; + healthPaths.add(createTempFile("tmphealthCheck" + i, ".txt", content)); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + String fileName0 = healthPaths.get(0).toAbsolutePath().toString(); + String fileName1 = healthPaths.get(1).toAbsolutePath().toString(); + String fileName2 = healthPaths.get(2).toAbsolutePath().toString(); + + GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder builder = + new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setChannelMode(POLL_MODE) + .setParticipantConnectionChannelType(GatewayServiceChannelConfig.ChannelType.FILE) + .setShardStateProcessorType(GatewayServiceChannelConfig.ChannelType.FILE) + .setPollIntervalSec(1) // set a larger number to avoid recurrent polling + .setPollStartDelaySec(1) + .setTargetFileUpdateIntervalSec(1); + + // create empty file for shard state + + // create 3 manager instances + manager0 = new GatewayServiceManager(ZK_ADDR, + builder.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH, + csPaths.get(0).toAbsolutePath().toString()) + .addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH, + targetPaths.get(0).toAbsolutePath().toString()) + .setHealthCheckEndpointMap(Map.of(CLUSTER_NAME, Map.of("instance0", fileName0))) + .build()); + manager1 = new GatewayServiceManager(ZK_ADDR, + builder.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH, + csPaths.get(1).toAbsolutePath().toString()) + .addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH, + targetPaths.get(1).toAbsolutePath().toString()) + .setHealthCheckEndpointMap(Map.of(CLUSTER_NAME, Map.of("instance1", fileName1))) + .build()); + manager2 = new GatewayServiceManager(ZK_ADDR, + builder.addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.PARTICIPANT_CURRENT_STATE_PATH, + csPaths.get(2).toAbsolutePath().toString()) + .addPollModeConfig(GatewayServiceChannelConfig.FileBasedConfigType.SHARD_TARGET_STATE_PATH, + targetPaths.get(2).toAbsolutePath().toString()) + .setHealthCheckEndpointMap(Map.of(CLUSTER_NAME, Map.of("instance2", fileName2))) + .build()); + + System.out.println("Starting all managers"); + manager0.startService(); + manager1.startService(); + manager2.startService(); + + // verify we see live instances + verifyInstances(CLUSTER_NAME, List.of("instance0", "instance1", "instance2")); + + // create an DB on cluster + createDB(); + + // read the target state file and verify the target state is updated + verifyTargetState(); + + // write current state to file + for (int i = 0; i < 3; i++) { + String content = + "{\"" + CLUSTER_NAME + "\" : { \"instance" + i + "\" : { \"TestDB\" : {\"TestDB_0\" : \"ONLINE\" }}}} "; + Files.write(csPaths.get(i), content.getBytes()); + } + + // check no pending messages for partitions + verifyNoPendingMessages(List.of("instance0", "instance1", "instance2")); + + // change health state to false on one instance + String currentTime = String.valueOf(System.currentTimeMillis()); + String content = "{\"IsAlive\":" + false + ",\"LastUpdateTime\":" + currentTime + "}"; + Files.write(healthPaths.get(0), content.getBytes()); + + // check live instance for that instance is gone + Assert.assertTrue(TestHelper.verify(() -> { + List liveInstance = getLiveInstances(); + return !liveInstance.contains("instance0") && liveInstance.contains("instance1") && liveInstance.contains( + "instance2"); + }, TestHelper.WAIT_DURATION)); + + // stop all manager + manager0.stopService(); + manager1.stopService(); + manager2.stopService(); + + + // check target state files are gone + for (int i = 0; i < 3; i++) { + Assert.assertFalse(Files.exists(targetPaths.get(i))); + } + + // check all live instances are gone + Assert.assertTrue(TestHelper.verify(() -> { + List liveInstance = getLiveInstances(); + return !liveInstance.contains("instance0") && !liveInstance.contains("instance1") && !liveInstance.contains( + "instance2"); + }, TestHelper.WAIT_DURATION)); + + for (int i = 0; i < 3; i++) { + try { + Files.deleteIfExists(csPaths.get(i)); + Files.deleteIfExists(targetPaths.get(i)); + Files.deleteIfExists(healthPaths.get(i)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + + private void verifyInstances(String clusterName, List instance0) throws Exception { + for (String instance : instance0) { + Assert.assertTrue(TestHelper.verify( + () -> _gSetupTool.getClusterManagementTool().getInstancesInCluster(clusterName).contains(instance), + TestHelper.WAIT_DURATION)); + } + } + + private List getLiveInstances() { + ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<>(_gZkClient)); + PropertyKey liveInstances = dataAccessor.keyBuilder().liveInstances(); + return dataAccessor.getChildNames(liveInstances); + } + + private void verifyNoPendingMessages(List participants) throws Exception { + ZKHelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor<>(_gZkClient)); + for (String participant : participants) { + PropertyKey messagesNode = dataAccessor.keyBuilder().messages(participant); + Assert.assertTrue( + TestHelper.verify(() -> dataAccessor.getChildNames(messagesNode).isEmpty(), TestHelper.WAIT_DURATION)); + } + } + + private void verifyTargetState() throws Exception { + for (int i = 0; i < 3; i++) { + int finalI = i; + Assert.assertTrue(TestHelper.verify(() -> { + String content = Files.readString(targetPaths.get(finalI)); + return content.contains("{\"TestDB\":{\"TestDB_0\":\"ONLINE\"}}}"); + }, TestHelper.WAIT_DURATION)); + } + } + + public static Path createTempFile(String prefix, String suffix, String content) throws IOException { + // Create a temporary file + Path tempFile = Files.createTempFile(prefix, suffix); + + // Write content to the temporary file + Files.write(tempFile, content.getBytes()); + + return tempFile; + } + + private void createDB() { + createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, List.of("instance0", "instance1", "instance2"), + TEST_STATE_MODEL, 1, 3); + + _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(new HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME))) + .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME) + .build(); + } +}