Skip to content

Commit

Permalink
Expose setting gateway service channel to allow external managment of…
Browse files Browse the repository at this point in the history
… the lifecycle of the channel. (#2913)

Expose setting gateway service channel to allow external managment of the lifecycle of the channel.
  • Loading branch information
zpinto authored Sep 12, 2024
1 parent 97a9c48 commit 11b34db
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli
private final GatewayServiceManager _manager;

// A fine grain lock register on instance level
private final PerKeyLockRegistry _lockRegistry;
private final PerKeyLockRegistry _lockRegistry = new PerKeyLockRegistry();;

private final GatewayServiceChannelConfig _config;

Expand All @@ -69,7 +69,6 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli
public HelixGatewayServiceGrpcService(GatewayServiceManager manager, GatewayServiceChannelConfig config) {
_manager = manager;
_config = config;
_lockRegistry = new PerKeyLockRegistry();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,52 @@ public class GatewayServiceManager {
private final ExecutorService _participantStateTransitionResultUpdator;

// link to grpc service
private final HelixGatewayServiceChannel _gatewayServiceChannel;
private HelixGatewayServiceChannel _gatewayServiceChannel;

// a per key executor for connection event. All event for the same instance will be executed in sequence.
// It is used to ensure for each instance, the connect/disconnect event won't start until the previous one is done.
private final PerKeyBlockingExecutor _connectionEventProcessor;

private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;

private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;

public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) {
public GatewayServiceManager(String zkAddress) {
_helixGatewayParticipantMap = new ConcurrentHashMap<>();
_zkAddress = zkAddress;
_participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor();
_gatewayServiceChannel = HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig, this);
_connectionEventProcessor =
new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable
_gatewayServiceChannelConfig = gatewayServiceChannelConfig;
_currentStateCacheMap = new HashMap<>();
}

public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) {
this(zkAddress);
_gatewayServiceChannel = HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig, this);
}

/**
* Set the gateway service channel. This can only be called once.
* The channel is used to send state transition message to the participant.
*
* @param channel the gateway service channel
* @throws IllegalStateException if the channel is already set
*/
public void setGatewayServiceChannel(HelixGatewayServiceChannel channel) {
if (_gatewayServiceChannel != null) {
_gatewayServiceChannel.stop();
return;
}
throw new IllegalStateException(
"Gateway service channel is already set, it can only be set once.");
}

/**
* Process the event from Grpc service and dispatch to async executor for processing.
*
* @param event
*/
public void onGatewayServiceEvent(GatewayServiceEvent event) {
if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
_participantStateTransitionResultUpdator.submit(new shardStateUpdator(event));
_participantStateTransitionResultUpdator.submit(new ShardStateUpdator(event));
} else {
_connectionEventProcessor.offerEvent(event.getInstanceName(), new ParticipantConnectionProcessor(event));
}
Expand Down Expand Up @@ -117,11 +134,11 @@ public String serializeTargetState() {
/**
* Update in memory shard state
*/
class shardStateUpdator implements Runnable {
class ShardStateUpdator implements Runnable {

private final GatewayServiceEvent _event;

private shardStateUpdator(GatewayServiceEvent event) {
private ShardStateUpdator(GatewayServiceEvent event) {
_event = event;
}

Expand Down Expand Up @@ -162,15 +179,19 @@ public void run() {
}
}

public void stopManager() {
_connectionEventProcessor.shutdown();
_participantStateTransitionResultUpdator.shutdown();
_helixGatewayParticipantMap.clear();
}

public void startService() throws IOException {
_gatewayServiceChannel.start();
}

public void stopService() {
_gatewayServiceChannel.stop();
_connectionEventProcessor.shutdown();
_participantStateTransitionResultUpdator.shutdown();
_helixGatewayParticipantMap.clear();
stopManager();
}

private void createHelixGatewayParticipant(String clusterName, String instanceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
import org.apache.helix.gateway.base.HelixGatewayTestBase;
import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
Expand All @@ -40,7 +43,7 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase {
CountDownLatch disconnectLatch = new CountDownLatch(1);

@Test
public void TestLivenessDetection() throws IOException, InterruptedException {
public void testLivenessDetection() throws IOException, InterruptedException {
// start the gateway service
GatewayServiceChannelConfig config =
new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051).build();
Expand Down Expand Up @@ -136,7 +139,10 @@ public void shutdownByClosingConnection() {
class DummyGatewayServiceManager extends GatewayServiceManager {

public DummyGatewayServiceManager(GatewayServiceChannelConfig gatewayServiceChannelConfig) {
super("dummyZkAddress", gatewayServiceChannelConfig);
super("dummyZkAddress");
this.setGatewayServiceChannel(
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
this));
}

@Override
Expand Down

0 comments on commit 11b34db

Please sign in to comment.