Skip to content

Commit

Permalink
Consumer supplied filters when producing, along with a producer filter (
Browse files Browse the repository at this point in the history
#36)

* Consumer supplied filters when producing

When producing from the edge, the consumer may also supply filters. The changes here implement that by providing a watch channel for the upstream gRPC producer.

* Possibly better name for flowing gRPC events

* Beginnings of fleshing out the filtering further

Not quite working yet.

* Replace WithEntityId and streamlined EventType usage

By conveying a PersistenceId instead of an EntityId for envelopes, we can reduce the places where we need to declare the event type.

* Includes a test for the consumer filter

* Makes the filter update-able as per the JVM

* Doc

* Provides a producer filter

* Formatting

* Correctly merge criteria into filters

* Filter tests

* Use more refs - reduces the clones

* Improved bench

Was previously running the stream only once

* Improved some names

* Restrict the max size of a filter's fields

We should always endeavour to avoid running out of heap

* Improve the filter dx

* More DX tidy-up

* MQTT protocol is now hidden

The MQTT TopicFilter is hidden to the outside so that we can freely change it, as we're not necessarily supporting all of MQTT's capabilities.

* Filter by entity ids for edge based consumers
  • Loading branch information
huntc authored Oct 10, 2023
1 parent 539a184 commit f12f526
Show file tree
Hide file tree
Showing 15 changed files with 778 additions and 123 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ itoa = "1.0"
js-sys = "0.3.60"
log = "0.4.17"
lru = "0.11.0"
mqtt-protocol = "0.11.2"
postcard = { version = "1.0.6", default-features = false }
prost = { version = "0.12.0" }
prost-build = { version = "0.12.0" }
prost-types = { version = "0.12.0" }
rand = "0.8"
regex = "1.9.6"
scopeguard = "1.1"
serde = "1.0.151"
serde_json = "1.0.107"
Expand Down
65 changes: 34 additions & 31 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

use akka_persistence_rs::{
entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider},
EntityId, Offset, TimestampOffset, WithEntityId, WithOffset, WithSeqNr, WithTimestampOffset,
EntityId, EntityType, Offset, PersistenceId, Tag, TimestampOffset, WithOffset,
WithPersistenceId, WithSeqNr, WithTags, WithTimestampOffset,
};
use async_stream::stream;
use async_trait::async_trait;
Expand All @@ -19,39 +20,22 @@ use streambed::{
use tokio_stream::{Stream, StreamExt};

/// An envelope wraps a commit log event associated with a specific entity.
/// Tags are not presently considered useful at the edge. A remote consumer would be interested
/// in all events from the edge in most cases, and the edge itself decides what to publish
/// (producer defined filter).
#[derive(Clone, Debug, PartialEq)]
pub struct EventEnvelope<E> {
pub entity_id: EntityId,
pub persistence_id: PersistenceId,
pub seq_nr: u64,
pub timestamp: DateTime<Utc>,
pub event: E,
pub offset: CommitLogOffset,
pub tags: Vec<Tag>,
}

impl<E> EventEnvelope<E> {
pub fn new<EI>(
entity_id: EI,
seq_nr: u64,
timestamp: DateTime<Utc>,
event: E,
offset: CommitLogOffset,
) -> Self
where
EI: Into<EntityId>,
{
Self {
entity_id: entity_id.into(),
seq_nr,
timestamp,
event,
offset,
}
}
}

impl<E> WithEntityId for EventEnvelope<E> {
fn entity_id(&self) -> EntityId {
self.entity_id.clone()
impl<E> WithPersistenceId for EventEnvelope<E> {
fn persistence_id(&self) -> &PersistenceId {
&self.persistence_id
}
}

Expand All @@ -67,6 +51,12 @@ impl<E> WithSeqNr for EventEnvelope<E> {
}
}

// Expose the envelope's tags through the `WithTags` trait so projection
// machinery can filter on them without knowing the concrete envelope type.
impl<E> WithTags for EventEnvelope<E> {
    fn tags(&self) -> &[akka_persistence_rs::Tag] {
        self.tags.as_slice()
    }
}

impl<E> WithTimestampOffset for EventEnvelope<E> {
fn timestamp_offset(&self) -> TimestampOffset {
TimestampOffset {
Expand All @@ -84,6 +74,9 @@ pub trait CommitLogMarshaler<E>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
/// Declares the entity type to the marshaler.
fn entity_type(&self) -> EntityType;

/// Provide a key we can use for the purposes of log compaction.
/// A key would generally comprise an event type value held in
/// the high bits, and the entity id in the lower bits.
Expand Down Expand Up @@ -159,8 +152,13 @@ where
}
});
seq_nr.and_then(|seq_nr| {
record.timestamp.map(|timestamp| {
EventEnvelope::new(entity_id, seq_nr, timestamp, event, record.offset)
record.timestamp.map(|timestamp| EventEnvelope {
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr,
timestamp,
event,
offset: record.offset,
tags: vec![],
})
})
})
Expand Down Expand Up @@ -291,7 +289,7 @@ where
marshaler.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.entity_id,
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
Expand Down Expand Up @@ -328,7 +326,7 @@ where
marshaler.envelope(record_entity_id, consumer_record).await
{
yield EntityManagerEventEnvelope::new(
envelope.entity_id,
envelope.persistence_id.entity_id,
envelope.seq_nr,
envelope.timestamp,
envelope.event,
Expand Down Expand Up @@ -476,6 +474,10 @@ mod tests {

#[async_trait]
impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
fn entity_type(&self) -> EntityType {
EntityType::from("some-entity-type")
}

fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}
Expand All @@ -496,11 +498,12 @@ mod tests {
let value = String::from_utf8(record.value).ok()?;
let event = MyEvent { value };
record.timestamp.map(|timestamp| EventEnvelope {
entity_id,
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr: 1,
timestamp,
event,
offset: 0,
tags: vec![],
})
}

Expand Down
21 changes: 15 additions & 6 deletions akka-persistence-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,29 @@ use std::{

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use smol_str::SmolStr;

pub mod effect;
pub mod entity;
pub mod entity_manager;

/// Uniquely identifies the type of an Entity.
pub type EntityType = smol_str::SmolStr;
pub type EntityType = SmolStr;

/// Uniquely identifies an entity, or entity instance.
pub type EntityId = smol_str::SmolStr;
pub type EntityId = SmolStr;

/// Implemented by structures that can return an entity id.
pub trait WithEntityId {
fn entity_id(&self) -> EntityId;
/// Tags annotate an entity's events.
pub type Tag = SmolStr;

/// Implemented by structures that can return a persistence id.
///
/// A [`PersistenceId`] namespaces an entity id with its entity type,
/// so implementors identify a specific entity instance.
pub trait WithPersistenceId {
    /// The persistence id associated with this value.
    fn persistence_id(&self) -> &PersistenceId;
}

/// Implemented by structures that can return tags.
///
/// Tags annotate an entity's events; an implementor with no tags
/// returns an empty slice.
pub trait WithTags {
    /// Borrow the tags associated with this value.
    fn tags(&self) -> &[Tag];
}

/// A slice is deterministically defined based on the persistence id.
Expand Down Expand Up @@ -63,7 +72,7 @@ fn jdk_string_hashcode(s: &str) -> i32 {
}

/// A namespaced entity id given an entity type.
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash)]
#[derive(Clone, Debug, Deserialize, PartialOrd, Ord, Serialize, PartialEq, Eq, Hash)]
pub struct PersistenceId {
pub entity_type: EntityType,
pub entity_id: EntityId,
Expand Down
29 changes: 12 additions & 17 deletions akka-projection-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod offset_store;

use std::{future::Future, marker::PhantomData, ops::Range, pin::Pin};

use akka_persistence_rs::{EntityType, Offset, PersistenceId};
use akka_persistence_rs::{Offset, PersistenceId};
use akka_persistence_rs_commitlog::{CommitLogMarshaler, EventEnvelope};
use akka_projection_rs::SourceProvider;
use async_stream::stream;
Expand All @@ -17,7 +17,6 @@ use tokio_stream::{Stream, StreamExt};
pub struct CommitLogSourceProvider<CL, E, M> {
commit_log: CL,
consumer_group_name: String,
entity_type: EntityType,
marshaler: M,
slice_range: Range<u32>,
topic: Topic,
Expand All @@ -30,13 +29,7 @@ where
M: CommitLogMarshaler<E> + Sync,
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
pub fn new(
commit_log: CL,
marshaler: M,
consumer_group_name: &str,
topic: Topic,
entity_type: EntityType,
) -> Self {
pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self {
// When it comes to having a projection sourced from a local
// commit log, there's little benefit if having many of them.
// We therefore manage all slices from just one projection.
Expand All @@ -47,7 +40,6 @@ where
marshaler,
consumer_group_name,
topic,
entity_type,
slice_range.get(0).cloned().unwrap(),
)
}
Expand All @@ -57,7 +49,6 @@ where
marshaler: M,
consumer_group_name: &str,
topic: Topic,
entity_type: EntityType,
slice_range: Range<u32>,
) -> Self {
Self {
Expand All @@ -66,7 +57,6 @@ where
marshaler,
slice_range,
topic,
entity_type,
phantom: PhantomData,
}
}
Expand Down Expand Up @@ -102,7 +92,7 @@ where
while let Some(consumer_record) = records.next().await {
if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
let persistence_id =
PersistenceId::new(self.entity_type.clone(), record_entity_id);
PersistenceId::new(marshaler.entity_type(), record_entity_id);
if self.slice_range.contains(&persistence_id.slice()) {
if let Some(envelope) = marshaler
.envelope(persistence_id.entity_id, consumer_record)
Expand Down Expand Up @@ -144,7 +134,7 @@ mod tests {
use std::{env, fs};

use super::*;
use akka_persistence_rs::EntityId;
use akka_persistence_rs::{EntityId, EntityType};
use chrono::{DateTime, Utc};
use serde::Deserialize;
use streambed::commit_log::{ConsumerRecord, Header, Key, ProducerRecord};
Expand Down Expand Up @@ -173,6 +163,10 @@ mod tests {

#[async_trait]
impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
fn entity_type(&self) -> EntityType {
EntityType::from("some-topic")
}

fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}
Expand All @@ -193,11 +187,12 @@ mod tests {
let value = String::from_utf8(record.value).ok()?;
let event = MyEvent { value };
record.timestamp.map(|timestamp| EventEnvelope {
entity_id,
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr: 1,
timestamp,
event,
offset: 0,
tags: vec![],
})
}

Expand Down Expand Up @@ -239,6 +234,7 @@ mod tests {

let entity_type = EntityType::from("some-topic");
let entity_id = EntityId::from("some-entity");
let persistence_id = PersistenceId::new(entity_type.clone(), entity_id.clone());
let topic = Topic::from("some-topic");
let event_value = "some value".to_string();

Expand All @@ -263,13 +259,12 @@ mod tests {
MyEventMarshaler,
"some-consumer",
topic,
entity_type,
);

let mut envelopes = source_provider.source(|| async { None }).await;
let envelope = envelopes.next().await.unwrap();

assert_eq!(envelope.entity_id, entity_id,);
assert_eq!(envelope.persistence_id, persistence_id);
assert_eq!(envelope.event, MyEvent { value: event_value },);
}
}
16 changes: 13 additions & 3 deletions akka-projection-rs-commitlog/src/offset_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::num::NonZeroUsize;

use akka_persistence_rs::{entity_manager, EntityId, Message};
use akka_persistence_rs::{entity_manager, EntityId, EntityType, Message, PersistenceId};
use akka_persistence_rs_commitlog::{CommitLogMarshaler, CommitLogTopicAdapter, EventEnvelope};
use akka_projection_rs::offset_store;
use async_trait::async_trait;
Expand All @@ -13,6 +13,7 @@ use streambed_logged::{compaction::KeyBasedRetention, FileLog};
use tokio::sync::mpsc;

struct OffsetStoreEventMarshaler<F> {
entity_type: EntityType,
to_compaction_key: F,
}

Expand All @@ -21,6 +22,10 @@ impl<F> CommitLogMarshaler<offset_store::Event> for OffsetStoreEventMarshaler<F>
where
F: Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync,
{
fn entity_type(&self) -> EntityType {
self.entity_type.clone()
}

fn to_compaction_key(&self, entity_id: &EntityId, event: &offset_store::Event) -> Option<Key> {
(self.to_compaction_key)(entity_id, event)
}
Expand All @@ -41,11 +46,12 @@ where
let value = u64::from_be_bytes(record.value.try_into().ok()?);
let event = offset_store::Event::Saved { seq_nr: value };
record.timestamp.map(|timestamp| EventEnvelope {
entity_id,
persistence_id: PersistenceId::new(self.entity_type(), entity_id),
seq_nr: 0, // We don't care about sequence numbers with the offset store as they won't be projected anywhere
timestamp,
event,
offset: record.offset,
tags: vec![],
})
}

Expand Down Expand Up @@ -85,6 +91,7 @@ pub async fn run(
offset_store_receiver: mpsc::Receiver<Message<offset_store::Command>>,
to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync + 'static,
) {
let events_entity_type = EntityType::from(offset_store_id.clone());
let events_topic = Topic::from(offset_store_id.clone());

commit_log
Expand All @@ -94,7 +101,10 @@ pub async fn run(

let file_log_topic_adapter = CommitLogTopicAdapter::new(
commit_log,
OffsetStoreEventMarshaler { to_compaction_key },
OffsetStoreEventMarshaler {
entity_type: events_entity_type,
to_compaction_key,
},
&offset_store_id,
events_topic,
);
Expand Down
1 change: 1 addition & 0 deletions akka-projection-rs-grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ futures = { workspace = true }
log = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
regex = { workspace = true }
smol_str = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ mod tests {
let (consumer_filters, consumer_filters_receiver) =
watch::channel(vec![FilterCriteria::IncludeEntityIds {
entity_id_offsets: vec![EntityIdOffset {
entity_id: entity_id.clone(),
entity_id: persistence_id.entity_id.clone(),
seq_nr: 0,
}],
}]);
Expand Down
Loading

0 comments on commit f12f526

Please sign in to comment.