Skip to content

Commit

Permalink
Introduces dynamic filtering (#35)
Browse files Browse the repository at this point in the history
Filtering can be be specified dynamically through the use of a Tokio watch channel. Watch is known as "spmc" (Single Producer, Multi Consumer), and has the special property that it remembers the last value sent. If multiple producers were required then the watch could easily be composed within an mpsc (Multi Producer, Single Consumer).

The test illustrates how both an initial filter can be set up, and how it can then be subsequently replaced.
  • Loading branch information
huntc authored Oct 4, 2023
1 parent dfaf6ce commit 539a184
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 18 deletions.
101 changes: 83 additions & 18 deletions akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use prost_types::Timestamp;
use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use tonic::transport::Channel;
Expand All @@ -29,9 +30,9 @@ use crate::EventEnvelope;
use crate::StreamId;

pub struct GrpcSourceProvider<E, EP> {
consumer_filters: Option<watch::Receiver<Vec<FilterCriteria>>>,
delayer: Option<Delayer>,
event_producer_channel: EP,
initial_consumer_filter: Vec<proto::FilterCriteria>,
offset_store: mpsc::Sender<EntityMessage<offset_store::Command>>,
slice_range: Range<u32>,
stream_id: StreamId,
Expand Down Expand Up @@ -65,24 +66,21 @@ where
slice_range: Range<u32>,
) -> Self {
Self {
consumer_filters: None,
delayer: None,
event_producer_channel,
initial_consumer_filter: vec![],
offset_store,
slice_range,
stream_id,
phantom: PhantomData,
}
}

pub fn with_initial_consumer_filter(
pub fn with_consumer_filters(
mut self,
initial_consumer_filter: Vec<FilterCriteria>,
consumer_filters: watch::Receiver<Vec<FilterCriteria>>,
) -> Self {
self.initial_consumer_filter = initial_consumer_filter
.into_iter()
.map(|f| f.into())
.collect();
self.consumer_filters = Some(consumer_filters);
self
}
}
Expand Down Expand Up @@ -142,17 +140,49 @@ where
}
});

let stream_consumer_filters = self.consumer_filters.as_ref().cloned();

let consumer_filters = stream! {
if let Some(mut consumer_filters) = stream_consumer_filters {
while consumer_filters.changed().await.is_ok() {
let criteria: Vec<proto::FilterCriteria> = consumer_filters
.borrow()
.clone()
.into_iter()
.map(|c| c.into())
.collect();
yield proto::StreamIn {
message: Some(proto::stream_in::Message::Filter(proto::FilterReq {
criteria,
})),
};
}
} else {
futures::future::pending::<()>().await;
}
};

let request = Request::new(
tokio_stream::iter(vec![proto::StreamIn {
message: Some(proto::stream_in::Message::Init(proto::InitReq {
stream_id: self.stream_id.to_string(),
slice_min: self.slice_range.start as i32,
slice_max: self.slice_range.end as i32 - 1,
offset,
filter: self.initial_consumer_filter.clone(),
filter: self
.consumer_filters
.as_ref()
.map_or(vec![], |consumer_filters| {
consumer_filters
.borrow()
.clone()
.into_iter()
.map(|c| c.into())
.collect()
}),
})),
}])
.chain(tokio_stream::pending()),
.chain(consumer_filters),
);

let result = connection.events_by_slices(request).await;
Expand Down Expand Up @@ -267,7 +297,7 @@ mod tests {

use super::*;
use akka_persistence_rs::{EntityId, EntityType, PersistenceId};
use akka_projection_rs::consumer_filter::EntityIdOffset;
use akka_projection_rs::consumer_filter::{self, EntityIdOffset};
use async_stream::stream;
use chrono::{DateTime, Utc};
use prost_types::Any;
Expand All @@ -289,8 +319,36 @@ mod tests {

async fn events_by_slices(
&self,
_request: Request<Streaming<proto::StreamIn>>,
request: Request<Streaming<proto::StreamIn>>,
) -> Result<Response<Self::EventsBySlicesStream>, Status> {
let mut inner = request.into_inner();

let Some(Ok(proto::StreamIn {
message: Some(proto::stream_in::Message::Init(proto::InitReq { filter, .. })),
})) = inner.next().await
else {
return Err(Status::aborted("Expected the initial request"));
};

if filter.is_empty() {
return Err(Status::aborted(
"Expected the initial request to have a filter",
));
}

let Some(Ok(proto::StreamIn {
message: Some(proto::stream_in::Message::Filter(proto::FilterReq { criteria, .. })),
})) = inner.next().await
else {
return Err(Status::aborted("Expected the criteria to be updated"));
};

if criteria.is_empty() {
return Err(Status::aborted(
"Expected the filter request to have a filter",
));
}

let stream_event_time = self.event_time;
let stream_event_seen_by = self.event_seen_by.clone();
Ok(Response::new(Box::pin(stream!({
Expand Down Expand Up @@ -432,18 +490,25 @@ mod tests {
}
});

let (consumer_filters, consumer_filters_receiver) =
watch::channel(vec![FilterCriteria::IncludeEntityIds {
entity_id_offsets: vec![EntityIdOffset {
entity_id: entity_id.clone(),
seq_nr: 0,
}],
}]);

let channel = Channel::from_static("http://127.0.0.1:50051");
let mut source_provider = GrpcSourceProvider::<u32, _>::new(
|| channel.connect(),
StreamId::from("some-string-id"),
offset_store,
)
.with_initial_consumer_filter(vec![FilterCriteria::IncludeEntityIds {
entity_id_offsets: vec![EntityIdOffset {
entity_id: entity_id.clone(),
seq_nr: 0,
}],
}]);
.with_consumer_filters(consumer_filters_receiver);

assert!(consumer_filters
.send(vec![consumer_filter::exclude_all()])
.is_ok());

let mut tried = 0;

Expand Down
2 changes: 2 additions & 0 deletions akka-projection-rs/src/consumer_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use akka_persistence_rs::EntityId;
use smol_str::SmolStr;

#[derive(Clone)]
pub struct EntityIdOffset {
pub entity_id: EntityId,
// If this is defined (> 0) events are replayed from the given
Expand All @@ -48,6 +49,7 @@ pub type TopicMatcher = SmolStr;
/// If an exclude criteria is matching the include criteria are evaluated.
/// If no matching include criteria the event is discarded.
/// If matching include criteria the event is emitted.
#[derive(Clone)]
pub enum FilterCriteria {
/// Exclude events with any of the given tags, unless there is a
/// matching include filter that overrides the exclude.
Expand Down

0 comments on commit 539a184

Please sign in to comment.