-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introducing gRPC #13
Introducing gRPC #13
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good
@@ -37,6 +40,8 @@ test-log = "0.2.11" | |||
tokio = "1.23.0" | |||
tokio-stream = "0.1.14" | |||
tokio-util = "0.7.4" | |||
tonic = "0.9.2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@olofwalker Is this the Rust gRPC library you mentioned?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So far so good with Tonic. It appears to be quite fully featured. It'll be interesting to benchmark its memory performance with our source provider in particular.
The gRPC machinery is introduced. A small enhancement to the streambed storage based projection is also made so that all types of offset can be processed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
self.delayer = Some(delayer); | ||
} | ||
None | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the code here (I'm missing some Rust thing). So if the connection is not successful it will be delayed for a while. Then I would expect some kind of retry, but where is that retry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The caller (the projection task) will call it again.
let seen = offset.seen.iter().flat_map(|pis| pis.persistence_id.parse().ok().map(|pid|(pid, pis.seq_nr as u64))).collect(); | ||
let Some(payload) = streamed_event.payload else { continue }; | ||
if !payload.type_url.starts_with("type.googleapis.com/") { | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will that be a clear error message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've got no error messages in there right now... Would you like a log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can fail the projection. It would retry over and over, and get stuck on that, but that's the same as if there was a serialization problem.
) -> Pin<Box<dyn Stream<Item = Self::Envelope> + Send + 'async_trait>> | ||
where | ||
F: Fn() -> FR + Send + Sync, | ||
FR: Future<Output = Option<Offset>> + Send, | ||
{ | ||
todo!() | ||
let connection = if let Ok(connection) = | ||
EventProducerServiceClient::connect(self.event_producer_addr.clone()).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took a look at that a while back, but just implemented it myself for Streambed as it wasn't hard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, now that I think about it, it is important to return out of the source
method on failure so we get the opportunity to stop the projection by processing its Stop commands.
The gRPC machinery is introduced and has been successfully tested with the new JVM-based IoT example at https://github.com/akka/akka-projection/tree/main/samples/grpc/iot-service-scala.
Filters are not presently supported.
The IoT example has been enhanced to operate either locally, or by connecting to a gRPC service. Please see the example's README for instructions at the bottom.
A small enhancement to the streambed storage based projection is also made so that all types of offset can be processed.
ProjectionId
can now also be parsed from a string.