Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanlu committed Sep 4, 2024
1 parent 85d93a0 commit 86868f8
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ default void pushClientEventToGatewayManager(GatewayServiceManager gatewayServic
* @param instanceName instance name
* @param reason reason for closing connection
*/
public void closeConnectionWithError(String instanceName, String reason);
public void closeConnectionWithError(String clusterName, String instanceName, String reason);

/**
* Gateway service close client connection with success. This function is called when manager wants to close client
* connection gracefully, e.g., when gateway service is shutting down.
* @param instanceName instance name
*/
public void completeConnection(String instanceName);
public void completeConnection(String clusterName, String instanceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
*/




import io.grpc.health.v1.HealthGrpc;
import java.util.Map;
import java.util.Properties;

import static org.apache.helix.gateway.api.constant.GatewayServiceConfigConstant.*;


Expand Down Expand Up @@ -61,9 +67,15 @@ public enum ChannelType {

// poll mode config
private final int _pollIntervalSec;
// TODO: configs for pull mode grpc client
Properties _pollModeConfigs;
/**
 * Keys accepted by {@code addPollModeConfig}; each constant's {@code toString()} is used as the property name
 * under which the supplied value is stored.
 * NOTE(review): the constant names suggest every value is a file path - confirm against the code that reads them.
 * NOTE(review): the type name is lowerCamelCase; renaming it would break existing callers, so it is left as is.
 */
public enum fileBasedConfigType{
PARTICIPANT_CURRENT_STATE_PATH,
SHARD_STATE_PATH,
PARTICIPANT_LIVENESS_PATH
}
// cluster -> host -> grpc health stub for query liveness
Map<String, Map<String, String>> _healthCheckGrpcConnectionMap;

// TODO: configs for pull mode with file

// getters

Expand Down Expand Up @@ -102,9 +114,14 @@ public int getPollIntervalSec() {
return _pollIntervalSec;
}

/**
 * Returns the health check connection map held by this config.
 *
 * @return the {@code _healthCheckGrpcConnectionMap} field itself (no defensive copy is made).
 *         NOTE(review): no assignment to that field is visible in this change, so the result may be null - confirm.
 */
public Map<String, Map<String, String>> getHealthCheckGrpcConnectionMap() {
return _healthCheckGrpcConnectionMap;
}


private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode, ChannelType participantConnectionChannelType,
ChannelType shardStateChannelType, int serverHeartBeatInterval, int maxAllowedClientHeartBeatInterval,
int clientTimeout, boolean enableReflectionService, int pollIntervalSec) {
int clientTimeout, boolean enableReflectionService, int pollIntervalSec, Properties pollModeConfigs) {
_grpcServerPort = grpcServerPort;
_channelMode = channelMode;
_participantConnectionChannelType = participantConnectionChannelType;
Expand All @@ -114,6 +131,7 @@ private GatewayServiceChannelConfig(int grpcServerPort, ChannelMode channelMode,
_clientTimeout = clientTimeout;
_enableReflectionService = enableReflectionService;
_pollIntervalSec = pollIntervalSec;
_pollModeConfigs = pollModeConfigs;
}

public static class GatewayServiceProcessorConfigBuilder {
Expand All @@ -132,9 +150,8 @@ public static class GatewayServiceProcessorConfigBuilder {

// poll mode config
private int _pollIntervalSec = DEFAULT_POLL_INTERVAL_SEC;
// poll mode grpc client configs

// poll mode file configs
// poll mode config
private Properties _pollModeConfigs;


public GatewayServiceProcessorConfigBuilder setChannelMode(ChannelMode channelMode) {
Expand Down Expand Up @@ -183,6 +200,14 @@ public GatewayServiceProcessorConfigBuilder setPollIntervalSec(int pollIntervalS
return this;
}

/**
 * Registers one poll mode setting, creating the backing {@link Properties} lazily on first use.
 *
 * @param type  config key; its {@code toString()} is used as the property name
 * @param value config value stored under that key
 * @return this builder, to allow call chaining
 */
public GatewayServiceProcessorConfigBuilder addPollModeConfig(fileBasedConfigType type, String value) {
  Properties configs = _pollModeConfigs;
  if (configs == null) {
    // First setting added through this builder: create the container and publish it on the builder.
    configs = new Properties();
    _pollModeConfigs = configs;
  }
  configs.setProperty(type.toString(), value);
  return this;
}

public void validate() {
if ((_participantConnectionChannelType == ChannelType.GRPC_SERVER
&& _shardStatenChannelType != ChannelType.GRPC_SERVER) || (
Expand All @@ -200,7 +225,7 @@ public GatewayServiceChannelConfig build() {
validate();
return new GatewayServiceChannelConfig(_grpcServerPort, _channelMode, _participantConnectionChannelType,
_shardStatenChannelType, _serverHeartBeatInterval, _maxAllowedClientHeartBeatInterval, _clientTimeout,
_enableReflectionService, _pollIntervalSec);
_enableReflectionService, _pollIntervalSec, _pollModeConfigs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static HelixGatewayServiceChannel createServiceChannel(GatewayServiceChan
return new HelixGatewayServiceGrpcService(manager, config);
}
} else {
return new HelixGatewayServicePollModeChannel(config);
return new HelixGatewayServicePollModeChannel(manager, config);
}
throw new IllegalArgumentException(
"Unsupported channel mode and type combination: " + config.getChannelMode() + " , "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
*/
public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
implements HelixGatewayServiceChannel {
// create LOGGER
private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class);

// Map to store the observer for each instance
Expand Down Expand Up @@ -118,7 +117,7 @@ public void onCompleted() {
* The instance must already have established a connection to the gateway service.
*
* @param instanceName the instance name to send the message to
* @param message the message to convert to the transition message
* @param requests the state transition request to send
*/
@Override
public void sendStateChangeRequests(String instanceName, ShardChangeRequests requests) {
Expand All @@ -137,7 +136,7 @@ public void sendStateChangeRequests(String instanceName, ShardChangeRequests req
* @param errorReason error reason for close
*/
@Override
public void closeConnectionWithError(String instanceName, String errorReason) {
public void closeConnectionWithError(String clusterName, String instanceName, String errorReason) {
// clusterName is not used in this body: the log line and the close helper are both keyed by instanceName only.
logger.info("Close connection for instance: {} with error reason: {}", instanceName, errorReason);
// The trailing 'true' selects the with-error variant of the helper (completeConnection passes false).
closeConnectionHelper(instanceName, errorReason, true);
}
Expand All @@ -147,7 +146,7 @@ public void closeConnectionWithError(String instanceName, String errorReason) {
* @param instanceName instance name
*/
@Override
public void completeConnection(String instanceName) {
public void completeConnection(String clusterName, String instanceName) {
// clusterName is not used in this body: the log line and the close helper are both keyed by instanceName only.
logger.info("Complete connection for instance: {}", instanceName);
// No error reason (null) and the with-error flag set to false.
closeConnectionHelper(instanceName, null, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,217 @@
* under the License.
*/

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// TODO: implement this class
import org.apache.helix.gateway.service.GatewayServiceManager;
import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.helix.gateway.util.PollChannelUtil.*;


/**
* A channel that communicates with the Helix Gateway Service in Poll mode.
*
*/
public class HelixGatewayServicePollModeChannel implements HelixGatewayServiceChannel {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServicePollModeChannel.class);

// cluster -> host -> liveness result
Map<String, Map<String, Boolean>> _livenessResults;

GatewayServiceChannelConfig _config;

public HelixGatewayServicePollModeChannel(GatewayServiceChannelConfig config) {
// cluster -> host -> endpoint for query liveness
// It is the file path if _participantConnectionStatusChannelType is FILE, grpc endpoint if it is GRPC_CLIENT
Map<String, Map<String, String>> _healthCheckGrpcEndpointMap;

// cluster -> file for user to report shards' current states
String _userCurrentStateFilePath;

// cluster -> file path to store the shards' target states
String _targetStateFilePath;

private final GatewayServiceManager _manager;
ScheduledExecutorService _scheduler;

GatewayServiceChannelConfig.ChannelType _participantConnectionStatusChannelType;
GatewayServiceChannelConfig.ChannelType _shardStateChannelType;

/**
 * Creates a poll mode channel bound to the given manager and configuration.
 *
 * <p>Creates the single threaded scheduler used for polling, caches the two channel types from the config and
 * initializes the liveness bookkeeping map.
 * NOTE(review): {@code _healthCheckGrpcEndpointMap}, {@code _userCurrentStateFilePath} and
 * {@code _targetStateFilePath} are not assigned here although the polling code dereferences them - confirm where
 * they are populated.
 *
 * @param manager gateway service manager that receives the events produced by polling
 * @param config  channel configuration; the channel types are read from it here
 */
public HelixGatewayServicePollModeChannel(GatewayServiceManager manager, GatewayServiceChannelConfig config) {
  _manager = manager;
  _config = config;
  _scheduler = Executors.newSingleThreadScheduledExecutor();
  _participantConnectionStatusChannelType = _config.getParticipantConnectionChannelType();
  _shardStateChannelType = _config.getShardStateChannelType();
  // The polling code reads and updates this map on every cycle; leaving it null makes the first poll throw
  // a NullPointerException.
  _livenessResults = new HashMap<>();
}

/**
 * Runs one polling cycle and pushes the resulting events to the gateway manager.
 *
 * <p>Step 1 reads the reported current states, asks each cluster's cache for the diff against what it already
 * holds, and sends a state change event for every instance in the diff that was recorded as live by a previous
 * cycle. Step 2 queries the liveness of every instance in the health check endpoint map and sends a close event
 * on a live-to-not-live transition or an init connection event on a not-live-to-live transition, then records
 * the new liveness value.
 */
private void fetchUpdates() {
  // 1. get the shard state change
  Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates =
      getChangedParticipantsCurrentState(_userCurrentStateFilePath);

  Map<String, Map<String, Map<String, Map<String, String>>>> currentStateDiff = new HashMap<>();
  for (Map.Entry<String, Map<String, Map<String, Map<String, String>>>> clusterStates
      : currentShardStates.entrySet()) {
    String clusterName = clusterStates.getKey();
    currentStateDiff.put(clusterName,
        _manager.getCache(clusterName).updateCacheWithNewCurrentStateAndGetDiff(clusterStates.getValue()));
  }
  for (Map.Entry<String, Map<String, Map<String, Map<String, String>>>> clusterDiff : currentStateDiff.entrySet()) {
    String clusterName = clusterDiff.getKey();
    for (Map.Entry<String, Map<String, Map<String, String>>> instanceDiff : clusterDiff.getValue().entrySet()) {
      String instanceName = instanceDiff.getKey();
      // if the instance is previously connected, send state change event
      if (wasLive(clusterName, instanceName)) {
        logger.info("Host {} has state change, sending event to gateway manager", instanceName);
        pushClientEventToGatewayManager(_manager,
            StateTransitionMessageTranslateUtil.translateCurrentStateChangeToEvent(clusterName, instanceName,
                instanceDiff.getValue()));
      }
    }
  }

  // 2. fetch host health
  for (Map.Entry<String, Map<String, String>> clusterEndpoints : _healthCheckGrpcEndpointMap.entrySet()) {
    String clusterName = clusterEndpoints.getKey();
    // iterate through each instance
    for (String instanceName : clusterEndpoints.getValue().keySet()) {
      // first check liveness endpoint, compare and send liveness change event, for init connection or close
      // connection
      boolean prevLiveness = wasLive(clusterName, instanceName);
      boolean liveness = fetchInstanceLiveness(clusterName, instanceName);

      if (prevLiveness && !liveness) {
        logger.warn("Host {} is not healthy, sending event to gateway manager", instanceName);
        pushClientEventToGatewayManager(_manager,
            StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName));
      } else if (!prevLiveness && liveness) {
        // init connection
        logger.info("Host {} is newly connected, sending init connection event to gateway manager", instanceName);
        pushClientEventToGatewayManager(_manager,
            StateTransitionMessageTranslateUtil.translateCurrentStateDiffToInitConnectEvent(clusterName, instanceName,
                currentStateDiff.containsKey(clusterName) ? currentStateDiff.get(clusterName).get(instanceName)
                    : new HashMap<>()));
      }
      _livenessResults.computeIfAbsent(clusterName, k -> new HashMap<>()).put(instanceName, liveness);
    }
  }
}

/**
 * Tells whether the given instance was recorded as live by an earlier polling cycle.
 *
 * <p>An instance with no recorded value counts as not live. The explicit comparison avoids auto-unboxing a null
 * {@code Boolean}, which would throw for an instance that has a cluster entry but no liveness entry yet.
 */
private boolean wasLive(String clusterName, String instanceName) {
  Map<String, Boolean> clusterLiveness = _livenessResults.get(clusterName);
  return clusterLiveness != null && Boolean.TRUE.equals(clusterLiveness.get(instanceName));
}

/**
 * Accepts state change requests for an instance; this body currently performs no work for them.
 *
 * @param instanceName name of the instance the requests are addressed to (not read by this body)
 * @param shardChangeRequests the requests to deliver (not read by this body)
 * @throws NotImplementedException when the shard state channel type is anything other than FILE
 */
@Override
public void sendStateChangeRequests(String instanceName,
HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests) {
switch (_shardStateChannelType) {
case FILE:
// Nothing is sent from here: getChangedParticipantsCurrentState() writes the target assignments to the
// target state file as part of polling.
break;
default:
throw new NotImplementedException("Only support file based channel for now");
}
}

/**
 * Starts the periodic polling task on this channel's scheduler.
 *
 * <p>The first poll runs after a fixed delay of 60 seconds; later polls run every
 * {@code getPollIntervalSec()} seconds as read from the channel config.
 *
 * @throws IOException declared in the signature; nothing in this body throws it
 */
@Override
public void start() throws IOException {
  logger.info("Starting Helix Gateway Service Poll Mode Channel...");
  final Runnable fetchUpdatesTask = () -> {
    try {
      fetchUpdates();
    } catch (RuntimeException e) {
      // scheduleAtFixedRate suppresses every subsequent execution once a run throws, so one bad poll would
      // otherwise stop polling silently. Log the failure and let the next scheduled run try again.
      logger.error("Poll mode channel failed to fetch updates, will retry at the next poll interval", e);
    }
  };
  _scheduler.scheduleAtFixedRate(fetchUpdatesTask, 60, // init delay
      _config.getPollIntervalSec(), // poll interval
      TimeUnit.SECONDS);
}

/**
 * Stops the polling scheduler.
 *
 * <p>Requests an orderly shutdown and waits up to one minute for an in-flight poll to finish; if it does not
 * finish in time, or the waiting thread is interrupted, the scheduler is shut down forcibly.
 */
@Override
public void stop() {
  logger.info("Stopping Helix Gateway Service Poll Mode Channel...");
  // Shut the scheduler down now. Registering a JVM shutdown hook instead would leave the polling task running
  // after stop() returns and would add one more hook on every call.
  _scheduler.shutdown();
  try {
    if (!_scheduler.awaitTermination(1, TimeUnit.MINUTES)) {
      _scheduler.shutdownNow();
    }
  } catch (InterruptedException e) {
    _scheduler.shutdownNow();
    // Restore the interrupt flag so the caller can observe the interruption.
    Thread.currentThread().interrupt();
  }
}

@Override
public void closeConnectionWithError(String instanceName, String reason) {

public void closeConnectionWithError(String clusterName, String instanceName, String reason) {
// No-op: this implementation takes no action and ignores all three arguments.
// NOTE(review): presumably poll mode keeps no per-instance connection to close - confirm.
}

@Override
public void completeConnection(String instanceName) {
public void completeConnection(String clusterName, String instanceName) {
// No-op: this implementation takes no action and ignores both arguments.
// NOTE(review): presumably poll mode keeps no per-instance connection to complete - confirm.
}

/**
 * Reads the participants' reported current states and, after a successful read, writes the target assignments
 * of every cached cluster to the target state file.
 *
 * <p>Only the FILE shard state channel type is supported. The file content is parsed as
 * cluster -> instance -> resource -> shard -> state.
 * Current state file format: {"cluster1" : { "instance_1" : { "resource1" : {"shard1" : "online" }, "timestamp": "t1" }}}
 * NOTE(review): in that example "timestamp" maps to a plain string where the parsed type expects a
 * shard -> state map - confirm the real file format.
 *
 * @param userCurrentStateFilePath path of the file in which users report the shards' current states
 * @return the parsed current states; an empty map when the file cannot be read or holds no state. When the file
 *         cannot be read the target state file is not written either.
 * @throws NotImplementedException when the shard state channel type is anything other than FILE
 */
private Map<String, Map<String, Map<String, Map<String, String>>>> getChangedParticipantsCurrentState(
    String userCurrentStateFilePath) {
  switch (_shardStateChannelType) {
    case FILE:
      Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates;
      try {
        // read from file path
        currentShardStates = objectMapper.readValue(new File(userCurrentStateFilePath),
            new TypeReference<Map<String, Map<String, Map<String, Map<String, String>>>>>() {
            });
      } catch (IOException e) {
        logger.warn("Failed to read from file: {}", userCurrentStateFilePath, e);
        return new HashMap<>();
      }

      // write target state
      // TODO: for simplicity, we write the target state in the same thread. Later we can move it to a separate thread.
      // Reuse the shared mapper rather than building a new ObjectMapper on every poll.
      ObjectNode targetStateNode = objectMapper.createObjectNode();
      for (String clusterName : _manager.getCacheMap().keySet()) {
        // add the json node to the target state node
        targetStateNode.set(clusterName, _manager.getCache(clusterName).serializeTargetAssignmentsToJSON());
      }
      targetStateNode.set("timestamp", objectMapper.valueToTree(System.currentTimeMillis()));
      flushAssignmentToFile(targetStateNode.toString(), _targetStateFilePath);

      // A file holding the JSON literal null parses to null; hand back an empty map so callers can iterate.
      return currentShardStates != null ? currentShardStates : new HashMap<>();

    default:
      throw new NotImplementedException("Only support file based channel shard state for now");
  }
}

/**
 * Determines whether an instance is currently live according to its configured liveness endpoint.
 *
 * <p>Only the FILE participant connection channel type is supported: the endpoint is used as the path of a file
 * holding a serialized {@code HostHealthState}. The instance counts as live when that state reports healthy and
 * its last updated time, multiplied by 1000, is less than 10000 milliseconds behind the current epoch time.
 * NOTE(review): the multiplication by 1000 suggests {@code getLastUpdatedTime()} is in epoch seconds - confirm.
 *
 * @param clusterName  cluster the instance belongs to; must have an entry in the health check endpoint map
 * @param instanceName instance whose liveness is queried
 * @return {@code true} when the instance is healthy and its health record is fresh; {@code false} otherwise,
 *         including when the file cannot be read or holds no state
 * @throws NotImplementedException when the participant connection channel type is anything other than FILE
 */
private boolean fetchInstanceLiveness(String clusterName, String instanceName) {
  String endpoint = _healthCheckGrpcEndpointMap.get(clusterName).get(instanceName);
  switch (_participantConnectionStatusChannelType) {
    case FILE:
      try {
        // read from file path
        HostHealthState status = objectMapper.readValue(new File(endpoint), new TypeReference<HostHealthState>() {
        });
        if (status == null) {
          // A file holding the JSON literal null carries no health information; treat it as not live.
          return false;
        }
        // 1000L forces the multiplication into long arithmetic so an int-typed timestamp cannot overflow.
        return status.isHealthy()
            && (Instant.now().toEpochMilli() - status.getLastUpdatedTime() * 1000L) < 10000;
      } catch (IOException e) {
        logger.warn("Failed to read from file: {}", endpoint, e);
        return false;
      }
    default:
      // FILE is the only implemented type, so the message names it (it previously claimed grpc).
      throw new NotImplementedException("Only support file based channel for now");
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ public void onConnected(HelixManager helixManager) throws Exception {
@Override
public void onDisconnected(HelixManager helixManager, Throwable error) throws Exception {
_onDisconnectedCallback.run();
_gatewayServiceChannel.closeConnectionWithError(_helixManager.getInstanceName(),
_gatewayServiceChannel.closeConnectionWithError(_helixManager.getClusterName(), _helixManager.getInstanceName(),
error.getMessage());
}

public void disconnect() {
if (_helixManager.isConnected()) {
_helixManager.disconnect();
}
_gatewayServiceChannel.completeConnection(_helixManager.getInstanceName());
_gatewayServiceChannel.completeConnection(_helixManager.getClusterName(), _helixManager.getInstanceName());
}

public static class Builder {
Expand Down
Loading

0 comments on commit 86868f8

Please sign in to comment.