diff --git a/.clippy.toml b/.clippy.toml index 48204101..3b9db9df 100644 --- a/.clippy.toml +++ b/.clippy.toml @@ -1 +1 @@ -msrv = "1.58.0" +msrv = "1.74.0" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1f04da56..f79f1437 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,6 +12,11 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + - run: | + rustup override set 1.74.0 + rustup component add rustfmt + rustup component add clippy + rustup component add rust-docs - uses: Swatinem/rust-cache@v2 - uses: taiki-e/install-action@cargo-make - run: cargo make fmt-check @@ -25,6 +30,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 + - run: rustup override set 1.74.0 - uses: Swatinem/rust-cache@v2 - uses: taiki-e/install-action@cargo-make - name: Add hosts entries diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cb53156..62e95dff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +--- +## [0.15.0] - 2024-04-03 + +### Added + +- [[#191]]: Add new generic on `PgStore` and `Schema` trait to decouple persistence from `Aggregate::Event`. +- [[#187]]: Make the `AggregateManager` `deref` blanket implementation work for smart pointers. + +### Changed + +- [[#191]]: Updated MSRV to `1.74.0`. +- [[#191]]: Renamed `Event` trait to `Persistable` (this should not affect users of the library since users of the library benefit from a blanket implementation). + +### Removed + +- [[#191]]: Removed broken `sql` feature. 
+ --- ## [0.14.0] - 2024-01-09 diff --git a/Cargo.toml b/Cargo.toml index 6418fe2a..70baebb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,16 +8,15 @@ license = "MIT OR Apache-2.0" name = "esrs" readme = "README.md" repository = "https://github.com/primait/event_sourcing.rs" -rust-version = "1.58.0" -version = "0.14.0" +rust-version = "1.74.0" +version = "0.15.0" [package.metadata.docs.rs] all-features = true [features] default = [] -sql = ["sqlx"] -postgres = ["sql", "sqlx/postgres", "typed-builder", "tokio"] +postgres = ["sqlx", "sqlx/postgres", "typed-builder", "tokio"] rebuilder = [] kafka = ["rdkafka", "typed-builder"] rabbit = ["lapin", "typed-builder"] diff --git a/Makefile.toml b/Makefile.toml index 39fb91a4..8ef5af60 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -78,6 +78,7 @@ script = [ "cargo run --example readme --features=postgres", "cargo run --example rebuilder --features=rebuilder,postgres", "cargo run --example saga --features=postgres", + "cargo run --example schema --features=postgres", "cargo run --example shared_view --features=postgres", "cargo run --example store_crud --features=postgres", "cargo run --example transactional_view --features=postgres", @@ -95,6 +96,7 @@ script = [ "cargo clippy --example readme --features=postgres -- -D warnings", "cargo clippy --example rebuilder --features=rebuilder,postgres -- -D warnings", "cargo clippy --example saga --features=postgres -- -D warnings", + "cargo clippy --example schema --features=postgres -- -D warnings", "cargo clippy --example shared_view --features=postgres -- -D warnings", "cargo clippy --example store_crud --features=postgres -- -D warnings", "cargo clippy --example transactional_view --features=postgres -- -D warnings", diff --git a/README.md b/README.md index 5059a7d6..7499f0e4 100644 --- a/README.md +++ b/README.md @@ -212,6 +212,14 @@ store.persist(&mut state, events).await?; To alleviate the burden of writing all of this code, you can leverage the `AggregateManager` helper. 
An `AggregateManager` could be considered as a synchronous `CommandBus`. +##### Decoupling `Aggregate::Event` from the database using `Schema` + +To avoid strong coupling between the domain events represented by `Aggregate::Event` and the persistence layer. It is possible to introduce a `Schema` type on the `PgStore`. + +This type must implement `Schema` and `Persistable`. The mechanism enables the domain events to evolve more freely. For example, it is possible to deprecate an event variant making use of the schema (see [deprecating events example](examples/schema/deprecating_events.rs)). Additionally, this mechanism can be used as an alternative for upcasting (see [upcasting example](examples/schema/upcasting.rs)). + +For an example of how to introduce a schema to an existing application see [introducing schema example](examples/schema/adding_schema.rs). + ```rust let manager: AggregateManager<_> = AggregateManager::new(store); manager.handle_command(Default::default(), BookCommand::Buy { num_of_copies: 1 }).await diff --git a/examples/schema/adding_schema.rs b/examples/schema/adding_schema.rs new file mode 100644 index 00000000..8c007151 --- /dev/null +++ b/examples/schema/adding_schema.rs @@ -0,0 +1,184 @@ +//! This example demonstrates how it is possible to add a schema to a store. +//! +//! The module `before_schema` represents the code in the system before the introduction of the +//! schema. The first part of the example shows the state of the system before the schema is +//! introduced. +//! +//! The module `after_schema` represents the code in the system after the introduction of the +//! schema. Similarly, the second part of the example shows how the to setup the PgStore in this +//! state and how the previously persisted events are visible via the schema. +//! +//! Note that the introduction of the schema removes the need for `Aggregate::Event` to implement +//! 
`Persistable` instead relies on the implementation of `Schema` and the fact that `Schema` +//! implements `Persistable`. + +use esrs::manager::AggregateManager; +use serde::{Deserialize, Serialize}; + +use sqlx::PgPool; +use uuid::Uuid; + +use esrs::store::postgres::{PgStore, PgStoreBuilder}; +use esrs::store::EventStore; +use esrs::AggregateState; + +use crate::common::CommonError; + +pub(crate) enum Command {} + +mod before_schema { + //! This module represents the code of the initial iteration of the system before the + //! introduction of the schema. + use super::*; + pub(crate) struct Aggregate; + + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "introducing_schema"; + type State = SchemaState; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct SchemaState { + pub(crate) events: Vec, + } + + // Required only before the schema is introduced, after the schema is introduced + // we do not need to make the type serializable. See other schema examples. + #[derive(Deserialize, Serialize)] + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Event {} +} + +mod after_schema { + //! This module represents the code after the introduction of the schema. 
+ use super::*; + pub(crate) struct Aggregate; + + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "introducing_schema"; + type State = SchemaState; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct SchemaState { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + } + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::B { contents } => Schema::B { contents }, + Event::C { count } => Schema::C { count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { contents } => Some(Event::B { contents }), + Self::C { count } => Some(Event::C { count }), + } + } + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} +} + +pub(crate) async fn example(pool: PgPool) { + let aggregate_id: Uuid = Uuid::new_v4(); + + // Initial state of system before introduction of schema + { + use before_schema::{Aggregate, Event}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()).try_build().await.unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "hello world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let mut state = AggregateState::with_id(aggregate_id); + let events = store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + } + + // After introducing Schema + { + use after_schema::{Aggregate, Event, Schema}; + + let store: PgStore = 
PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "goodbye world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let manager = AggregateManager::new(store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let _ = store.persist(&mut state, events).await.unwrap(); + + let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + assert_eq!(events.len(), 6); + } +} diff --git a/examples/schema/deprecating_events.rs b/examples/schema/deprecating_events.rs new file mode 100644 index 00000000..ee92e9be --- /dev/null +++ b/examples/schema/deprecating_events.rs @@ -0,0 +1,219 @@ +//! This example demonstrates how it is possible to deprecate an event using the Schema mechanism +//! +//! The module `before_deprecation` represents the code in the system before the deprecation of an +//! event. The first part of the example shows the state of the system before the event is +//! deprecated. +//! +//! The module `after_deprecation` represents the code in the system after the deprecation of an +//! schema. Similarly, the second part of the example shows how the deprecation of the event will +//! play out in a running system. +//! +//! Note the only place that still required to reference the deprecated event is in the schema type +//! itself, the rest of the system can simply operate as if it never existed. + +use esrs::manager::AggregateManager; +use serde::{Deserialize, Serialize}; + +use sqlx::PgPool; +use uuid::Uuid; + +use esrs::store::postgres::{PgStore, PgStoreBuilder}; +use esrs::store::EventStore; +use esrs::AggregateState; + +use crate::common::CommonError; + +pub(crate) enum Command {} + +mod before_deprecation { + //! This module represents the code of the initial iteration of the system. 
+ use super::*; + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_deprecation"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::B { contents } => Schema::B { contents }, + Event::C { count } => Schema::C { count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { contents } => Some(Event::B { contents }), + Self::C { count } => Some(Event::C { count }), + } + } + } +} + +mod after_deprecation { + //! This module represents the code after `Event::B` has been deprecated and a new event + //! 
(`Event::C`) has been introduced + use super::*; + + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_deprecation"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + // Event B does not live here + pub enum Event { + A, + C { count: u64 }, + D { contents: String, count: u64 }, + } + + // Event B exists here just for deserialization + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + D { contents: String, count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::C { count } => Schema::C { count }, + Event::D { contents, count } => Schema::D { contents, count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { .. 
} => None, + Self::C { count } => Some(Event::C { count }), + Self::D { contents, count } => Some(Event::D { contents, count }), + } + } + } +} + +pub(crate) async fn example(pool: PgPool) { + let aggregate_id: Uuid = Uuid::new_v4(); + + // Initial state of system + { + use before_deprecation::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "goodbye world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let mut state = AggregateState::with_id(aggregate_id); + let events = store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + } + + // After deprecation + { + use after_deprecation::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = store.by_aggregate_id(aggregate_id).await.unwrap(); + + assert_eq!(events.len(), 2); + + let events = vec![ + Event::A, + Event::C { count: 42 }, + Event::D { + contents: "this is the new events".to_owned(), + count: 21, + }, + ]; + + let manager = AggregateManager::new(store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let _ = store.persist(&mut state, events).await.unwrap(); + + let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + // The deprecated events are skipped + assert_eq!(events.len(), 5); + } +} diff --git a/examples/schema/main.rs b/examples/schema/main.rs new file mode 100644 index 00000000..93c7629d --- /dev/null +++ b/examples/schema/main.rs @@ -0,0 +1,23 @@ +//! These example demonstrates several ways a system can evolve through the use of the Schema +//! mechanism. +//! +//! - The first example demonstrates how to add a schema to an existing store. +//! - The second example demonstrates how to use the schema mechanism to deprecate certain event +//! types. 
+//! - The third example demonstrates how to use the schema mechanism to upcast events. + +mod adding_schema; +#[path = "../common/lib.rs"] +mod common; +mod deprecating_events; +mod upcasting; + +use crate::common::util::new_pool; + +#[tokio::main] +async fn main() { + let pool = new_pool().await; + adding_schema::example(pool.clone()).await; + deprecating_events::example(pool.clone()).await; + upcasting::example(pool).await; +} diff --git a/examples/schema/upcasting.rs b/examples/schema/upcasting.rs new file mode 100644 index 00000000..1d1b047a --- /dev/null +++ b/examples/schema/upcasting.rs @@ -0,0 +1,215 @@ +//! This example demonstrates how it is possible to use the schema mechanism for upcasting. +//! +//! The module `before_upcasting` represents the code in the system before the upcasting of an +//! event. The first part of the example shows the state of the system before the event is +//! upcasted. +//! +//! The module `after_upcasting` represents the code in the system after the upcasting of an +//! schema. Similarly, the second part of the example shows how the upcasting of the event will +//! play out in a running system. +//! +//! Note the only place that still required to reference the original shape of the event is in the +//! schema itself and the rest of the system can simply operate as if the event has always been of +//! this shape. + +use esrs::manager::AggregateManager; +use serde::{Deserialize, Serialize}; + +use sqlx::PgPool; +use uuid::Uuid; + +use esrs::store::postgres::{PgStore, PgStoreBuilder}; +use esrs::store::EventStore; +use esrs::AggregateState; + +use crate::common::CommonError; + +pub(crate) enum Command {} + +mod before_upcasting { + //! This module represents the code of the initial iteration of the system. 
+ use super::*; + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_upcasting"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String }, + C { count: u64 }, + } + + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::B { contents } => Schema::B { contents }, + Event::C { count } => Schema::C { count }, + } + } + + fn to_event(self) -> Option { + match self { + Self::A => Some(Event::A), + Self::B { contents } => Some(Event::B { contents }), + Self::C { count } => Some(Event::C { count }), + } + } + } +} + +mod after_upcasting { + //! 
This module represents the code after the upcasting has been implemented + use super::*; + pub(crate) struct Aggregate; + impl esrs::Aggregate for Aggregate { + const NAME: &'static str = "schema_upcasting"; + type State = State; + type Command = Command; + type Event = Event; + type Error = CommonError; + + fn handle_command(_state: &Self::State, command: Self::Command) -> Result, Self::Error> { + match command {} + } + + fn apply_event(state: Self::State, payload: Self::Event) -> Self::State { + let mut events = state.events; + events.push(payload); + + Self::State { events } + } + } + + #[derive(Default)] + pub(crate) struct State { + pub(crate) events: Vec, + } + + pub enum Event { + A, + B { contents: String, count: u64 }, + C { count: u64 }, + } + + #[derive(Deserialize, Serialize)] + pub enum Schema { + A, + B { contents: String }, + C { count: u64 }, + D { contents: String, count: u64 }, + } + + #[cfg(feature = "upcasting")] + impl esrs::event::Upcaster for Schema {} + + impl esrs::store::postgres::Schema for Schema { + fn from_event(value: Event) -> Self { + match value { + Event::A => Schema::A, + Event::C { count } => Schema::C { count }, + Event::B { contents, count } => Schema::D { contents, count }, + } + } + + fn to_event(self) -> Option { + match self { + Schema::A => Some(Event::A), + Schema::B { contents } => Some(Event::B { contents, count: 1 }), + Schema::C { count } => Some(Event::C { count }), + Schema::D { contents, count } => Some(Event::B { contents, count }), + } + } + } +} + +pub(crate) async fn example(pool: PgPool) { + let aggregate_id: Uuid = Uuid::new_v4(); + + // Initial state of the system + { + use before_upcasting::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::B { + contents: "goodbye world".to_owned(), + }, + Event::C { count: 42 }, + ]; + + let mut state = 
AggregateState::with_id(aggregate_id); + let events = store.persist(&mut state, events).await.unwrap(); + + assert_eq!(events.len(), 3); + } + + // After upcasting before_upcasting::Event::B to after_upcasting::Event::B + { + use after_upcasting::{Aggregate, Event, Schema}; + + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() + .try_build() + .await + .unwrap(); + + let events = vec![ + Event::A, + Event::C { count: 42 }, + Event::B { + contents: "this is the new events".to_owned(), + count: 21, + }, + ]; + + let manager = AggregateManager::new(store.clone()); + let mut state = manager.load(aggregate_id).await.unwrap().unwrap(); + let _ = store.persist(&mut state, events).await.unwrap(); + + let persisted_events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events; + + // All the events are visible + assert_eq!(persisted_events.len(), 6); + + // The events have been upcasted + assert!(matches!(persisted_events[1], Event::B { count: 1, .. })); + } +} diff --git a/src/event.rs b/src/event.rs index fd9c8a16..b0205ec1 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,19 +1,5 @@ use serde::de::DeserializeOwned; -use serde::Serialize; -#[cfg(not(feature = "upcasting"))] -pub trait Event: Serialize + DeserializeOwned {} - -#[cfg(not(feature = "upcasting"))] -impl Event for T where T: Serialize + DeserializeOwned {} - -#[cfg(feature = "upcasting")] -pub trait Event: Serialize + DeserializeOwned + Upcaster {} - -#[cfg(feature = "upcasting")] -impl Event for T where T: Serialize + DeserializeOwned + Upcaster {} - -#[cfg(feature = "upcasting")] pub trait Upcaster where Self: Sized, diff --git a/src/lib.rs b/src/lib.rs index 38d125ab..8e22e1c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ mod aggregate; mod state; pub mod bus; +#[cfg(feature = "upcasting")] pub mod event; pub mod handler; pub mod manager; @@ -24,7 +25,7 @@ pub mod store; #[cfg(feature = "rebuilder")] pub mod rebuilder; -#[cfg(feature = "sql")] 
+#[cfg(feature = "postgres")] pub mod sql; pub mod types { diff --git a/src/rebuilder/pg_rebuilder.rs b/src/rebuilder/pg_rebuilder.rs index 57ee1715..b7b55879 100644 --- a/src/rebuilder/pg_rebuilder.rs +++ b/src/rebuilder/pg_rebuilder.rs @@ -1,23 +1,26 @@ +use std::marker::PhantomData; + use async_trait::async_trait; use futures::StreamExt; use sqlx::{PgConnection, Pool, Postgres, Transaction}; use uuid::Uuid; use crate::bus::EventBus; -use crate::event::Event; use crate::handler::{ReplayableEventHandler, TransactionalEventHandler}; use crate::rebuilder::Rebuilder; -use crate::store::postgres::{PgStore, PgStoreBuilder, PgStoreError}; +use crate::store::postgres::persistable::Persistable; +use crate::store::postgres::{PgStore, PgStoreBuilder, PgStoreError, Schema}; use crate::store::{EventStore, StoreEvent}; use crate::Aggregate; -pub struct PgRebuilder +pub struct PgRebuilder::Event> where A: Aggregate, { event_handlers: Vec + Send>>, transactional_event_handlers: Vec + Send>>, event_buses: Vec + Send>>, + _schema: PhantomData, } impl PgRebuilder @@ -56,16 +59,18 @@ where event_handlers: vec![], transactional_event_handlers: vec![], event_buses: vec![], + _schema: PhantomData, } } } #[async_trait] -impl Rebuilder for PgRebuilder +impl Rebuilder for PgRebuilder where A: Aggregate, - A::Event: Event + Send + Sync, A::State: Send, + A::Event: Send + Sync, + S: Schema + Persistable + Send + Sync, { type Executor = Pool; type Error = PgStoreError; @@ -77,8 +82,9 @@ where /// events is processed by the mentioned handlers. /// Finally the events are passed to every configured [`EventBus`]. async fn by_aggregate_id(&self, pool: Pool) -> Result<(), Self::Error> { - let store: PgStore = PgStoreBuilder::new(pool.clone()) + let store: PgStore = PgStoreBuilder::new(pool.clone()) .without_running_migrations() + .with_schema::() .try_build() .await?; @@ -122,7 +128,8 @@ where /// events are handled. 
After the transaction ends, for each [`crate::handler::EventHandler`] /// and [`EventBus`], the events are handled. async fn all_at_once(&self, pool: Pool) -> Result<(), Self::Error> { - let store: PgStore = PgStoreBuilder::new(pool.clone()) + let store: PgStore = PgStoreBuilder::new(pool.clone()) + .with_schema::() .without_running_migrations() .try_build() .await?; diff --git a/src/sql/event.rs b/src/sql/event.rs index ea90ba43..d2ec21f4 100644 --- a/src/sql/event.rs +++ b/src/sql/event.rs @@ -4,7 +4,8 @@ use chrono::{DateTime, Utc}; use serde_json::Value; use uuid::Uuid; -use crate::event::Event; +use crate::store::postgres::persistable::Persistable; +use crate::store::postgres::Schema; use crate::store::StoreEvent; use crate::types::SequenceNumber; @@ -19,7 +20,31 @@ pub struct DbEvent { pub version: Option, } -impl TryInto> for DbEvent { +impl DbEvent { + pub fn try_into_store_event(self) -> Result>, serde_json::Error> + where + S: Schema, + { + #[cfg(feature = "upcasting")] + let payload = S::upcast(self.payload, self.version)?.to_event(); + #[cfg(not(feature = "upcasting"))] + let payload = serde_json::from_value::(self.payload)?.to_event(); + + Ok(match payload { + None => None, + Some(payload) => Some(StoreEvent { + id: self.id, + aggregate_id: self.aggregate_id, + payload, + occurred_on: self.occurred_on, + sequence_number: self.sequence_number, + version: self.version, + }), + }) + } +} + +impl TryInto> for DbEvent { type Error = serde_json::Error; fn try_into(self) -> Result, Self::Error> { diff --git a/src/store/postgres/builder.rs b/src/store/postgres/builder.rs index 53cada9b..04b72088 100644 --- a/src/store/postgres/builder.rs +++ b/src/store/postgres/builder.rs @@ -1,3 +1,4 @@ +use std::marker::PhantomData; use std::sync::Arc; use sqlx::{PgConnection, Pool, Postgres}; @@ -10,10 +11,11 @@ use crate::sql::statements::{Statements, StatementsHandler}; use crate::store::postgres::{InnerPgStore, PgStoreError}; use crate::Aggregate; -use super::PgStore; 
+use super::persistable::Persistable; +use super::{PgStore, Schema}; /// Struct used to build a brand new [`PgStore`]. -pub struct PgStoreBuilder +pub struct PgStoreBuilder::Event> where A: Aggregate, { @@ -23,14 +25,15 @@ where transactional_event_handlers: Vec + Send>>, event_buses: Vec + Send>>, run_migrations: bool, + _schema: PhantomData, } -impl PgStoreBuilder +impl PgStoreBuilder::Event> where A: Aggregate, { /// Creates a new instance of a [`PgStoreBuilder`]. - pub fn new(pool: Pool) -> Self { + pub fn new(pool: Pool) -> PgStoreBuilder::Event> { PgStoreBuilder { pool, statements: Statements::new::(), @@ -38,9 +41,15 @@ where transactional_event_handlers: vec![], event_buses: vec![], run_migrations: true, + _schema: PhantomData, } } +} +impl PgStoreBuilder +where + A: Aggregate, +{ /// Set event handlers list pub fn with_event_handlers(mut self, event_handlers: Vec + Send>>) -> Self { self.event_handlers = event_handlers; @@ -91,6 +100,22 @@ where self } + /// Set the schema of the underlying PgStore. + pub fn with_schema(self) -> PgStoreBuilder + where + N: Schema + Persistable + Send + Sync, + { + PgStoreBuilder { + pool: self.pool, + statements: self.statements, + run_migrations: self.run_migrations, + event_handlers: self.event_handlers, + transactional_event_handlers: self.transactional_event_handlers, + event_buses: self.event_buses, + _schema: PhantomData, + } + } + /// This function runs all the needed [`Migrations`], atomically setting up the database if /// `run_migrations` isn't explicitly set to false. [`Migrations`] should be run only at application /// startup due to avoid performance issues. @@ -100,7 +125,7 @@ where /// # Errors /// /// Will return an `Err` if there's an error running [`Migrations`]. 
- pub async fn try_build(self) -> Result, sqlx::Error> { + pub async fn try_build(self) -> Result, sqlx::Error> { if self.run_migrations { Migrations::run::(&self.pool).await?; } @@ -113,6 +138,7 @@ where transactional_event_handlers: self.transactional_event_handlers, event_buses: self.event_buses, }), + _schema: self._schema, }) } } diff --git a/src/store/postgres/event_store.rs b/src/store/postgres/event_store.rs index c9dc1809..e39442a6 100644 --- a/src/store/postgres/event_store.rs +++ b/src/store/postgres/event_store.rs @@ -1,4 +1,4 @@ -use std::convert::TryInto; +use std::marker::PhantomData; use std::sync::Arc; use async_trait::async_trait; @@ -13,11 +13,12 @@ use tokio::sync::RwLock; use uuid::Uuid; use crate::bus::EventBus; -use crate::event::Event; use crate::handler::{EventHandler, TransactionalEventHandler}; use crate::sql::event::DbEvent; use crate::sql::statements::{Statements, StatementsHandler}; +use crate::store::postgres::persistable::Persistable; use crate::store::postgres::PgStoreError; +use crate::store::postgres::Schema; use crate::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; use crate::types::SequenceNumber; use crate::{Aggregate, AggregateState}; @@ -27,11 +28,24 @@ use crate::{Aggregate, AggregateState}; /// /// The store is protected by an [`Arc`] that allows it to be cloneable still having the same memory /// reference. -pub struct PgStore +/// +/// To decouple persistence from the event types, it is possible to optionally, specify the +/// Database event schema for this store as a type that implements [`Persistable`] and +/// [`Schema`]. +/// +/// When events are persisted, they will first be converted to the schema type using +/// [`Schema::from_event`] then serialized using the [`serde::Serialize`] implementation on schema. +/// +/// When events are read from the store, they will first be deserialized into the schema type and +/// then converted into an [`Option`] using [`Schema::from_event`]. 
In this way +/// it is possible to remove deprecate events in core part of your application by returning [`None`] +/// from [`Schema::from_event`]. +pub struct PgStore::Event> where A: Aggregate, { pub(super) inner: Arc>, + pub(super) _schema: PhantomData, } pub(super) struct InnerPgStore @@ -46,10 +60,11 @@ where pub(super) event_buses: Vec + Send>>, } -impl PgStore +impl PgStore where A: Aggregate, - A::Event: Event + Sync, + A::Event: Send + Sync, + S: Schema + Persistable + Send + Sync, { /// Returns the name of the event store table pub fn table_name(&self) -> &str { @@ -83,17 +98,15 @@ where let id: Uuid = Uuid::new_v4(); #[cfg(feature = "upcasting")] - let version: Option = { - use crate::event::Upcaster; - A::Event::current_version() - }; + let version: Option = S::current_version(); #[cfg(not(feature = "upcasting"))] let version: Option = None; + let schema = S::from_event(event); let _ = sqlx::query(self.inner.statements.insert()) .bind(id) .bind(aggregate_id) - .bind(Json(&event)) + .bind(Json(&schema)) .bind(occurred_on) .bind(sequence_number) .bind(version) @@ -103,7 +116,10 @@ where Ok(StoreEvent { id, aggregate_id, - payload: event, + payload: schema.to_event().expect( + "For any type that implements Schema the following contract should be upheld:\ + assert_eq!(Some(event.clone()), Schema::from_event(event).to_event())", + ), occurred_on, sequence_number, version, @@ -119,7 +135,9 @@ where Box::pin({ sqlx::query_as::<_, DbEvent>(self.inner.statements.select_all()) .fetch(executor) - .map(|res| Ok(res?.try_into()?)) + .map(|res| Ok(res?.try_into_store_event::<_, S>()?)) + .map(Result::transpose) + .filter_map(std::future::ready) }) } } @@ -140,11 +158,12 @@ pub struct PgStoreLockGuard { impl UnlockOnDrop for PgStoreLockGuard {} #[async_trait] -impl EventStore for PgStore +impl EventStore for PgStore where A: Aggregate, A::State: Send, - A::Event: Event + Send + Sync, + A::Event: Send + Sync, + S: Schema + Persistable + Send + Sync, { type Aggregate = 
A; type Error = PgStoreError; @@ -167,10 +186,13 @@ where .fetch_all(&self.inner.pool) .await? .into_iter() - .map(|event| Ok(event.try_into()?)) + .map(|event| Ok(event.try_into_store_event::<_, S>()?)) + .filter_map(Result::transpose) + .collect::>, Self::Error>>()?) } + // Clippy introduced the `blocks_in_conditions` lint. With certain versions of Rust and tracing this + // line throws an error; see: https://github.com/rust-lang/rust-clippy/issues/12281 #[tracing::instrument(skip_all, fields(aggregate_id = % aggregate_state.id()), err)] async fn persist( &self, @@ -301,13 +323,14 @@ impl std::fmt::Debug for PgStore { } } -impl Clone for PgStore +impl Clone for PgStore where A: Aggregate, { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), + _schema: PhantomData, } } } diff --git a/src/store/postgres/mod.rs b/src/store/postgres/mod.rs index 3f55112a..9646b1c5 100644 --- a/src/store/postgres/mod.rs +++ b/src/store/postgres/mod.rs @@ -1,8 +1,11 @@ pub use builder::*; pub use event_store::*; +pub use schema::*; mod builder; mod event_store; +pub mod persistable; +mod schema; // Trait aliases are experimental. 
See issue #41517 // trait PgTransactionalEventHandler = TransactionalEventHandler where A: Aggregate; diff --git a/src/store/postgres/persistable.rs b/src/store/postgres/persistable.rs new file mode 100644 index 00000000..fb07a347 --- /dev/null +++ b/src/store/postgres/persistable.rs @@ -0,0 +1,14 @@ +use serde::de::DeserializeOwned; +use serde::Serialize; + +#[cfg(not(feature = "upcasting"))] +pub trait Persistable: Serialize + DeserializeOwned {} + +#[cfg(not(feature = "upcasting"))] +impl Persistable for T where T: Serialize + DeserializeOwned {} + +#[cfg(feature = "upcasting")] +pub trait Persistable: Serialize + DeserializeOwned + crate::event::Upcaster {} + +#[cfg(feature = "upcasting")] +impl Persistable for T where T: Serialize + DeserializeOwned + crate::event::Upcaster {} diff --git a/src/store/postgres/schema.rs b/src/store/postgres/schema.rs new file mode 100644 index 00000000..60f5e0df --- /dev/null +++ b/src/store/postgres/schema.rs @@ -0,0 +1,96 @@ +use super::persistable::Persistable; + +/// To support decoupling between the [`crate::Aggregate::Event`] type and the schema of the DB table +/// in [`super::PgStore`] you can create a schema type that implements [`Persistable`] and [`Schema`] +/// where `E = Aggregate::Event`. +/// +/// Note: Although [`Schema::to_event`] returns an [`Option`], for any given event and implementation the conversion must be lossless. 
+/// +/// The following must hold +/// +/// ```rust +/// # use serde::{Serialize, Deserialize}; +/// # use esrs::store::postgres::Schema as SchemaTrait; +/// # +/// # #[derive(Clone, Eq, PartialEq, Debug)] +/// # struct Event { +/// # a: u32, +/// # } +/// # +/// # #[derive(Serialize, Deserialize)] +/// # struct Schema { +/// # a: u32, +/// # } +/// # +/// # #[cfg(feature = "upcasting")] +/// # impl esrs::event::Upcaster for Schema {} +/// # +/// # impl SchemaTrait for Schema { +/// # fn from_event(Event { a }: Event) -> Self { +/// # Self { a } +/// # } +/// # +/// # fn to_event(self) -> Option { +/// # Some(Event { a: self.a }) +/// # } +/// # } +/// # +/// # let event = Event { a: 42 }; +/// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); +/// ``` +pub trait Schema: Persistable { +    /// Converts the event into the schema type. +    fn from_event(event: E) -> Self; + +    /// Converts the schema into the event type. +    /// +    /// This returns an option to enable skipping deprecated events which are persisted in the DB. +    /// +    /// Note: Although this function returns an [`Option`], for any given event and implementation the conversion must be lossless. 
+ /// + /// The following must hold + /// + /// ```rust + /// # use serde::{Serialize, Deserialize}; + /// # use esrs::store::postgres::Schema as SchemaTrait; + /// # + /// # #[derive(Clone, Eq, PartialEq, Debug)] + /// # struct Event { + /// # a: u32, + /// # } + /// # + /// # #[derive(Serialize, Deserialize)] + /// # struct Schema { + /// # a: u32, + /// # } + /// # + /// # #[cfg(feature = "upcasting")] + /// # impl esrs::event::Upcaster for Schema {} + /// # + /// # impl SchemaTrait for Schema { + /// # fn from_event(Event { a }: Event) -> Self { + /// # Self { a } + /// # } + /// # + /// # fn to_event(self) -> Option { + /// # Some(Event { a: self.a }) + /// # } + /// # } + /// # + /// # let event = Event { a: 42 }; + /// assert_eq!(Some(event.clone()), Schema::from_event(event).to_event()); + /// ``` + fn to_event(self) -> Option; +} + +impl Schema for E +where + E: Persistable, +{ + fn from_event(event: E) -> Self { + event + } + fn to_event(self) -> Option { + Some(self) + } +}