Skip to content

Commit

Permalink
Merge pull request #10 from lightbend/local-projection-impl
Browse files Browse the repository at this point in the history
Implement local projections
  • Loading branch information
huntc authored Aug 24, 2023
2 parents 2ab7d96 + 29246a4 commit 5fe9242
Show file tree
Hide file tree
Showing 15 changed files with 367 additions and 173 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ rand = "0.8"
scopeguard = "1.1"
serde = "1.0.151"
smol_str = "0.2.0"
streambed = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-confidant = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-logged = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" }
streambed-confidant = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" }
streambed-logged = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" }
streambed-storage = { git = "https://github.com/streambed/streambed-rs.git", rev = "d343171f7ec85cc028674115c8df30150b741ef6" }
test-log = "0.2.11"
tokio = "1.23.0"
tokio-stream = "0.1.14"
Expand Down
33 changes: 22 additions & 11 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@

use akka_persistence_rs::{
entity_manager::{EventEnvelope as EntityManagerEventEnvelope, Handler, SourceProvider},
EntityId,
EntityId, Offset, WithOffset,
};
use async_stream::stream;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
commit_log::{
CommitLog, ConsumerRecord, Key, Offset, ProducerRecord, Subscription, Topic, TopicRef,
CommitLog, ConsumerRecord, Key, Offset as CommitLogOffset, ProducerRecord, Subscription,
Topic,
},
secret_store::SecretStore,
};
Expand All @@ -21,11 +22,11 @@ use tokio_stream::{Stream, StreamExt};
pub struct EventEnvelope<E> {
pub entity_id: EntityId,
pub event: E,
pub offset: Offset,
pub offset: CommitLogOffset,
}

impl<E> EventEnvelope<E> {
pub fn new<EI>(entity_id: EI, event: E, offset: Offset) -> Self
pub fn new<EI>(entity_id: EI, event: E, offset: CommitLogOffset) -> Self
where
EI: Into<EntityId>,
{
Expand All @@ -37,6 +38,12 @@ impl<E> EventEnvelope<E> {
}
}

/// Exposes this envelope's commit log offset as a sequence-number
/// [`Offset`] so the envelope can be used wherever a `WithOffset` is required.
impl<E> WithOffset for EventEnvelope<E> {
    fn offset(&self) -> Offset {
        // The commit log's u64 offset maps directly onto `Offset::Sequence`.
        Offset::Sequence(self.offset)
    }
}

/// Provides the ability to transform the memory representation of Akka Persistence events from
/// and to the records that a CommitLog expects. Given the "cbor" feature, we use CBOR for serialization.
/// Encryption/decryption to commit log records is also applied. Therefore a secret store is expected.
Expand Down Expand Up @@ -155,12 +162,12 @@ where
M: CommitLogEventEnvelopeMarshaler<E>,
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
{
pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: TopicRef) -> Self {
pub fn new(commit_log: CL, marshaler: M, consumer_group_name: &str, topic: Topic) -> Self {
Self {
commit_log,
consumer_group_name: consumer_group_name.into(),
marshaler,
topic: topic.into(),
topic,
phantom: PhantomData,
}
}
Expand Down Expand Up @@ -466,7 +473,7 @@ mod tests {
event: MyEvent,
) -> Option<ProducerRecord> {
let headers = vec![Header {
key: "entity-id".to_string(),
key: Topic::from("entity-id"),
value: entity_id.as_bytes().into(),
}];
Some(ProducerRecord {
Expand Down Expand Up @@ -496,7 +503,7 @@ mod tests {
commit_log.clone(),
marshaler,
"some-consumer",
"some-topic",
Topic::from("some-topic"),
);

// Scaffolding
Expand Down Expand Up @@ -552,7 +559,7 @@ mod tests {

for _ in 0..10 {
let last_offset = commit_log
.offsets("some-topic".to_string(), 0)
.offsets(Topic::from("some-topic"), 0)
.await
.map(|lo| lo.end_offset);
if last_offset == Some(3) {
Expand Down Expand Up @@ -582,8 +589,12 @@ mod tests {

let marshaler = MyEventMarshaler;

let file_log_topic_adapter =
CommitLogTopicAdapter::new(commit_log, marshaler, "some-consumer", "some-topic");
let file_log_topic_adapter = CommitLogTopicAdapter::new(
commit_log,
marshaler,
"some-consumer",
Topic::from("some-topic"),
);

let my_behavior = MyBehavior;

Expand Down
105 changes: 59 additions & 46 deletions akka-persistence-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,43 @@ pub type EntityType = smol_str::SmolStr;
/// Uniquely identifies an entity, or entity instance.
pub type EntityId = smol_str::SmolStr;

/// A slice is deterministically defined based on the persistence id.
/// `NUMBER_OF_SLICES` is not configurable because changing the value would result in
/// a different slice for a persistence id than what was used before, which would
/// result in an invalid events_by_slices call on a source provider.
pub const NUMBER_OF_SLICES: u32 = 1024;

/// Split the total number of slices into ranges by the given `number_of_ranges`.
/// For example, `NUMBER_OF_SLICES` is 1024 and given 4 `number_of_ranges` this method will
/// return ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023).
///
/// # Panics
/// Panics if `number_of_ranges` is zero or is not a whole-number divisor of
/// `NUMBER_OF_SLICES`.
pub fn slice_ranges(number_of_ranges: u32) -> Vec<Range<u32>> {
    // Guard explicitly so a zero argument fails with a meaningful message
    // rather than an opaque "attempt to divide by zero" panic below.
    assert!(
        number_of_ranges > 0,
        "number_of_ranges must be greater than zero."
    );
    let range_size = NUMBER_OF_SLICES / number_of_ranges;
    assert!(
        number_of_ranges * range_size == NUMBER_OF_SLICES,
        "number_of_ranges must be a whole number divisor of NUMBER_OF_SLICES."
    );
    (0..number_of_ranges)
        .map(|i| i * range_size..(i + 1) * range_size)
        .collect()
}

// Implementation of the JDK8 string hashcode:
// https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#hashCode
//
// Java strings are sequences of UTF-16 code units, so we hash over
// `encode_utf16()` rather than `chars()`. This also fixes a panic in the
// previous implementation, which stepped `s.chars()` `s.len()` (a *byte*
// count) times and hit `None.unwrap()` for any multi-byte UTF-8 input.
fn jdk_string_hashcode(s: &str) -> i32 {
    const MULTIPLIER: Wrapping<i32> = Wrapping(31);
    let mut hash = Wrapping(0i32);
    for c in s.encode_utf16() {
        // Wrapping arithmetic mirrors Java's defined two's-complement overflow.
        hash = hash * MULTIPLIER + Wrapping(i32::from(c));
    }
    hash.0
}

/// A namespaced entity id given an entity type.
pub struct PersistenceId {
pub entity_type: EntityType,
Expand All @@ -29,6 +66,10 @@ impl PersistenceId {
entity_id,
}
}

    /// Deterministically maps this persistence id to one of `NUMBER_OF_SLICES`
    /// slices via the JDK-compatible string hash of its rendered form.
    pub fn slice(&self) -> u32 {
        // `unsigned_abs` folds a negative remainder into 0..NUMBER_OF_SLICES.
        (jdk_string_hashcode(&self.to_string()) % NUMBER_OF_SLICES as i32).unsigned_abs()
    }
}

impl Display for PersistenceId {
Expand Down Expand Up @@ -58,54 +99,25 @@ impl<C> Message<C> {
}
}

/// A slice is deterministically defined based on the persistence id.
/// `numberOfSlices` is not configurable because changing the value would result in
/// different slice for a persistence id than what was used before, which would
/// result in invalid events_by_slices call on a source provider.
pub const NUMBER_OF_SLICES: u32 = 1024;

/// A slice is deterministically defined based on the persistence id. The purpose is to
/// evenly distribute all persistence ids over the slices and be able to query the
/// events for a range of slices.
pub fn slice_for_persistence_id(persistence_id: &PersistenceId) -> u32 {
(jdk_string_hashcode(&persistence_id.to_string()) % NUMBER_OF_SLICES as i32).unsigned_abs()
}

/// Split the total number of slices into ranges by the given `number_of_ranges`.
/// For example, `NUMBER_OF_SLICES` is 1024 and given 4 `number_of_ranges` this method will
/// return ranges (0 to 255), (256 to 511), (512 to 767) and (768 to 1023).
pub fn slice_ranges(number_of_ranges: u32) -> Vec<Range<u32>> {
let range_size = NUMBER_OF_SLICES / number_of_ranges;
assert!(
number_of_ranges * range_size == NUMBER_OF_SLICES,
"number_of_ranges must be a whole number divisor of numberOfSlices."
);
let mut ranges = Vec::with_capacity(number_of_ranges as usize);
for i in 0..number_of_ranges {
ranges.push(i * range_size..i * range_size + range_size)
}
ranges
/// A position within a stream of events, from which the stream can later be
/// resumed.
pub enum Offset {
    /// Corresponds to an ordered sequence number for the events. Note that the
    /// corresponding offset of each event is provided in an `Envelope`,
    /// which makes it possible to resume the stream at a later point from a given offset.
    ///
    /// The `offset` is exclusive, i.e. the event with the exact same sequence number will
    /// not be included in the returned stream. This means that you can use the offset that
    /// is returned in an `Envelope` as the `offset` parameter in a subsequent query.
    Sequence(u64),
}

// Implementation of the JDK8 string hashcode:
// https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#hashCode
fn jdk_string_hashcode(s: &str) -> i32 {
let mut hash = Wrapping(0i32);
const MULTIPLIER: Wrapping<i32> = Wrapping(31);
let count = s.len();
if count > 0 {
let mut chars = s.chars();
for _ in 0..count {
hash = hash * MULTIPLIER + Wrapping(chars.next().unwrap() as i32);
}
}
hash.0
/// Implemented by structures that can return an offset.
pub trait WithOffset {
    /// Returns the offset of this structure.
    fn offset(&self) -> Offset;
}

#[cfg(test)]
mod tests {
use smol_str::SmolStr;

use super::*;

#[test]
Expand All @@ -118,10 +130,11 @@ mod tests {
#[test]
fn test_slice_for_persistence_id() {
assert_eq!(
slice_for_persistence_id(&PersistenceId::new(
SmolStr::from("some-entity-type"),
SmolStr::from("some-entity-id")
)),
PersistenceId::new(
EntityType::from("some-entity-type"),
EntityId::from("some-entity-id")
)
.slice(),
451
);
}
Expand Down
2 changes: 2 additions & 0 deletions akka-projection-rs-commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ version = "0.1.0"
edition = "2021"

[dependencies]
async-stream = { workspace = true }
async-trait = { workspace = true }
serde = { workspace = true }
streambed = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
Loading

0 comments on commit 5fe9242

Please sign in to comment.