Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flesh out the slice API for the user code #9

Merged
merged 5 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 85 additions & 50 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,42 @@
#![doc = include_str!("../README.md")]

use akka_persistence_rs::{
entity_manager::{EventEnvelope, Handler, SourceProvider},
entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider},
EntityId,
};
use async_stream::stream;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
commit_log::{CommitLog, ConsumerRecord, Key, ProducerRecord, Subscription, Topic, TopicRef},
commit_log::{
CommitLog, ConsumerRecord, Key, Offset, ProducerRecord, Subscription, Topic, TopicRef,
},
secret_store::SecretStore,
};
use tokio_stream::{Stream, StreamExt};

/// An envelope wraps a commit log event associated with a specific entity.
#[derive(Clone, Debug, PartialEq)]
pub struct EventEnvelope<E> {
    // The entity the event belongs to.
    pub entity_id: EntityId,
    // The domain event payload itself.
    pub event: E,
    // The commit log offset the event was read from / written at.
    pub offset: Offset,
}

impl<E> EventEnvelope<E> {
pub fn new<EI>(entity_id: EI, event: E, offset: Offset) -> Self
where
EI: Into<EntityId>,
{
Self {
entity_id: entity_id.into(),
event,
offset,
}
}
}

/// Provides the ability to transform the memory representation of Akka Persistence events from
/// and to the records that a CommitLog expects. Given the "cbor" feature, we use CBOR for serialization.
/// Encryption/decryption to commit log records is also applied. Therefore a secret store is expected.
Expand All @@ -27,7 +50,7 @@ where
/// Provide a key we can use for the purposes of log compaction.
/// A key would generally comprise and event type value held in
/// the high bits, and the entity id in the lower bits.
fn to_compaction_key(envelope: &EventEnvelope<E>) -> Option<Key>;
fn to_compaction_key(entity_id: &EntityId, event: &E) -> Option<Key>;

/// Extract an entity id from a consumer envelope.
fn to_entity_id(record: &ConsumerRecord) -> Option<EntityId>;
Expand All @@ -52,7 +75,7 @@ where
|value| ciborium::de::from_reader(value),
)
.await
.map(|event| EventEnvelope::new(entity_id, event))
.map(|event| EventEnvelope::new(entity_id, event, record.offset))
}

#[cfg(not(feature = "cbor"))]
Expand All @@ -66,39 +89,38 @@ where
async fn producer_record(
&self,
topic: Topic,
envelope: EventEnvelope<E>,
) -> Option<(ProducerRecord, EventEnvelope<E>)> {
let key = Self::to_compaction_key(&envelope)?;
entity_id: EntityId,
event: E,
) -> Option<ProducerRecord> {
let key = Self::to_compaction_key(&entity_id, &event)?;
let buf = streambed::encrypt_struct(
self.secret_store(),
&self.secret_path(&envelope.entity_id),
&self.secret_path(&entity_id),
|event| {
let mut buf = Vec::new();
ciborium::ser::into_writer(event, &mut buf).map(|_| buf)
},
rand::thread_rng,
&envelope.event,
&event,
)
.await?;
Some((
ProducerRecord {
topic,
headers: vec![],
timestamp: None,
key,
value: buf,
partition: 0,
},
envelope,
))
Some(ProducerRecord {
topic,
headers: vec![],
timestamp: None,
key,
value: buf,
partition: 0,
})
}

#[cfg(not(feature = "cbor"))]
async fn producer_record(
&self,
topic: Topic,
envelope: EventEnvelope<E>,
) -> Option<(ProducerRecord, EventEnvelope<E>)>;
entity_id: EntityId,
event: E,
) -> Option<ProducerRecord>;
}

/// Adapts a Streambed CommitLog for use with Akka Persistence.
Expand Down Expand Up @@ -153,7 +175,8 @@ where
{
async fn source_initial(
&mut self,
) -> io::Result<Pin<Box<dyn Stream<Item = EventEnvelope<E>> + Send + 'async_trait>>> {
) -> io::Result<Pin<Box<dyn Stream<Item = EntityManagerEventEnvelope<E>> + Send + 'async_trait>>>
{
let consumer_records = produce_to_last_offset(
&self.commit_log,
&self.consumer_group_name,
Expand All @@ -170,7 +193,10 @@ where
if let Some(envelope) =
marshaler.envelope(record_entity_id, consumer_record).await
{
yield envelope;
yield EntityManagerEventEnvelope::new(
envelope.entity_id,
envelope.event,
);
}
}
}
Expand All @@ -183,7 +209,8 @@ where
async fn source(
&mut self,
entity_id: &EntityId,
) -> io::Result<Pin<Box<dyn Stream<Item = EventEnvelope<E>> + Send + 'async_trait>>> {
) -> io::Result<Pin<Box<dyn Stream<Item = EntityManagerEventEnvelope<E>> + Send + 'async_trait>>>
{
let consumer_records = produce_to_last_offset(
&self.commit_log,
&self.consumer_group_name,
Expand All @@ -201,7 +228,10 @@ where
if let Some(envelope) =
marshaler.envelope(record_entity_id, consumer_record).await
{
yield envelope;
yield EntityManagerEventEnvelope::new(
envelope.entity_id,
envelope.event,
);
}
}
}
Expand Down Expand Up @@ -251,12 +281,19 @@ impl<CL, E, M> Handler<E> for CommitLogTopicAdapter<CL, E, M>
where
CL: CommitLog,
M: CommitLogEventEnvelopeMarshaler<E> + Send + Sync,
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
for<'async_trait> E: Clone + DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
async fn process(&mut self, envelope: EventEnvelope<E>) -> io::Result<EventEnvelope<E>> {
let (producer_record, envelope) = self
async fn process(
&mut self,
envelope: EntityManagerEventEnvelope<E>,
) -> io::Result<EntityManagerEventEnvelope<E>> {
let producer_record = self
.marshaler
.producer_record(self.topic.clone(), envelope)
.producer_record(
self.topic.clone(),
envelope.entity_id.clone(),
envelope.event.clone(),
)
.await
.ok_or_else(|| {
io::Error::new(
Expand Down Expand Up @@ -294,7 +331,7 @@ mod tests {

// Scaffolding

#[derive(Deserialize, Serialize)]
#[derive(Clone, Deserialize, Serialize)]
struct MyEvent {
value: String,
}
Expand Down Expand Up @@ -388,7 +425,7 @@ mod tests {
impl CommitLogEventEnvelopeMarshaler<MyEvent> for MyEventMarshaler {
type SecretStore = NoopSecretStore;

fn to_compaction_key(_envelope: &EventEnvelope<MyEvent>) -> Option<Key> {
fn to_compaction_key(_entity_id: &EntityId, _event: &MyEvent) -> Option<Key> {
panic!("should not be called")
}

Expand Down Expand Up @@ -418,30 +455,28 @@ mod tests {
Some(EventEnvelope {
entity_id,
event,
deletion_event: false,
offset: 0,
})
}

async fn producer_record(
&self,
topic: Topic,
envelope: EventEnvelope<MyEvent>,
) -> Option<(ProducerRecord, EventEnvelope<MyEvent>)> {
entity_id: EntityId,
event: MyEvent,
) -> Option<ProducerRecord> {
let headers = vec![Header {
key: "entity-id".to_string(),
value: envelope.entity_id.as_bytes().into(),
value: entity_id.as_bytes().into(),
}];
Some((
ProducerRecord {
topic,
headers,
timestamp: None,
key: 0,
value: envelope.event.value.clone().into_bytes(),
partition: 0,
},
envelope,
))
Some(ProducerRecord {
topic,
headers,
timestamp: None,
key: 0,
value: event.value.clone().into_bytes(),
partition: 0,
})
}
}

Expand Down Expand Up @@ -478,7 +513,7 @@ mod tests {
// Process some events and then produce a stream.

let envelope = adapter
.process(EventEnvelope::new(
.process(EntityManagerEventEnvelope::new(
entity_id.clone(),
MyEvent {
value: "first-event".to_string(),
Expand All @@ -489,7 +524,7 @@ mod tests {
assert_eq!(envelope.entity_id, entity_id);

let envelope = adapter
.process(EventEnvelope::new(
.process(EntityManagerEventEnvelope::new(
entity_id.clone(),
MyEvent {
value: "second-event".to_string(),
Expand All @@ -502,7 +537,7 @@ mod tests {
// Produce to a different entity id, so that we can test out the filtering next.

adapter
.process(EventEnvelope::new(
.process(EntityManagerEventEnvelope::new(
"some-other-entity-id",
MyEvent {
value: "third-event".to_string(),
Expand Down
97 changes: 97 additions & 0 deletions akka-persistence-rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#![doc = include_str!("../README.md")]

use std::{
fmt::{self, Display, Write},
num::Wrapping,
ops::Range,
};

pub mod effect;
pub mod entity;
pub mod entity_manager;
Expand All @@ -16,6 +22,23 @@ pub struct PersistenceId {
pub entity_id: EntityId,
}

impl PersistenceId {
pub fn new(entity_type: EntityType, entity_id: EntityId) -> Self {
Self {
entity_type,
entity_id,
}
}
}

impl Display for PersistenceId {
    /// Renders as `<entity_type>|<entity_id>`, which is the canonical
    /// textual form hashed by `slice_for_persistence_id`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}|{}", self.entity_type, self.entity_id)
    }
}

/// A message encapsulates a command that is addressed to a specific entity.
#[derive(Debug, PartialEq)]
pub struct Message<C> {
Expand All @@ -34,3 +57,77 @@ impl<C> Message<C> {
}
}
}

/// The fixed total number of slices that persistence ids hash into.
///
/// A slice is deterministically derived from the persistence id.
/// `NUMBER_OF_SLICES` is not configurable because changing the value would
/// yield a different slice for a persistence id than what was used before,
/// which would make events_by_slices calls on a source provider invalid.
pub const NUMBER_OF_SLICES: u32 = 1024;

/// A slice is deterministically defined based on the persistence id. The purpose is to
/// evenly distribute all persistence ids over the slices and be able to query the
/// events for a range of slices.
pub fn slice_for_persistence_id(persistence_id: &PersistenceId) -> u32 {
    // Mirror the JVM: abs(String#hashCode % numberOfSlices).
    let hash = jdk_string_hashcode(&persistence_id.to_string());
    (hash % NUMBER_OF_SLICES as i32).unsigned_abs()
}

/// Split the total number of slices into ranges by the given `number_of_ranges`.
/// For example, `NUMBER_OF_SLICES` is 1024 and given 4 `number_of_ranges` this method will
/// return ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023).
/// Each returned `Range` is half-open (the upper bound is exclusive).
///
/// # Panics
///
/// Panics if `number_of_ranges` is zero or does not evenly divide
/// `NUMBER_OF_SLICES`.
pub fn slice_ranges(number_of_ranges: u32) -> Vec<Range<u32>> {
    // Guard zero explicitly so the caller sees the intended assertion
    // message rather than an opaque divide-by-zero panic.
    assert!(
        number_of_ranges > 0 && NUMBER_OF_SLICES % number_of_ranges == 0,
        "number_of_ranges must be a whole number divisor of numberOfSlices."
    );
    let range_size = NUMBER_OF_SLICES / number_of_ranges;
    let mut ranges = Vec::with_capacity(number_of_ranges as usize);
    for i in 0..number_of_ranges {
        ranges.push(i * range_size..i * range_size + range_size)
    }
    ranges
}

// Implementation of the JDK8 string hashcode:
// https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#hashCode
//
// Java's String#hashCode operates on UTF-16 code units, so we iterate
// `encode_utf16` rather than `chars`. This also fixes a panic in the
// previous version, which stepped a `chars()` iterator `s.len()` (a BYTE
// count) times and unwrapped past the end for any non-ASCII input.
fn jdk_string_hashcode(s: &str) -> i32 {
    const MULTIPLIER: Wrapping<i32> = Wrapping(31);
    let mut hash = Wrapping(0i32);
    for code_unit in s.encode_utf16() {
        hash = hash * MULTIPLIER + Wrapping(code_unit as i32);
    }
    hash.0
}

#[cfg(test)]
mod tests {
use smol_str::SmolStr;

use super::*;

#[test]
fn test_jdk_string_hashcode() {
    // Expected values were produced by the JDK's own String#hashCode.
    let cases = [
        ("", 0),
        ("howtodoinjava.com", 1894145264),
        ("hello world", 1794106052),
    ];
    for (input, expected) in cases {
        assert_eq!(jdk_string_hashcode(input), expected);
    }
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were taken from a JDK example.


#[test]
fn test_slice_for_persistence_id() {
    // 451 was verified to be the slice the JVM implementation computes
    // for the same persistence id.
    let persistence_id = PersistenceId::new(
        SmolStr::from("some-entity-type"),
        SmolStr::from("some-entity-id"),
    );
    assert_eq!(slice_for_persistence_id(&persistence_id), 451);
}

#[test]
fn test_slice_ranges() {
    // Each Range is half-open ("upto"): the max bound is exclusive.
    let expected = vec![0..256, 256..512, 512..768, 768..1024];
    assert_eq!(slice_ranges(4), expected);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that Range is non-inclusive in terms of its max i.e. it is "upto". I could've used RangeInclusive instead, but then I'd have to do an additional subtraction when constructing each range. :-)

}
Loading