Skip to content

Commit

Permalink
The dev declares how entity ids become keys as per elsewhere
Browse files Browse the repository at this point in the history
  • Loading branch information
huntc committed Sep 20, 2023
1 parent fa0eceb commit 385b28c
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 19 deletions.
14 changes: 7 additions & 7 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ where
/// Provide a key we can use for the purposes of log compaction.
/// A key would generally comprise and event type value held in
/// the high bits, and the entity id in the lower bits.
fn to_compaction_key(entity_id: &EntityId, event: &E) -> Option<Key>;
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option<Key>;

/// Extract an entity id from a consumer envelope.
fn to_entity_id(record: &ConsumerRecord) -> Option<EntityId>;
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId>;

/// Produce an event envelope from a consumer record.
async fn envelope(
Expand Down Expand Up @@ -184,7 +184,7 @@ where
) -> Option<ProducerRecord> {
use streambed::commit_log::{Header, HeaderKey};

let key = Self::to_compaction_key(&entity_id, &event)?;
let key = self.to_compaction_key(&entity_id, &event)?;
let buf = streambed::encrypt_struct(
self.secret_store(),
&self.secret_path(&entity_id),
Expand Down Expand Up @@ -286,7 +286,7 @@ where
if let Ok(mut consumer_records) = consumer_records {
Ok(Box::pin(stream!({
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = M::to_entity_id(&consumer_record) {
if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
if let Some(envelope) =
marshaler.envelope(record_entity_id, consumer_record).await
{
Expand Down Expand Up @@ -322,7 +322,7 @@ where
if let Ok(mut consumer_records) = consumer_records {
Ok(Box::pin(stream!({
while let Some(consumer_record) = consumer_records.next().await {
if let Some(record_entity_id) = M::to_entity_id(&consumer_record) {
if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
if &record_entity_id == entity_id {
if let Some(envelope) =
marshaler.envelope(record_entity_id, consumer_record).await
Expand Down Expand Up @@ -476,11 +476,11 @@ mod tests {

#[async_trait]
impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
fn to_compaction_key(_entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}

fn to_entity_id(record: &ConsumerRecord) -> Option<EntityId> {
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
let Header { value, .. } = record
.headers
.iter()
Expand Down
6 changes: 3 additions & 3 deletions akka-projection-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ where

Box::pin(stream!({
while let Some(consumer_record) = records.next().await {
if let Some(record_entity_id) = M::to_entity_id(&consumer_record) {
if let Some(record_entity_id) = marshaler.to_entity_id(&consumer_record) {
let persistence_id =
PersistenceId::new(self.entity_type.clone(), record_entity_id);
if self.slice_range.contains(&persistence_id.slice()) {
Expand Down Expand Up @@ -173,11 +173,11 @@ mod tests {

#[async_trait]
impl CommitLogMarshaler<MyEvent> for MyEventMarshaler {
fn to_compaction_key(_entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
fn to_compaction_key(&self, _entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}

fn to_entity_id(record: &ConsumerRecord) -> Option<EntityId> {
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
let Header { value, .. } = record
.headers
.iter()
Expand Down
21 changes: 14 additions & 7 deletions akka-projection-rs-commitlog/src/offset_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ use streambed::commit_log::{ConsumerRecord, Header, HeaderKey, Key, ProducerReco
use streambed_logged::{compaction::KeyBasedRetention, FileLog};
use tokio::sync::mpsc;

struct OffsetStoreEventMarshaler;
struct OffsetStoreEventMarshaler<F> {
to_compaction_key: F,
}

#[async_trait]
impl CommitLogMarshaler<offset_store::Event> for OffsetStoreEventMarshaler {
fn to_compaction_key(_entity_id: &EntityId, _event: &offset_store::Event) -> Option<Key> {
None // FIXME
impl<F> CommitLogMarshaler<offset_store::Event> for OffsetStoreEventMarshaler<F>
where
F: Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync,
{
fn to_compaction_key(&self, entity_id: &EntityId, event: &offset_store::Event) -> Option<Key> {
(self.to_compaction_key)(entity_id, event)
}

fn to_entity_id(record: &ConsumerRecord) -> Option<EntityId> {
fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
let Header { value, .. } = record
.headers
.iter()
Expand Down Expand Up @@ -55,11 +60,12 @@ impl CommitLogMarshaler<offset_store::Event> for OffsetStoreEventMarshaler {
key: HeaderKey::from("entity-id"),
value: entity_id.as_bytes().into(),
}];
let key = self.to_compaction_key(&entity_id, &event)?;
Some(ProducerRecord {
topic,
headers,
timestamp: Some(timestamp),
key: 0,
key,
value: seq_nr.to_be_bytes().to_vec(),
partition: 0,
})
Expand All @@ -75,6 +81,7 @@ pub async fn run(
keys_expected: usize,
offset_store_id: OffsetStoreId,
offset_store_receiver: mpsc::Receiver<Message<offset_store::Command>>,
to_compaction_key: impl Fn(&EntityId, &offset_store::Event) -> Option<Key> + Send + Sync + 'static,
) {
let events_topic = Topic::from(offset_store_id.clone());

Expand All @@ -85,7 +92,7 @@ pub async fn run(

let file_log_topic_adapter = CommitLogTopicAdapter::new(
commit_log,
OffsetStoreEventMarshaler,
OffsetStoreEventMarshaler { to_compaction_key },
&offset_store_id,
events_topic,
);
Expand Down
1 change: 1 addition & 0 deletions examples/iot-service/src/registration_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn task(
EXPECTED_DISTINCT_REGISTRATIONS,
offset_store_id,
offset_store_receiver,
|_, _| None, // FIXME
)
.await
});
Expand Down
4 changes: 2 additions & 2 deletions examples/iot-service/src/temperature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF;

#[async_trait]
impl CommitLogMarshaler<Event> for EventEnvelopeMarshaler {
fn to_compaction_key(entity_id: &EntityId, event: &Event) -> Option<Key> {
fn to_compaction_key(&self, entity_id: &EntityId, event: &Event) -> Option<Key> {
let record_type = match event {
Event::TemperatureRead { .. } => Some(0),
Event::Registered { .. } => Some(1),
Expand All @@ -128,7 +128,7 @@ impl CommitLogMarshaler<Event> for EventEnvelopeMarshaler {
})
}

fn to_entity_id(record: &streambed::commit_log::ConsumerRecord) -> Option<EntityId> {
fn to_entity_id(&self, record: &streambed::commit_log::ConsumerRecord) -> Option<EntityId> {
let entity_id = (record.key & EVENT_ID_BIT_MASK) as u32;
let mut buffer = itoa::Buffer::new();
Some(EntityId::from(buffer.format(entity_id)))
Expand Down

0 comments on commit 385b28c

Please sign in to comment.