Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing gRPC #13

Merged
merged 2 commits into from
Aug 29, 2023
Merged

Introducing gRPC #13

merged 2 commits into from
Aug 29, 2023

Conversation

huntc
Copy link
Collaborator

@huntc huntc commented Aug 25, 2023

The gRPC machinery is introduced and has been successfully tested with the new JVM-based IoT example at https://github.com/akka/akka-projection/tree/main/samples/grpc/iot-service-scala.

Filters are not presently supported.

The IoT example has been enhanced to operate either locally, or by connecting to a gRPC service. Please see the example's README for instructions at the bottom.

A small enhancement to the streambed storage based projection is also made so that all types of offset can be processed. ProjectionId can now also be parsed from a string.

@huntc huntc added the enhancement New feature or request label Aug 25, 2023
@huntc huntc self-assigned this Aug 25, 2023
Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good

@@ -37,6 +40,8 @@ test-log = "0.2.11"
tokio = "1.23.0"
tokio-stream = "0.1.14"
tokio-util = "0.7.4"
tonic = "0.9.2"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@olofwalker Is this the Rust gRPC library you mentioned?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far so good with Tonic. It appears to be quite fully featured. It'll be interesting to benchmark its memory performance with our source provider in particular.

akka-projection-rs-grpc/src/consumer.rs Outdated Show resolved Hide resolved
The gRPC machinery is introduced.

A small enhancement to the streambed storage based projection is also made so that all types of offset can be processed.
@huntc huntc marked this pull request as ready for review August 29, 2023 06:57
Copy link
Member

@patriknw patriknw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

self.delayer = Some(delayer);
}
None
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the code here (I'm missing some Rust thing). So if the connection is not successful it will be delayed for a while. Then I would expect some kind of retry, but where is that retry?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller (the projection task) will call it again.

let seen = offset.seen.iter().flat_map(|pis| pis.persistence_id.parse().ok().map(|pid|(pid, pis.seq_nr as u64))).collect();
let Some(payload) = streamed_event.payload else { continue };
if !payload.type_url.starts_with("type.googleapis.com/") {
break
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will that be a clear error message?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got no error messages in there right now... Would you like a log?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can fail the projection. It would retry over and over, and get stuck on that, but that's the same as if there was a serialization problem.

@huntc huntc merged commit 339bfc0 into main Aug 29, 2023
1 check passed
@huntc huntc deleted the grpc branch August 29, 2023 07:30
) -> Pin<Box<dyn Stream<Item = Self::Envelope> + Send + 'async_trait>>
where
F: Fn() -> FR + Send + Sync,
FR: Future<Output = Option<Offset>> + Send,
{
todo!()
let connection = if let Ok(connection) =
EventProducerServiceClient::connect(self.event_producer_addr.clone()).await

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://crates.io/crates/tokio-retry ?

I took a look at that a while back, but just implemented it myself for Streambed as it wasn't hard.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, now that I think about it, it is important to return out of the source method on failure so we get the opportunity to stop the projection by processing its Stop commands.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants