Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize calls to StreamObserver methods #2934

Merged
merged 5 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ public static synchronized void carryOverPreviousCurrentState(HelixDataAccessor
continue;
}

// If the the current state is related to tasks, there is no need to carry it over to new session.
// If the current state is related to tasks, there is no need to carry it over to new session.
// Note: this check is not necessary due to TaskCurrentStates, but keep it for backwards compatibility
if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ public void onNext(ShardStateMessage request) {
@Override
public void onError(Throwable t) {
logger.info("Receive on error, reason: {} message: {}", Status.fromThrowable(t).getCode(), t.getMessage());
// Notify the gateway manager that the client is closed
Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver);
onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
}

@Override
public void onCompleted() {
logger.info("Receive on complete message");
// Notify the gateway manager that the client is closed
Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver);
onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
}
Expand All @@ -119,12 +121,19 @@ public void onCompleted() {
*/
@Override
public void sendStateChangeRequests(String instanceName, ShardChangeRequests requests) {
StreamObserver<ShardChangeRequests> observer = _observerMap.get(instanceName);
if (observer!= null) {
observer.onNext(requests);
} else {
logger.error("Instance {} is not connected to the gateway service", instanceName);
}
_lockRegistry.withLock(instanceName, () -> {
StreamObserver<ShardChangeRequests> observer = _observerMap.get(instanceName);

// If observer is null, this means that the connection is already closed and
// we should not send a ShardChangeRequest
if (observer != null) {
observer.onNext(requests);
} else {
logger.error("Instance {} is not connected to the gateway service", instanceName);
// If the observer is null, we should remove the lock, so we don't keep unnecessary locks
_lockRegistry.removeLock(instanceName);
}
});
}

/**
Expand All @@ -149,30 +158,39 @@ public void completeConnection(String clusterName, String instanceName) {
}

private void closeConnectionHelper(String instanceName, String errorReason, boolean withError) {
StreamObserver<ShardChangeRequests> observer = _observerMap.get(instanceName);
if (observer != null) {
if (withError) {
observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException());
} else {
observer.onCompleted();
_lockRegistry.withLock(instanceName, () -> {
StreamObserver<ShardChangeRequests> observer = _observerMap.get(instanceName);

// If observer is null, this means that the connection is already closed and
// we should not try and close it again.
if (observer != null) {
// Depending on whether the connection is closed with error, send different status
if (withError) {
observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException());
} else {
observer.onCompleted();
}

// Clean up the observer and lock
_reversedObserverMap.remove(_observerMap.get(instanceName));
_observerMap.remove(instanceName);
}
}

// We always remove the lock after the connection is closed regardless of if observer is null or not
_lockRegistry.removeLock(instanceName);
});
}

private void onClientClose(String clusterName, String instanceName) {
if (instanceName == null || clusterName == null) {
// TODO: log error;
logger.error("Cluster: {} or instance: {} is null while handling onClientClose", clusterName,
instanceName);
return;
}
logger.info("Client close connection for instance: {}", instanceName);
GatewayServiceEvent event =
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName);
pushClientEventToGatewayManager(_manager, event);
_lockRegistry.withLock(instanceName, () -> {
_reversedObserverMap.remove(_observerMap.get(instanceName));
_observerMap.remove(instanceName);
_lockRegistry.removeLock(instanceName);
});
}

private void updateObserver(String instanceName, String clusterName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public HelixGatewayServicePollModeChannel(GatewayServiceManager manager, Gateway
* 1. Get the diff of previous and current shard states, and send the state change event to the gateway manager.
* 2. Compare previous liveness and current liveness, and send the connection event to the gateway manager.
*/
protected void fetchUpdates() {
protected void fetchUpdates() {
// 1. get the shard state change
Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates =
getChangedParticipantsCurrentState(_userCurrentStateFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static Map<String, Map<String, Map<String, Map<String, String>>>> readCur
new TypeReference<Map<String, Map<String, Map<String, Map<String, String>>>>>() {
});
} catch (IOException e) {
logger.warn("Failed to read from file: " + filePath);
logger.warn("Failed to read from file: " + filePath, e);
return new HashMap<>();
}
}
Expand All @@ -107,7 +107,7 @@ public static boolean readInstanceLivenessStatusFromFile(String filePath, int ti
});
return status.isHealthy() && (System.currentTimeMillis()/1000 - status.getLastUpdatedTime()) < timeoutInSec;
} catch (IOException e) {
logger.warn("Failed to read from file: " + filePath);
logger.warn("Failed to read from file: " + filePath, e);
return false;
}
}
Expand Down
Loading