Skip to content

Commit

Permalink
refactor: move all CustomQuery logic to agent_shared
Browse files Browse the repository at this point in the history
  • Loading branch information
nanderstabel committed Aug 30, 2024
1 parent 7c13929 commit 384244c
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 238 deletions.
81 changes: 5 additions & 76 deletions agent_holder/src/credential/queries/all_credentials.rs
Original file line number Diff line number Diff line change
@@ -1,83 +1,12 @@
use crate::credential::queries::{Credential, CustomQuery, ViewRepository};
use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext},
EventEnvelope, Query, View,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{collections::HashMap, marker::PhantomData};

use super::CredentialView;

const VIEW_ID: &str = "all_credentials";

/// A custom query trait for the Credential aggregate. This query is used to update the `AllCredentialsView`.
pub struct AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
view_repository: Arc<R>,
_phantom: PhantomData<V>,
}

impl<R, V> AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
pub fn new(view_repository: Arc<R>) -> Self {
AllCredentialsQuery {
view_repository,
_phantom: PhantomData,
}
}
}

#[async_trait]
impl<R, V> Query<Credential> for AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
// The `dispatch` method is called by the `CqrsFramework` when an event is published. By default `cqrs` will use the
// `aggregate_id` as the `view_id` when calling the `dispatch` method. We override this behavior by using the
// `VIEW_ID` constant as the `view_id`.
async fn dispatch(&self, _view_id: &str, events: &[EventEnvelope<Credential>]) {
self.apply_events(VIEW_ID, events).await.ok();
}
}

#[async_trait]
impl<R, V> CustomQuery<R, V> for AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError> {
match self.view_repository.load_with_context(&view_id).await? {
None => {
let view_context = ViewContext::new(view_id, 0);
Ok((Default::default(), view_context))
}
Some((view, context)) => Ok((view, context)),
}
}

async fn apply_events(&self, view_id: &str, events: &[EventEnvelope<Credential>]) -> Result<(), PersistenceError> {
for event in events {
let (mut view, view_context) = self.load_mut(view_id.to_string()).await?;

view.update(event);
self.view_repository.update_view(view, view_context).await?;
}
Ok(())
}
}
use crate::credential::queries::Credential;
use cqrs_es::{EventEnvelope, View};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct AllCredentialsView {
#[serde(flatten)]
pub credentials: HashMap<String, CredentialView>,
}

Expand Down
19 changes: 1 addition & 18 deletions agent_holder/src/credential/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,9 @@ pub mod all_credentials;

use super::event::CredentialEvent;
use crate::credential::aggregate::Credential;
use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext, ViewRepository},
EventEnvelope, Query, View,
};
use cqrs_es::{EventEnvelope, View};
use serde::{Deserialize, Serialize};

/// A custom query trait for the Credential aggregate. This trait is used to define custom queries for the Credential aggregate
/// that do not make use of `GenericQuery`.
#[async_trait]
pub trait CustomQuery<R, V>: Query<Credential>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError>;

async fn apply_events(&self, view_id: &str, events: &[EventEnvelope<Credential>]) -> Result<(), PersistenceError>;
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct CredentialView {
pub credential_id: Option<String>,
Expand Down
81 changes: 5 additions & 76 deletions agent_holder/src/offer/queries/all_offers.rs
Original file line number Diff line number Diff line change
@@ -1,83 +1,12 @@
use crate::offer::queries::{CustomQuery, Offer, ViewRepository};
use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext},
EventEnvelope, Query, View,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{collections::HashMap, marker::PhantomData};

use super::OfferView;

const VIEW_ID: &str = "all_offers";

/// A custom query trait for the Offer aggregate. This query is used to update the `AllOffersView`.
pub struct AllOffersQuery<R, V>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
{
view_repository: Arc<R>,
_phantom: PhantomData<V>,
}

impl<R, V> AllOffersQuery<R, V>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
{
pub fn new(view_repository: Arc<R>) -> Self {
AllOffersQuery {
view_repository,
_phantom: PhantomData,
}
}
}

#[async_trait]
impl<R, V> Query<Offer> for AllOffersQuery<R, V>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
{
// The `dispatch` method is called by the `CqrsFramework` when an event is published. By default `cqrs` will use the
// `aggregate_id` as the `view_id` when calling the `dispatch` method. We override this behavior by using the
// `VIEW_ID` constant as the `view_id`.
async fn dispatch(&self, _view_id: &str, events: &[EventEnvelope<Offer>]) {
self.apply_events(VIEW_ID, events).await.ok();
}
}

#[async_trait]
impl<R, V> CustomQuery<R, V> for AllOffersQuery<R, V>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
{
async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError> {
match self.view_repository.load_with_context(&view_id).await? {
None => {
let view_context = ViewContext::new(view_id, 0);
Ok((Default::default(), view_context))
}
Some((view, context)) => Ok((view, context)),
}
}

async fn apply_events(&self, view_id: &str, events: &[EventEnvelope<Offer>]) -> Result<(), PersistenceError> {
for event in events {
let (mut view, view_context) = self.load_mut(view_id.to_string()).await?;

view.update(event);
self.view_repository.update_view(view, view_context).await?;
}
Ok(())
}
}
use crate::offer::queries::Offer;
use cqrs_es::{EventEnvelope, View};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct AllOffersView {
#[serde(flatten)]
pub offers: HashMap<String, OfferView>,
}

Expand Down
28 changes: 4 additions & 24 deletions agent_holder/src/offer/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,14 @@
pub mod all_offers;

use std::collections::HashMap;

use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext, ViewRepository},
EventEnvelope, Query, View,
};
use super::aggregate::Status;
use crate::offer::aggregate::Offer;
use cqrs_es::{EventEnvelope, View};
use oid4vci::{
credential_issuer::credential_configurations_supported::CredentialConfigurationsSupportedObject,
credential_offer::CredentialOfferParameters, token_response::TokenResponse,
};
use serde::{Deserialize, Serialize};

use crate::offer::aggregate::Offer;

use super::aggregate::Status;

/// A custom query trait for the Offer aggregate. This trait is used to define custom queries for the Offer aggregate
/// that do not make use of `GenericQuery`.
#[async_trait]
pub trait CustomQuery<R, V>: Query<Offer>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
{
async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError>;

async fn apply_events(&self, view_id: &str, events: &[EventEnvelope<Offer>]) -> Result<(), PersistenceError>;
}
use std::collections::HashMap;

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct OfferView {
Expand Down
5 changes: 3 additions & 2 deletions agent_issuance/src/offer/queries/access_token.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::offer::queries::{CustomQuery, Offer, OfferEvent, ViewRepository};
use crate::offer::queries::{Offer, OfferEvent, ViewRepository};
use agent_shared::custom_queries::CustomQuery;
use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext},
Expand Down Expand Up @@ -43,7 +44,7 @@ where
}

#[async_trait]
impl<R, V> CustomQuery<R, V> for AccessTokenQuery<R, V>
impl<R, V> CustomQuery<R, V, Offer> for AccessTokenQuery<R, V>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
Expand Down
25 changes: 3 additions & 22 deletions agent_issuance/src/offer/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,14 @@
pub mod access_token;
pub mod pre_authorized_code;

use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext, ViewRepository},
EventEnvelope, Query, View,
};
use super::event::OfferEvent;
use crate::offer::aggregate::Offer;
use cqrs_es::{persist::ViewRepository, EventEnvelope, View};
use oid4vci::{
credential_offer::CredentialOffer, credential_response::CredentialResponse, token_response::TokenResponse,
};
use serde::{Deserialize, Serialize};

use crate::offer::aggregate::Offer;

use super::event::OfferEvent;

/// A custom query trait for the Offer aggregate. This trait is used to define custom queries for the Offer aggregate
/// that do not make use of `GenericQuery`.
#[async_trait]
pub trait CustomQuery<R, V>: Query<Offer>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
{
async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError>;

async fn apply_events(&self, view_id: &str, events: &[EventEnvelope<Offer>]) -> Result<(), PersistenceError>;
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct OfferView {
pub credential_offer: Option<CredentialOffer>,
Expand Down
5 changes: 3 additions & 2 deletions agent_issuance/src/offer/queries/pre_authorized_code.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::offer::queries::{CustomQuery, Offer, OfferEvent, ViewRepository};
use crate::offer::queries::{Offer, OfferEvent, ViewRepository};
use agent_shared::custom_queries::CustomQuery;
use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext},
Expand Down Expand Up @@ -43,7 +44,7 @@ where
}

#[async_trait]
impl<R, V> CustomQuery<R, V> for PreAuthorizedCodeQuery<R, V>
impl<R, V> CustomQuery<R, V, Offer> for PreAuthorizedCodeQuery<R, V>
where
R: ViewRepository<V, Offer>,
V: View<Offer>,
Expand Down
Loading

0 comments on commit 384244c

Please sign in to comment.