Skip to content

Commit

Permalink
Add new generic on PgStore and converter trait to decouple persistenc…
Browse files Browse the repository at this point in the history
…e from Aggregate::Event (#191)

* Add new generic on PgStore and converter trait to decouple persistence from Aggregate::Event

* Add ability to set schema in the store builder

* Add example of how to use schema for both deprecating and upcasting use cases

* Add description of schema example

* Use explicit methods on the conversion trait in place of making use of from and to

* Refactor schema example and rename functions on trait

* Refactor example into three seperate example files

* Rename Event -> Persistable and reorganize code

* Add Schema support to PgRebuilder

* Update read me with details of using Schema

* Remove allow clippy lint and specify rust toolchain for CI

* Update library version number

* Fix documentation links and naming of methods

* Return Upcaster to original location

* Move persistable to postgres module

* Remove sql feature

* Update change log
  • Loading branch information
Johnabell authored Apr 3, 2024
1 parent 04c87d8 commit 8766a1c
Show file tree
Hide file tree
Showing 19 changed files with 904 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
msrv = "1.58.0"
msrv = "1.74.0"
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: |
rustup override set 1.74.0
rustup component add rustfmt
rustup component add clippy
rustup component add rust-docs
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@cargo-make
- run: cargo make fmt-check
Expand All @@ -25,6 +30,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: rustup override set 1.74.0
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@cargo-make
- name: Add hosts entries
Expand Down
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

---
## [0.15.0] - 2024-04-03

### Added

- [[#191]]: Add new generic on `PgStore` and `Schema` trait to decouple persistence from `Aggregate::Event`.
- [[#187]]: Make the `AggregateManager` `deref` blanket implementation work for smart pointers.

### Changed

- [[#191]]: Updated MSRV to `1.74.0`.
- [[#191]]: Renamed `Event` trait to `Persistable` (this should not affect users of the library since users of the library benefit from a blanket implementation).

### Removed

- [[#191]]: Removed broken `sql` feature.

---
## [0.14.0] - 2024-01-09

Expand Down
7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ license = "MIT OR Apache-2.0"
name = "esrs"
readme = "README.md"
repository = "https://github.com/primait/event_sourcing.rs"
rust-version = "1.58.0"
version = "0.14.0"
rust-version = "1.74.0"
version = "0.15.0"

[package.metadata.docs.rs]
all-features = true

[features]
default = []
sql = ["sqlx"]
postgres = ["sql", "sqlx/postgres", "typed-builder", "tokio"]
postgres = ["sqlx", "sqlx/postgres", "typed-builder", "tokio"]
rebuilder = []
kafka = ["rdkafka", "typed-builder"]
rabbit = ["lapin", "typed-builder"]
Expand Down
2 changes: 2 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ script = [
"cargo run --example readme --features=postgres",
"cargo run --example rebuilder --features=rebuilder,postgres",
"cargo run --example saga --features=postgres",
"cargo run --example schema --features=postgres",
"cargo run --example shared_view --features=postgres",
"cargo run --example store_crud --features=postgres",
"cargo run --example transactional_view --features=postgres",
Expand All @@ -95,6 +96,7 @@ script = [
"cargo clippy --example readme --features=postgres -- -D warnings",
"cargo clippy --example rebuilder --features=rebuilder,postgres -- -D warnings",
"cargo clippy --example saga --features=postgres -- -D warnings",
"cargo clippy --example schema --features=postgres -- -D warnings",
"cargo clippy --example shared_view --features=postgres -- -D warnings",
"cargo clippy --example store_crud --features=postgres -- -D warnings",
"cargo clippy --example transactional_view --features=postgres -- -D warnings",
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,14 @@ store.persist(&mut state, events).await?;
To alleviate the burden of writing all of this code, you can leverage the `AggregateManager` helper. An `AggregateManager`
could be considered as a synchronous `CommandBus`.

##### Decoupling `Aggregate::Event` from the database using `Schema`

To avoid strong coupling between the domain events represented by `Aggregate::Event` and the persistence layer. It is possible to introduce a `Schema` type on the `PgStore`.

This type must implement `Schema` and `Persistable`. The mechanism enables the domain events to evolve more freely. For example, it is possible to deprecate an event variant making use of the schema (see [deprecating events example](examples/schema/deprecating_events.rs)). Additionally, this mechanism can be used as an alternative for upcasting (see [upcasting example](examples/schema/upcasting.rs)).

For an example of how to introduce a schema to an existing application see [introducing schema example](examples/schema/adding_schema.rs).

```rust
let manager: AggregateManager<_> = AggregateManager::new(store);
manager.handle_command(Default::default(), BookCommand::Buy { num_of_copies: 1 }).await
Expand Down
184 changes: 184 additions & 0 deletions examples/schema/adding_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//! This example demonstrates how it is possible to add a schema to a store.
//!
//! The module `before_schema` represents the code in the system before the introduction of the
//! schema. The first part of the example shows the state of the system before the schema is
//! introduced.
//!
//! The module `after_schema` represents the code in the system after the introduction of the
//! schema. Similarly, the second part of the example shows how the to setup the PgStore in this
//! state and how the previously persisted events are visible via the schema.
//!
//! Note that the introduction of the schema removes the need for `Aggregate::Event` to implement
//! `Persistable` instead relies on the implementation of `Schema` and the fact that `Schema`
//! implements `Persistable`.
use esrs::manager::AggregateManager;
use serde::{Deserialize, Serialize};

use sqlx::PgPool;
use uuid::Uuid;

use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::EventStore;
use esrs::AggregateState;

use crate::common::CommonError;

pub(crate) enum Command {}

mod before_schema {
//! This module represents the code of the initial iteration of the system before the
//! introduction of the schema.
use super::*;
pub(crate) struct Aggregate;

impl esrs::Aggregate for Aggregate {
const NAME: &'static str = "introducing_schema";
type State = SchemaState;
type Command = Command;
type Event = Event;
type Error = CommonError;

fn handle_command(_state: &Self::State, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
match command {}
}

fn apply_event(state: Self::State, payload: Self::Event) -> Self::State {
let mut events = state.events;
events.push(payload);

Self::State { events }
}
}

#[derive(Default)]
pub(crate) struct SchemaState {
pub(crate) events: Vec<Event>,
}

// Required only before the schema is introduced, after the schema is introduced
// we do not need to make the type serializable. See other schema examples.
#[derive(Deserialize, Serialize)]
pub enum Event {
A,
B { contents: String },
C { count: u64 },
}

#[cfg(feature = "upcasting")]
impl esrs::event::Upcaster for Event {}
}

mod after_schema {
//! This module represents the code after the introduction of the schema.
use super::*;
pub(crate) struct Aggregate;

impl esrs::Aggregate for Aggregate {
const NAME: &'static str = "introducing_schema";
type State = SchemaState;
type Command = Command;
type Event = Event;
type Error = CommonError;

fn handle_command(_state: &Self::State, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
match command {}
}

fn apply_event(state: Self::State, payload: Self::Event) -> Self::State {
let mut events = state.events;
events.push(payload);

Self::State { events }
}
}

#[derive(Default)]
pub(crate) struct SchemaState {
pub(crate) events: Vec<Event>,
}

pub enum Event {
A,
B { contents: String },
C { count: u64 },
}
#[derive(Deserialize, Serialize)]
pub enum Schema {
A,
B { contents: String },
C { count: u64 },
}

impl esrs::store::postgres::Schema<Event> for Schema {
fn from_event(value: Event) -> Self {
match value {
Event::A => Schema::A,
Event::B { contents } => Schema::B { contents },
Event::C { count } => Schema::C { count },
}
}

fn to_event(self) -> Option<Event> {
match self {
Self::A => Some(Event::A),
Self::B { contents } => Some(Event::B { contents }),
Self::C { count } => Some(Event::C { count }),
}
}
}

#[cfg(feature = "upcasting")]
impl esrs::event::Upcaster for Schema {}
}

pub(crate) async fn example(pool: PgPool) {
let aggregate_id: Uuid = Uuid::new_v4();

// Initial state of system before introduction of schema
{
use before_schema::{Aggregate, Event};

let store: PgStore<Aggregate> = PgStoreBuilder::new(pool.clone()).try_build().await.unwrap();

let events = vec![
Event::A,
Event::B {
contents: "hello world".to_owned(),
},
Event::C { count: 42 },
];

let mut state = AggregateState::with_id(aggregate_id);
let events = store.persist(&mut state, events).await.unwrap();

assert_eq!(events.len(), 3);
}

// After introducing Schema
{
use after_schema::{Aggregate, Event, Schema};

let store: PgStore<Aggregate, Schema> = PgStoreBuilder::new(pool.clone())
.with_schema::<Schema>()
.try_build()
.await
.unwrap();

let events = vec![
Event::A,
Event::B {
contents: "goodbye world".to_owned(),
},
Event::C { count: 42 },
];

let manager = AggregateManager::new(store.clone());
let mut state = manager.load(aggregate_id).await.unwrap().unwrap();
let _ = store.persist(&mut state, events).await.unwrap();

let events = manager.load(aggregate_id).await.unwrap().unwrap().into_inner().events;

assert_eq!(events.len(), 6);
}
}
Loading

0 comments on commit 8766a1c

Please sign in to comment.