Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize calls to StreamObserver methods #2934

Merged
merged 5 commits into from
Oct 8, 2024

Conversation

zpinto
Copy link
Contributor

@zpinto zpinto commented Oct 2, 2024

Issues

  • After investigation, we were seeing grpc failures related to concurrent invocations of onNext. onNext, onError, and onComplete are not thread safe so we need to synchronize invocations to these.

Tests

NA

Changes that Break Backward Compatibility (Optional)

NA

Documentation (Optional)

NA

Commits

  • My commits all reference appropriate Apache Helix GitHub issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Code Quality

  • My diff has been formatted using helix-style.xml
    (helix-style-intellij.xml if IntelliJ IDE is used)

Copy link
Contributor

@junkaixue junkaixue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if concurrent call happens? Will client missed one call? Or it messed up the order?

@zpinto
Copy link
Contributor Author

zpinto commented Oct 2, 2024

@junkaixue If concurrent call happens, one of the messages will fail to send and the state transition will result in ERROR. When multiple state transition threads call sendStateChangeRequests which call onNext we see the following issue as both invocations of onNext first try to sendHeaders.

2024/10/01 10:15:12.542 ERROR [HelixStateTransitionHandler] [HelixTaskExecutor-message_handle_thread_25] [helix-gateway] [] Exception while executing a state transition task test_resource_1
java.lang.reflect.InvocationTargetException: null
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
        at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:351) ~[org.apache.helix.helix-gateway-1.4.2-gateway-dev-202409270710.jar:1.4.2-gateway-dev-202409270710]
        at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:279) [org.apache.helix.helix-gateway-1.4.2-gateway-dev-202409270710.jar:1.4.2-gateway-dev-202409270710]
        at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) [org.apache.helix.helix-gateway-1.4.2-gateway-dev-202409270710.jar:1.4.2-gateway-dev-202409270710]
        at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) [org.apache.helix.helix-gateway-1.4.2-gateway-dev-202409270710.jar:1.4.2-gateway-dev-202409270710]
        at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: java.lang.IllegalStateException: sendHeaders has already been called
        at com.google.common.base.Preconditions.checkState(Preconditions.java:512) ~[com.google.guava.guava-33.0.0-jre.jar:?]
        at io.grpc.internal.ServerCallImpl.sendHeadersInternal(ServerCallImpl.java:109) ~[io.grpc.grpc-core-1.59.1.jar:1.59.1]
        at io.grpc.internal.ServerCallImpl.sendHeaders(ServerCallImpl.java:104) ~[io.grpc.grpc-core-1.59.1.jar:1.59.1]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at com.linkedin.grpc.interceptor.ic.ServerICInterceptor$1.lambda$sendHeaders$0(ServerICInterceptor.java:88) ~[com.linkedin.grpc-infra.si-grpc-impl-38.1.291.jar:?]
        at com.linkedin.container.servicecall.internal.ICFilterHelperImpl.serverResponse(ICFilterHelperImpl.java:168) ~[com.linkedin.container.container-servicecall-filter-impl-38.12.49.jar:?]
        at com.linkedin.grpc.interceptor.ic.ServerICInterceptor$1.sendHeaders(ServerICInterceptor.java:78) ~[com.linkedin.grpc-infra.si-grpc-impl-38.1.291.jar:?]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at com.linkedin.grpc.interceptor.trace.ServerMessageTraceInterceptor$1.sendHeaders(ServerMessageTraceInterceptor.java:119) ~[com.linkedin.container.grpc-impl-38.12.49.jar:?]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.PartialForwardingServerCall.sendHeaders(PartialForwardingServerCall.java:38) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall.sendHeaders(ForwardingServerCall.java:22) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.sendHeaders(ForwardingServerCall.java:44) ~[io.grpc.grpc-api-1.59.1.jar:1.59.1]
        at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:377) ~[io.grpc.grpc-stub-1.59.1.jar:1.59.1]
        at org.apache.helix.gateway.channel.HelixGatewayServiceGrpcService.sendStateChangeRequests(HelixGatewayServiceGrpcService.java:124) ~[org.apache.helix.helix-gateway-1.4.2-gateway-dev-202409270710.jar:1.4.2-gateway-dev-202409270710]
        at org.apache.helix.gateway.participant.HelixGatewayParticipant.processStateTransitionMessage(HelixGatewayParticipant.java:84) ~[org.apache.helix.helix-gateway-1.4.2-gateway-dev-202409270710.jar:1.4.2-gateway-dev-202409270710]
        at org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModel.genericStateTransitionHandler(HelixGatewayMultiTopStateStateModel.java:47) ~[org.apache.helix.helix-gateway-1.4.2-gateway-dev-202409270710.jar:1.4.2-gateway-dev-202409270710]
        ... 12 more

@junkaixue
Copy link
Contributor

Just want to make sure the behavior is right. Because once the synchronization happens, not sure whether the locking can lead any ordering problem.

@zpinto
Copy link
Contributor Author

zpinto commented Oct 3, 2024

@junkaixue I have opted to use lockRegistry to synchronize calls to onNext, onError, and onComplete in addition to moving modifications and gets to _observerMap and _reverseObserverMap into the related critical sections. This will ensure that onNext is never called after a connection is closed and we never attempt to close a connection more than once.

It is okay if one thread closes connection before another thread sends the StateChangeRequest. In this case, the message will not be sent and the state transition will not be processed.

Copy link
Contributor

@junkaixue junkaixue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logics looks good. But make sure the lock retrieve logic is not get or create. Then the lock will come back.

@zpinto
Copy link
Contributor Author

zpinto commented Oct 4, 2024

The retrieve lock logic is getOrCreate. This is okay because we remove the lock in sendStateChangeRequests if the get from the _observerMap returns null. If it returns null, we know that the connection has been closed and we can safely remove the lock.

Same for closeConnectionHelper, we always remove the lock in this method.

It is unlikely that closeConnectionHelper gets called concurrently or sendStateChangeRequests gets called after the connection is closed, so it should be unlikely that we create the lock and also remove it in a single call to one of these methods. In the rare case that this happens, we will remove the lock to ensure we don't keep the lock around if it isn't needed.

@zpinto
Copy link
Contributor Author

zpinto commented Oct 7, 2024

All the failing tests are flaky:
#2925 (@GrantPSpencer fixed but we have not rebased off master yet)
#2795
#2906

This PR is ready to be merged.

Final Commit Message:

Fix concurrent invocations of StreamObserver methods
- Synchronized calls to onNext, onError, and onComplete to prevent grpc failures
- Ensured thread safety for StreamObserver method invocations

@xyuanlu xyuanlu merged commit 2439837 into apache:helix-gateway-service Oct 8, 2024
1 of 2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants