-
Notifications
You must be signed in to change notification settings - Fork 120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] Requests with Grpc Reactive Client are completed at the end of the flow #222
Comments
Thanks for pointing this out @jorgerod. It's a great example of some subtle and unintuitive interaction between gRPC and Rx. In gRPC, unary blocking RPC is a lie. The only part about gRPC that understands unary blocking RPC is the topmost layer of generated client stub. Below that, everything in gRPC is streaming and asynchronous. A unary blocking request is really just a request and response stream of exactly one item, where the client blocks until exactly one response message or error is received. Similarly, a In Reactor and streaming gRPC, message production and stream termination are separate actions, with separate timelines. By design, reactive operators like With that background out of the way, here's what's happening in your example, and why it is the "correct" behavior from a reactive perspective. I assume Mono.just(new FindRequest(productId, storeId)) //<1>
.transform(productStub::findById) //<2>
.map(product -> { //<3>
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return product.getName();
})
.subscribe(System.out::println); //<4>
TL;DR, the behavior you are seeing is the expected behavior for a reactive system because reactive doesn't follow traditional imperative method call semantics. |
Hello First of all, thank you very much for your answer. I have continued to review the issue in more depth and have tested the behavior in the following cases:
Grpc client Streaming:With the example below, I tried streaming. @Autowired
private GreeterGrpc.GreeterStub greeterStreamingService;
...
greeterStreamingService.sayHelloRespStream(request, new StreamObserver<>() {
@Override
public void onNext(HelloResponse value) {
Span mySpan = tracer.buildSpan("my-span").start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mySpan.finish();
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
LOG.info("Finish");
}
}); I agree with you, the span should be closed when the onComplete event is emitted because you don't know the number of messages. Traces created are the following: Grpc client with future stub @Autowired
private GreeterGrpc.GreeterFutureStub greeterFutureService;
....
Futures.addCallback(greeterFutureService.sayHello(request), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable HelloResponse helloResponse) {
Span mySpan = tracer.buildSpan("my-span").start(); //<1>
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mySpan.finish();
}
}
@Override
public void onFailure(Throwable throwable) {
}
}, MoreExecutors.directExecutor());
As you can see in the following image, grpc-client span is closed when the response is received and in the callback it opens my-span <1>. I think it's the right behavior Grpc client reactiveFinally, the reactive flavour @Autowired
private ReactorGreeterGrpc.ReactorGreeterStub greeterReactiveService;
...
return greeterReactiveService.sayHello(request)
.map(helloResponse -> {
Span mySpan = tracer.buildSpan("my-span").start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mySpan.finish();
}
return helloResponse.getMessage();
}); I think the behavior is incorrect and should be similar to Future Stub. @rmichela What do you think? Thank you very much |
Hi @rmichela What do you think? This topic is very important to us. Trank you |
I'm really on the fence about this issue because it changes the internal behavior of reactive-grpc. Do you have an example for how this would look in code? |
Describe the Bug
Requests with Reactive Grpc Client (integration with reactor) are completed at the end of the flow and not when the response to that request is obtained.
This treatment causes the incorrect behaviour of the instrumentation with other components (metrics or traces).
In this example, as the call to the onComplete function is made to the end of flow.
Brave tracing instrumentation or metrics instrumentation are incorrect because the grpc request time is the sum of the grpc client operation <1> plus the map operation <2>
Expected Behaviour
Method OnComplete should be called when the grpcClient request ends and not when the flow ends.
How do you see it?
The text was updated successfully, but these errors were encountered: