Skip to content

Commit

Permalink
Declare the size of a numeric entity id
Browse files Browse the repository at this point in the history
Permits the number of bits to be used for describing an entity id to be declared explicitly, rather than assuming 32 bits. I found that storing MAC addresses with 48 bits is also useful, and still leaves plenty of bits for record types.

A const generic is used and so therefore there is no runtime penalty. I have included a runtime assertion to ensure that an entity id does not exceed the allocated number of bits.
  • Loading branch information
huntc authored and patriknw committed Feb 1, 2024
1 parent ad0a353 commit a516fe7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 22 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Cargo.lock
target
.DS_Store
dist
dist
.idea
51 changes: 30 additions & 21 deletions akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,30 +487,35 @@ where
pub mod cbor {
use super::*;

pub struct Marshaller<E, F, SS> {
pub struct Marshaller<E, F, SS, const RTB: u64> {
pub entity_type: EntityType,
pub events_key_secret_path: Arc<str>,
pub to_record_type: F,
pub secret_store: SS,
phantom: PhantomData<E>,
}

// Our event keys will occupy the top 12 bits of the key, meaning
// that we can have 4K types of event. We use 32 of the bottom 52
// bits as the entity id. We choose 32 bits as this is a common size
// for identifiers transmitted from IoT sensors. These identifiers
// are also known as "device addresses" and represent an address
// which may, in turn, equate to a 64 bit address globally unique
// to a device. These globally unique addresses are not generally
// transmitted in order to conserve packet size.
const EVENT_TYPE_BIT_SHIFT: usize = 52;
const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF;
impl<E, F, SS, const RTB: u64> Marshaller<E, F, SS, RTB> {
// Our event record types are recommended to occupy the top 12 bits of the key
// given a value for RTB of 12, meaning that we can have 4K types of event.
// 32 bits is a common size for identifying entities at the edge with respect to
// IoT sensors, and so given 52 bits remaining, there are plenty of bits available.
// These identifiers are also known as "device addresses" and represent an address
// which may, in turn, equate to a 64 bit address globally unique
// to a device. These globally unique addresses are not generally
// transmitted in order to conserve packet size.
//
// Alternatively, an entity id could represent a MAC address, which will
// occupy 48 of the 52 bits available given an RTB of 12.
const EVENT_TYPE_BIT_SHIFT: u64 = 64 - RTB;
const ENTITY_ID_BIT_MASK: u64 = !(0xFFFFFFFF_FFFFFFFF << Self::EVENT_TYPE_BIT_SHIFT);
}

#[async_trait]
impl<E, F, SS> CommitLogMarshaller<E> for Marshaller<E, F, SS>
impl<E, F, SS, const RTB: u64> CommitLogMarshaller<E> for Marshaller<E, F, SS, RTB>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
F: Fn(&E) -> u32 + Sync,
F: Fn(&E) -> u64 + Sync,
SS: SecretStore,
{
fn entity_type(&self) -> EntityType {
Expand All @@ -520,12 +525,16 @@ pub mod cbor {
fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Key {
let record_type = (self.to_record_type)(event);
// It is an unrecoverable error if the entity id is non-numeric.
let entity_id = entity_id.parse::<u32>().unwrap();
(record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64
let entity_id = entity_id.parse::<u64>().unwrap();
assert!(
entity_id & !Self::EVENT_TYPE_BIT_SHIFT != 0,
"Entity id occupies too many bits."
);
record_type << Self::EVENT_TYPE_BIT_SHIFT | entity_id
}

fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
let entity_id = (record.key & EVENT_ID_BIT_MASK) as u32;
let entity_id = record.key & Self::ENTITY_ID_BIT_MASK;
let mut buffer = itoa::Buffer::new();
Some(EntityId::from(buffer.format(entity_id)))
}
Expand All @@ -552,10 +561,10 @@ pub mod cbor {
}

#[async_trait]
impl<E, F, SS> EncryptedCommitLogMarshaller<E> for Marshaller<E, F, SS>
impl<E, F, SS, const RTB: u64> EncryptedCommitLogMarshaller<E> for Marshaller<E, F, SS, RTB>
where
for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
F: Fn(&E) -> u32 + Sync,
F: Fn(&E) -> u64 + Sync,
SS: SecretStore,
{
type SecretStore = SS;
Expand All @@ -573,15 +582,15 @@ pub mod cbor {
/// a supplied secret store for encryption. Entity identifiers are also
/// required to be numeric. These characteristics are reasonable when using
/// the Streambed commit log at the edge.
pub fn marshaller<E, F, S, SS>(
pub fn marshaller<E, F, S, SS, const RTB: u64>(
entity_type: EntityType,
events_key_secret_path: S,
secret_store: SS,
to_record_type: F,
) -> Marshaller<E, F, SS>
) -> Marshaller<E, F, SS, RTB>
where
for<'a> E: DeserializeOwned + Serialize + Send + Sync + 'a,
F: Fn(&E) -> u32 + Sync,
F: Fn(&E) -> u64 + Sync,
SS: SecretStore,
S: ToString,
{
Expand Down

0 comments on commit a516fe7

Please sign in to comment.