Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Sep 5, 2024
1 parent 86868f8 commit 98a3b6d
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ public class GatewayServiceConfigConstant {
public static final int DEFAULT_AMX_ALLOWED_CLIENT_HEARTBEAT_INTERVAL = 60;
public static final int DEFAULT_CLIENT_TIMEOUT = 5 * 60;
public static final int DEFAULT_POLL_INTERVAL_SEC = 60;
public static final int DEFAULT_HEALTH_TIMEOUT_SEC = 60;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
* under the License.
*/




import io.grpc.health.v1.HealthGrpc;
import java.util.Map;
import java.util.Properties;

Expand Down Expand Up @@ -52,11 +48,11 @@ public enum ChannelType {
// service configs

// service mode for inbound information.
private ChannelMode _channelMode;
private final ChannelMode _channelMode;
// channel type for participant liveness detection
private ChannelType _participantConnectionChannelType;
private final ChannelType _participantConnectionChannelType;
// channel for sending and receiving shard state transition request and shard state response
private ChannelType _shardStateChannelType;
private final ChannelType _shardStateChannelType;

// grpc server configs
private final int _grpcServerPort;
Expand All @@ -67,21 +63,27 @@ public enum ChannelType {

// poll mode config
private final int _pollIntervalSec;
Properties _pollModeConfigs;
public enum fileBasedConfigType{
private final int _pollStartDelaySec;
private final int _pollHealthCheckTimeoutSec;
private final int _targetFileUpdateIntervalSec;
private final Map<String, Map<String, String>> _healthCheckEndpointMap;
private final Properties _pollModeConfigs;

/**
 * Keys for the file-based (poll mode) settings. The {@code toString()} value of a constant is used
 * as the property name when a setting is stored in, or read from, the poll-mode {@code Properties}.
 */
public enum FileBasedConfigType {
// read by the poll-mode channel as the path of the user current-state file
PARTICIPANT_CURRENT_STATE_PATH,
// NOTE(review): not referenced in the code visible here; presumably a shard state file path - confirm
SHARD_STATE_PATH,
// read by the poll-mode channel as the path the target state is written to
SHARD_TARGET_STATE_PATH,
// NOTE(review): not referenced in the code visible here; presumably a liveness file path - confirm
PARTICIPANT_LIVENESS_PATH
}

// cluster -> host -> grpc health stub for query liveness
Map<String, Map<String, String>> _healthCheckGrpcConnectionMap;


// getters

/**
 * Returns the service mode for inbound information that this config was built with.
 *
 * @return the configured channel mode
 */
public ChannelMode getChannelMode() {
  return this._channelMode;
}

/**
 * Returns the channel type used for participant liveness detection.
 *
 * @return the configured participant connection channel type
 */
public ChannelType getParticipantConnectionChannelType() {
  return this._participantConnectionChannelType;
}
Expand Down Expand Up @@ -114,14 +116,31 @@ public int getPollIntervalSec() {
return _pollIntervalSec;
}

public Map<String, Map<String, String>> getHealthCheckGrpcConnectionMap() {
return _healthCheckGrpcConnectionMap;
/**
 * Returns the health check endpoint lookup: cluster name -> instance name -> endpoint.
 * The stored map is handed out as-is (no copy is made).
 *
 * @return the map supplied when this config was built
 */
public Map<String, Map<String, String>> getHealthCheckEndpointMap() {
  return this._healthCheckEndpointMap;
}

/**
 * Returns the delay, in seconds, before the first run of the scheduled poll-mode tasks.
 *
 * @return the configured poll start delay in seconds
 */
public int getPollStartDelaySec() {
  return this._pollStartDelaySec;
}

/**
 * Returns the timeout, in seconds, that is passed along when an instance's health status is read.
 *
 * @return the configured health check timeout in seconds
 */
public int getPollHealthCheckTimeoutSec() {
  return this._pollHealthCheckTimeoutSec;
}

/**
 * Returns the period, in seconds, between two writes of the target state file.
 *
 * @return the configured target file update interval in seconds
 */
public int getTargetFileUpdateIntervalSec() {
  return this._targetFileUpdateIntervalSec;
}

private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode, ChannelType participantConnectionChannelType,
ChannelType shardStateChannelType, int serverHeartBeatInterval, int maxAllowedClientHeartBeatInterval,
int clientTimeout, boolean enableReflectionService, int pollIntervalSec, Properties pollModeConfigs) {
/**
 * Looks up the poll-mode setting stored under the given config type.
 *
 * @param type the poll-mode config key; its {@code toString()} value is used as the property name
 * @return the configured value, or {@code null} when nothing was registered for {@code type}
 *     (including the case where no poll-mode config was supplied at all)
 */
public String getPollModeConfig(FileBasedConfigType type) {
  // The builder only allocates the Properties object on the first addPollModeConfig call, so the
  // field can be null here; report that as "property not set" instead of throwing.
  if (_pollModeConfigs == null) {
    return null;
  }
  return _pollModeConfigs.getProperty(type.toString());
}

private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode,
ChannelType participantConnectionChannelType, ChannelType shardStateChannelType, int serverHeartBeatInterval,
int maxAllowedClientHeartBeatInterval, int clientTimeout, boolean enableReflectionService, int pollIntervalSec,
int pollStartDelaySec, int pollHealthCheckTimeoutSec, int targetFileUpdateIntervalSec,
Properties pollModeConfigs, Map<String, Map<String, String>> healthCheckEndpointMap) {
_grpcServerPort = grpcServerPort;
_channelMode = channelMode;
_participantConnectionChannelType = participantConnectionChannelType;
Expand All @@ -131,7 +150,11 @@ private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode,
_clientTimeout = clientTimeout;
_enableReflectionService = enableReflectionService;
_pollIntervalSec = pollIntervalSec;
_pollStartDelaySec = pollStartDelaySec;
_pollHealthCheckTimeoutSec = pollHealthCheckTimeoutSec;
_targetFileUpdateIntervalSec = targetFileUpdateIntervalSec;
_pollModeConfigs = pollModeConfigs;
_healthCheckEndpointMap = healthCheckEndpointMap;
}

public static class GatewayServiceProcessorConfigBuilder {
Expand All @@ -152,7 +175,10 @@ public static class GatewayServiceProcessorConfigBuilder {
private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
// poll mode config
private Properties _pollModeConfigs;

private int _pollStartDelaySec = DEFAULT_POLL_INTERVAL_SEC;
private int _pollHealthCheckTimeoutSec = DEFAULT_HEALTH_TIMEOUT_SEC;
private int _targetFileUpdateIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
private Map<String, Map<String, String>> _healthCheckEndpointMap;

public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode channelMode) {
_channelMode = channelMode;
Expand Down Expand Up @@ -200,14 +226,34 @@ public GatewayServiceProcessorConfigBuilder setPollIntervalSec(int pollIntervalS
return this;
}

public GatewayServiceProcessorConfigBuilder addPollModeConfig(fileBasedConfigType type, String value) {
/**
 * Registers a poll-mode setting under the given config type, creating the backing
 * {@link Properties} on first use.
 *
 * @param type config key; the value is stored under its {@code toString()} name
 * @param value value to associate with the key
 * @return this builder, for chaining
 */
public GatewayServiceProcessorConfigBuilder addPollModeConfig(FileBasedConfigType type, String value) {
  Properties configs = _pollModeConfigs;
  if (configs == null) {
    // lazily allocate so a builder that never sets poll-mode config keeps the field unset
    configs = new Properties();
    _pollModeConfigs = configs;
  }
  configs.setProperty(type.toString(), value);
  return this;
}

/**
 * Sets the delay, in seconds, before the first poll is started.
 *
 * @param pollStartDelaySec the poll start delay in seconds
 * @return this builder, for chaining
 */
public GatewayServiceProcessorConfigBuilder setPollStartDelaySec(int pollStartDelaySec) {
  this._pollStartDelaySec = pollStartDelaySec;
  return this;
}

/**
 * Sets the health check timeout, stored in seconds.
 *
 * @param pollHealthCheckTimeout the health check timeout in seconds
 * @return this builder, for chaining
 */
public GatewayServiceProcessorConfigBuilder setPollHealthCheckTimeout(int pollHealthCheckTimeout) {
  this._pollHealthCheckTimeoutSec = pollHealthCheckTimeout;
  return this;
}

/**
 * Sets the interval, in seconds, between two writes of the target state file.
 *
 * @param targetFileUpdateIntervalSec the update interval in seconds
 * @return this builder, for chaining
 */
public GatewayServiceProcessorConfigBuilder setTargetFileUpdateIntervalSec(int targetFileUpdateIntervalSec) {
  this._targetFileUpdateIntervalSec = targetFileUpdateIntervalSec;
  return this;
}

/**
 * Sets the health check endpoint lookup (cluster name -> instance name -> endpoint).
 * The given map is stored by reference; no copy is made.
 *
 * @param healthCheckEndpointMap the endpoint lookup to use
 * @return this builder, for chaining
 */
public GatewayServiceProcessorConfigBuilder setHealthCheckEndpointMap(Map<String, Map<String, String>> healthCheckEndpointMap) {
  this._healthCheckEndpointMap = healthCheckEndpointMap;
  return this;
}

public void validate() {
if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER
&& _shardStatenChannelType != ChannelType.GRPC_SERVER) || (
Expand All @@ -219,13 +265,16 @@ public void validate() {
if (_participantConnectionChannelType == ChannelType.GRPC_SERVER && _grpcServerPort == 0) {
throw new IllegalArgumentException("Grpc server port must be set for grpc server channel type");
}

// TODO: validate for file
}

public GatewayServiceChannelConfig build() {
validate();
return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode, _participantConnectionChannelType,
_shardStatenChannelType, _serverHeartBeatInterval, _maxAllowedClientHeartBeatInterval, _clientTimeout,
_enableReflectionService, _pollIntervalSec, _pollModeConfigs);
_enableReflectionService, _pollIntervalSec, _pollStartDelaySec, _pollHealthCheckTimeoutSec,
_targetFileUpdateIntervalSec, _pollModeConfigs, _healthCheckEndpointMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,11 @@
* under the License.
*/

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
Expand All @@ -40,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.helix.gateway.channel.GatewayServiceChannelConfig.FileBasedConfigType.*;
import static org.apache.helix.gateway.util.PollChannelUtil.*;


Expand All @@ -48,7 +44,6 @@
*
*/
public class HelixGatewayServicePollModeChannel implements HelixGatewayServiceChannel {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServicePollModeChannel.class);

// cluster -> host -> liveness result
Expand Down Expand Up @@ -78,45 +73,48 @@ public HelixGatewayServicePollModeChannel(GatewayServiceManager manager, Gateway
_scheduler = Executors.newSingleThreadScheduledExecutor();
_participantConnectionStatusChannelType = _config.getParticipantConnectionChannelType();
_shardStateChannelType = _config.getShardStateChannelType();
_healthCheckGrpcEndpointMap = _config.getHealthCheckEndpointMap();
_userCurrentStateFilePath = _config.getPollModeConfig(PARTICIPANT_CURRENT_STATE_PATH);
_targetStateFilePath = _config.getPollModeConfig(SHARD_TARGET_STATE_PATH);
_livenessResults = new HashMap<>();
}

private void fetchUpdates() {
void fetchUpdates() {
// 1. get the shard state change
Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates =
getChangedParticipantsCurrentState(_userCurrentStateFilePath);

Map<String, Map<String, Map<String, Map<String, String>>>> currentStateDiff = new HashMap<>();
for (String clusterName : currentShardStates.keySet()) {
currentStateDiff.put(clusterName,
_manager.getCache(clusterName).updateCacheWithNewCurrentStateAndGetDiff(currentShardStates.get(clusterName)));
}
for (String clusterName : currentStateDiff.keySet()) {
for (String instanceName : currentStateDiff.get(clusterName).keySet()) {
// if the instance is previously connected, send state change event
if (_livenessResults.get(clusterName) != null && _livenessResults.get(clusterName).get(instanceName)) {
logger.info("Host {} has state change, sending event to gateway manager", instanceName);
pushClientEventToGatewayManager(_manager,
StateTransitionMessageTranslateUtil.translateCurrentStateChangeToEvent(clusterName, instanceName,
currentStateDiff.get(clusterName).get(instanceName)));
}
}
Map<String, Map<String, Map<String, Map<String, String>>>> currentStateDiff = new HashMap<>();
for (String clusterName : currentShardStates.keySet()) {
Map<String, Map<String, Map<String, String>>> clusterDiffMap =
_manager.updateCacheWithNewCurrentStateAndGetDiff(clusterName, currentShardStates.get(clusterName));
if (clusterDiffMap == null || clusterDiffMap.isEmpty()) {
continue;
}
for (String instanceName : clusterDiffMap.keySet()) {
// if the instance is previously connected, send state change event
if (_livenessResults.get(clusterName) != null && _livenessResults.get(clusterName).get(instanceName)) {
logger.info("Host {} has state change, sending event to gateway manager", instanceName);
pushClientEventToGatewayManager(_manager,
StateTransitionMessageTranslateUtil.translateCurrentStateChangeToEvent(clusterName, instanceName,
clusterDiffMap.get(instanceName)));
}
}
currentStateDiff.put(clusterName, clusterDiffMap);
}

// 2. fetch host health
for (String clusterName : _healthCheckGrpcEndpointMap.keySet()) {
// iterate through each instance
for (String instanceName : _healthCheckGrpcEndpointMap.get(clusterName).keySet()) {
// first check liveness endpoint, compare and send liveness change event, for init connection or close connection
boolean prevLiveness =
_livenessResults.get(clusterName) != null && _livenessResults.get(clusterName).get(instanceName);
boolean liveness = fetchInstanceLiveness(clusterName, instanceName);
boolean liveness = fetchInstanceHealthStatus(clusterName, instanceName);

if (prevLiveness && !liveness) {
if (prevLiveness && !liveness) { // previously connected, now disconnected
logger.warn("Host {} is not healthy, sending event to gateway manager", instanceName);
pushClientEventToGatewayManager(_manager,
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName));
} else if (!prevLiveness && liveness) {
// init connection
} else if (!prevLiveness && liveness) { // new connection.
logger.info("Host {} is newly connected, sending init connection event to gateway manager", instanceName);
pushClientEventToGatewayManager(_manager,
StateTransitionMessageTranslateUtil.translateCurrentStateDiffToInitConnectEvent(clusterName, instanceName,
Expand Down Expand Up @@ -148,9 +146,24 @@ public void run() {
fetchUpdates();
}
};
_scheduler.scheduleAtFixedRate(fetchUpdatesTask, 60, // init delay
_scheduler.scheduleAtFixedRate(fetchUpdatesTask, _config.getPollStartDelaySec(), // init delay
_config.getPollIntervalSec(), // poll interval
TimeUnit.SECONDS);
scheduleTargetStateUpdateTask();
}

/**
 * Schedules the periodic task that writes the serialized target state to the target state file.
 * Nothing is scheduled unless the shard state channel type is {@code FILE}.
 */
private void scheduleTargetStateUpdateTask() {
  if (_shardStateChannelType != GatewayServiceChannelConfig.ChannelType.FILE) {
    return;
  }
  final Runnable writeTargetStateTask = () -> {
    try {
      flushAssignmentToFile(_manager.serializeTargetState(), _targetStateFilePath);
    } catch (RuntimeException e) {
      // scheduleAtFixedRate suppresses every later run once a run throws, which would silently
      // stop target state updates; log the failure and let the next period retry.
      logger.error("Failed to write target state to file: {}", _targetStateFilePath, e);
    }
  };
  _scheduler.scheduleAtFixedRate(writeTargetStateTask, _config.getPollStartDelaySec(), // init delay
      _config.getTargetFileUpdateIntervalSec(), // update interval
      TimeUnit.SECONDS);
}

@Override
Expand Down Expand Up @@ -180,54 +193,28 @@ public void completeConnection(String clusterName, String instanceName) {
/**
* Read current state
*/
private Map<String, Map<String, Map<String, Map<String, String>>>> getChangedParticipantsCurrentState(
Map<String, Map<String, Map<String, Map<String, String>>>> getChangedParticipantsCurrentState(
String userCurrentStateFilePath) {
Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates;
switch (_shardStateChannelType) {
case FILE:
// read current state from a file, compare with in memory current state, update the in memory current state and return diff.
// Current state file format: {"cluster1" : { "instance_1" : { "resource1" : {"shard1" : "online" }, "timestamp": "t1" }}}
try {
// read from file path
File file = new File(userCurrentStateFilePath);
currentShardStates = objectMapper.readValue(file,
new TypeReference<Map<String, Map<String, Map<String, Map<String, String>>>>>() {
});
} catch (IOException e) {
logger.warn("Failed to read from file: " + userCurrentStateFilePath, e);
return new HashMap<>();
}
currentShardStates = readCurrentStateFromFile(userCurrentStateFilePath);

// write target state
// TODO: for simplicity, we write the target state in the same thread. Later we can move it to a separate thread.
ObjectNode targetStateNode = new ObjectMapper().createObjectNode();
for (String clusterName : _manager.getCacheMap().keySet()) {
// add the json node to the target state node
targetStateNode.set(clusterName, _manager.getCache(clusterName).serializeTargetAssignmentsToJSON());
}
targetStateNode.set("timestamp", objectMapper.valueToTree(System.currentTimeMillis()));
flushAssignmentToFile(targetStateNode.toString(), _targetStateFilePath);
flushAssignmentToFile(_manager.serializeTargetState(), _targetStateFilePath);
return currentShardStates;

default:
throw new NotImplementedException("Only support file based channel shard state for now");
}
}

private boolean fetchInstanceLiveness(String clusterName, String instanceName) {
boolean fetchInstanceHealthStatus(String clusterName, String instanceName) {
String endpoint = _healthCheckGrpcEndpointMap.get(clusterName).get(instanceName);
switch (_participantConnectionStatusChannelType) {
case FILE:
try {
// read from file path
File file = new File(endpoint);
HostHealthState status = objectMapper.readValue(file, new TypeReference<HostHealthState>() {
});
return status.isHealthy() && (Instant.now().toEpochMilli() - status.getLastUpdatedTime()*1000) < 10000 ;
} catch (IOException e) {
logger.warn("Failed to read from file: " + endpoint, e);
return false;
}
return readInstanceHealthStatusFromFile(endpoint, _config.getPollHealthCheckTimeoutSec());
default:
throw new NotImplementedException("Only support grpc based channel for now");
}
Expand Down
Loading

0 comments on commit 98a3b6d

Please sign in to comment.