From 385b28ce5927a284456a2bdd76d6172ced52cd6b Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 21 Sep 2023 07:10:27 +1000 Subject: [PATCH] The dev declares how entity ids become keys as per elsewhere --- akka-persistence-rs-commitlog/src/lib.rs | 14 ++++++------- akka-projection-rs-commitlog/src/lib.rs | 6 +++--- .../src/offset_store.rs | 21 ++++++++++++------- .../src/registration_projection.rs | 1 + examples/iot-service/src/temperature.rs | 4 ++-- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/akka-persistence-rs-commitlog/src/lib.rs b/akka-persistence-rs-commitlog/src/lib.rs index 462c5a1..86236d4 100644 --- a/akka-persistence-rs-commitlog/src/lib.rs +++ b/akka-persistence-rs-commitlog/src/lib.rs @@ -87,10 +87,10 @@ where /// Provide a key we can use for the purposes of log compaction. /// A key would generally comprise and event type value held in /// the high bits, and the entity id in the lower bits. - fn to_compaction_key(entity_id: &EntityId, event: &E) -> Option; + fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option; /// Extract an entity id from a consumer envelope. - fn to_entity_id(record: &ConsumerRecord) -> Option; + fn to_entity_id(&self, record: &ConsumerRecord) -> Option; /// Produce an event envelope from a consumer record. async fn envelope( @@ -184,7 +184,7 @@ where ) -> Option { use streambed::commit_log::{Header, HeaderKey}; - let key = Self::to_compaction_key(&entity_id, &event)?; + let key = self.to_compaction_key(&entity_id, &event)?; let buf = streambed::encrypt_struct( self.secret_store(), &self.secret_path(&entity_id), @@ -286,7 +286,7 @@ where if let Ok(mut consumer_records) = consumer_records { Ok(Box::pin(stream!({ while let Some(consumer_record) = consumer_records.next().await { - if let Some(record_entity_id) = M::to_entity_id(&consumer_record) { + if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) { if let Some(envelope) = marshaler.envelope(record_entity_id, consumer_record).await { @@ -322,7 +322,7 @@ where if let Ok(mut consumer_records) = consumer_records { Ok(Box::pin(stream!({ while let Some(consumer_record) = consumer_records.next().await { - if let Some(record_entity_id) = M::to_entity_id(&consumer_record) { + if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) { if &record_entity_id == entity_id { if let Some(envelope) = marshaler.envelope(record_entity_id, consumer_record).await @@ -476,11 +476,11 @@ mod tests { #[async_trait] impl CommitLogMarshaler for MyEventMarshaler { - fn to_compaction_key(_entity_id: &EntityId, _event: &MyEvent) -> Option { + fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option { panic!("should not be called") } - fn to_entity_id(record: &ConsumerRecord) -> Option { + fn to_entity_id(&self, record: &ConsumerRecord) -> Option { let Header { value, .. } = record .headers .iter() diff --git a/akka-projection-rs-commitlog/src/lib.rs b/akka-projection-rs-commitlog/src/lib.rs index 524fb98..7b5dd33 100644 --- a/akka-projection-rs-commitlog/src/lib.rs +++ b/akka-projection-rs-commitlog/src/lib.rs @@ -100,7 +100,7 @@ where Box::pin(stream!({ while let Some(consumer_record) = records.next().await { - if let Some(record_entity_id) = M::to_entity_id(&consumer_record) { + if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) { let persistence_id = PersistenceId::new(self.entity_type.clone(), record_entity_id); if self.slice_range.contains(&persistence_id.slice()) { @@ -173,11 +173,11 @@ mod tests { #[async_trait] impl CommitLogMarshaler for MyEventMarshaler { - fn to_compaction_key(_entity_id: &EntityId, _event: &MyEvent) -> Option { + fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option { panic!("should not be called") } - fn to_entity_id(record: &ConsumerRecord) -> Option { + fn to_entity_id(&self, record: &ConsumerRecord) -> Option { let Header { value, .. } = record .headers .iter() diff --git a/akka-projection-rs-commitlog/src/offset_store.rs b/akka-projection-rs-commitlog/src/offset_store.rs index 9e9e290..bb6bdec 100644 --- a/akka-projection-rs-commitlog/src/offset_store.rs +++ b/akka-projection-rs-commitlog/src/offset_store.rs @@ -10,15 +10,20 @@ use streambed::commit_log::{ConsumerRecord, Header, HeaderKey, Key, ProducerReco use streambed_logged::{compaction::KeyBasedRetention, FileLog}; use tokio::sync::mpsc; -struct OffsetStoreEventMarshaler; +struct OffsetStoreEventMarshaler { + to_compaction_key: F, +} #[async_trait] -impl CommitLogMarshaler for OffsetStoreEventMarshaler { - fn to_compaction_key(_entity_id: &EntityId, _event: &offset_store::Event) -> Option { - None // FIXME +impl CommitLogMarshaler for OffsetStoreEventMarshaler +where + F: Fn(&EntityId, &offset_store::Event) -> Option + Send + Sync, +{ + fn to_compaction_key(&self, entity_id: &EntityId, event: &offset_store::Event) -> Option { + (self.to_compaction_key)(entity_id, event) } - fn to_entity_id(record: &ConsumerRecord) -> Option { + fn to_entity_id(&self, record: &ConsumerRecord) -> Option { let Header { value, .. } = record .headers .iter() @@ -55,11 +60,12 @@ impl CommitLogMarshaler for OffsetStoreEventMarshaler { key: HeaderKey::from("entity-id"), value: entity_id.as_bytes().into(), }]; + let key = self.to_compaction_key(&entity_id, &event)?; Some(ProducerRecord { topic, headers, timestamp: Some(timestamp), - key: 0, + key, value: seq_nr.to_be_bytes().to_vec(), partition: 0, }) @@ -75,6 +81,7 @@ pub async fn run( keys_expected: usize, offset_store_id: OffsetStoreId, offset_store_receiver: mpsc::Receiver>, + to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option + Send + Sync + 'static, ) { let events_topic = Topic::from(offset_store_id.clone()); @@ -85,7 +92,7 @@ pub async fn run( let file_log_topic_adapter = CommitLogTopicAdapter::new( commit_log, - OffsetStoreEventMarshaler, + OffsetStoreEventMarshaler { to_compaction_key }, &offset_store_id, events_topic, ); diff --git a/examples/iot-service/src/registration_projection.rs b/examples/iot-service/src/registration_projection.rs index 5dd751b..2af3343 100644 --- a/examples/iot-service/src/registration_projection.rs +++ b/examples/iot-service/src/registration_projection.rs @@ -40,6 +40,7 @@ pub async fn task( EXPECTED_DISTINCT_REGISTRATIONS, offset_store_id, offset_store_receiver, + |_, _| None, // FIXME ) .await }); diff --git a/examples/iot-service/src/temperature.rs b/examples/iot-service/src/temperature.rs index ded9d95..aa193f1 100644 --- a/examples/iot-service/src/temperature.rs +++ b/examples/iot-service/src/temperature.rs @@ -117,7 +117,7 @@ const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF; #[async_trait] impl CommitLogMarshaler for EventEnvelopeMarshaler { - fn to_compaction_key(entity_id: &EntityId, event: &Event) -> Option { + fn to_compaction_key(&self, entity_id: &EntityId, event: &Event) -> Option { let record_type = match event { Event::TemperatureRead { .. } => Some(0), Event::Registered { .. } => Some(1), @@ -128,7 +128,7 @@ impl CommitLogMarshaler for EventEnvelopeMarshaler { }) } - fn to_entity_id(record: &streambed::commit_log::ConsumerRecord) -> Option { + fn to_entity_id(&self, record: &streambed::commit_log::ConsumerRecord) -> Option { let entity_id = (record.key & EVENT_ID_BIT_MASK) as u32; let mut buffer = itoa::Buffer::new(); Some(EntityId::from(buffer.format(entity_id)))