From 7bcc7308a2ed287b99e98055089de5d10ef6160c Mon Sep 17 00:00:00 2001 From: Nander Stabel Date: Tue, 27 Aug 2024 21:22:55 +0200 Subject: [PATCH] feat: add credentials endpoint for Holder --- .../src/holder/holder/credentials/mod.rs | 18 ++++ agent_api_rest/src/holder/holder/mod.rs | 1 + agent_api_rest/src/lib.rs | 1 + agent_application/docker/db/init.sql | 9 ++ .../src/credential/queries/all_credentials.rs | 94 +++++++++++++++++++ .../credential/{queries.rs => queries/mod.rs} | 24 ++++- agent_holder/src/state.rs | 11 ++- agent_store/src/in_memory.rs | 11 ++- agent_store/src/postgres.rs | 12 ++- 9 files changed, 171 insertions(+), 10 deletions(-) create mode 100644 agent_api_rest/src/holder/holder/credentials/mod.rs create mode 100644 agent_holder/src/credential/queries/all_credentials.rs rename agent_holder/src/credential/{queries.rs => queries/mod.rs} (53%) diff --git a/agent_api_rest/src/holder/holder/credentials/mod.rs b/agent_api_rest/src/holder/holder/credentials/mod.rs new file mode 100644 index 00000000..806e96a1 --- /dev/null +++ b/agent_api_rest/src/holder/holder/credentials/mod.rs @@ -0,0 +1,18 @@ +use agent_holder::state::HolderState; +use agent_shared::handlers::query_handler; +use axum::{ + extract::State, + response::{IntoResponse, Response}, + Json, +}; +use hyper::StatusCode; + +#[axum_macros::debug_handler] +pub(crate) async fn credentials(State(state): State) -> Response { + // TODO: Add extension that allows for selecting all credentials. + match query_handler("all_credentials", &state.query.all_credentials).await { + Ok(Some(offer_view)) => (StatusCode::OK, Json(offer_view)).into_response(), + Ok(None) => StatusCode::INTERNAL_SERVER_ERROR.into_response(), + _ => StatusCode::INTERNAL_SERVER_ERROR.into_response(), + } +} diff --git a/agent_api_rest/src/holder/holder/mod.rs b/agent_api_rest/src/holder/holder/mod.rs index 4791300f..1a09baa0 100644 --- a/agent_api_rest/src/holder/holder/mod.rs +++ b/agent_api_rest/src/holder/holder/mod.rs @@ -1 +1,2 @@ +pub mod credentials; pub mod offers; diff --git a/agent_api_rest/src/lib.rs b/agent_api_rest/src/lib.rs index 953b1c39..ab3165b0 100644 --- a/agent_api_rest/src/lib.rs +++ b/agent_api_rest/src/lib.rs @@ -59,6 +59,7 @@ pub fn app(state: ApplicationState) -> Router { .route("/offers", post(offers)) .route("/offers/send", post(send)) // Agent Holder + .route("/holder/credentials", get(holder::holder::credentials::credentials)) .route("/holder/offers", get(holder::holder::offers::offers)) .route( "/holder/offers/:offer_id/accept", diff --git a/agent_application/docker/db/init.sql b/agent_application/docker/db/init.sql index d1ccbc36..f333905c 100644 --- a/agent_application/docker/db/init.sql +++ b/agent_application/docker/db/init.sql @@ -74,6 +74,15 @@ CREATE TABLE holder_credential PRIMARY KEY (view_id) ); + +CREATE TABLE all_credentials +( + view_id text NOT NULL, + version bigint CHECK (version >= 0) NOT NULL, + payload json NOT NULL, + PRIMARY KEY (view_id) +); + CREATE TABLE authorization_request ( view_id text NOT NULL, diff --git a/agent_holder/src/credential/queries/all_credentials.rs b/agent_holder/src/credential/queries/all_credentials.rs new file mode 100644 index 00000000..4f9b78fd --- /dev/null +++ b/agent_holder/src/credential/queries/all_credentials.rs @@ -0,0 +1,94 @@ +use crate::credential::queries::{Credential, CustomQuery, ViewRepository}; +use async_trait::async_trait; +use cqrs_es::{ + persist::{PersistenceError, ViewContext}, + EventEnvelope, Query, View, +}; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::{collections::HashMap, marker::PhantomData}; + +use super::CredentialView; + +const VIEW_ID: &str = "all_credentials"; + +/// A custom query trait for the Credential aggregate. This query is used to update the `AllCredentialsView`. +pub struct AllCredentialsQuery +where + R: ViewRepository, + V: View, +{ + view_repository: Arc, + _phantom: PhantomData, +} + +impl AllCredentialsQuery +where + R: ViewRepository, + V: View, +{ + pub fn new(view_repository: Arc) -> Self { + AllCredentialsQuery { + view_repository, + _phantom: PhantomData, + } + } +} + +#[async_trait] +impl Query for AllCredentialsQuery +where + R: ViewRepository, + V: View, +{ + // The `dispatch` method is called by the `CqrsFramework` when an event is published. By default `cqrs` will use the + // `aggregate_id` as the `view_id` when calling the `dispatch` method. We override this behavior by using the + // `VIEW_ID` constant as the `view_id`. + async fn dispatch(&self, _view_id: &str, events: &[EventEnvelope]) { + self.apply_events(VIEW_ID, events).await.ok(); + } +} + +#[async_trait] +impl CustomQuery for AllCredentialsQuery +where + R: ViewRepository, + V: View, +{ + async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError> { + match self.view_repository.load_with_context(&view_id).await? { + None => { + let view_context = ViewContext::new(view_id, 0); + Ok((Default::default(), view_context)) + } + Some((view, context)) => Ok((view, context)), + } + } + + async fn apply_events(&self, view_id: &str, events: &[EventEnvelope]) -> Result<(), PersistenceError> { + for event in events { + let (mut view, view_context) = self.load_mut(view_id.to_string()).await?; + + view.update(event); + self.view_repository.update_view(view, view_context).await?; + } + Ok(()) + } +} + +#[derive(Debug, Default, Serialize, Deserialize, Clone)] +pub struct AllCredentialsView { + pub credentials: HashMap, +} + +impl View for AllCredentialsView { + fn update(&mut self, event: &EventEnvelope) { + self.credentials + // Get the entry for the aggregate_id + .entry(event.aggregate_id.clone()) + // or insert a new one if it doesn't exist + .or_insert_with(Default::default) + // update the view with the event + .update(event); + } +} diff --git a/agent_holder/src/credential/queries.rs b/agent_holder/src/credential/queries/mod.rs similarity index 53% rename from agent_holder/src/credential/queries.rs rename to agent_holder/src/credential/queries/mod.rs index 164263a9..e60e497a 100644 --- a/agent_holder/src/credential/queries.rs +++ b/agent_holder/src/credential/queries/mod.rs @@ -1,9 +1,27 @@ -use super::{entity::Data, event::CredentialEvent}; +pub mod all_credentials; + +use super::event::CredentialEvent; use crate::credential::aggregate::Credential; -use cqrs_es::{EventEnvelope, View}; -use oid4vci::credential_issuer::credential_configurations_supported::CredentialConfigurationsSupportedObject; +use axum::async_trait; +use cqrs_es::{ + persist::{PersistenceError, ViewContext, ViewRepository}, + EventEnvelope, Query, View, +}; use serde::{Deserialize, Serialize}; +/// A custom query trait for the Credential aggregate. This trait is used to define custom queries for the Credential aggregate +/// that do not make use of `GenericQuery`. +#[async_trait] +pub trait CustomQuery: Query +where + R: ViewRepository, + V: View, +{ + async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError>; + + async fn apply_events(&self, view_id: &str, events: &[EventEnvelope]) -> Result<(), PersistenceError>; +} + #[derive(Debug, Default, Serialize, Deserialize, Clone)] pub struct CredentialView { pub credential_id: Option, diff --git a/agent_holder/src/state.rs b/agent_holder/src/state.rs index f879a5ed..f1bf37b6 100644 --- a/agent_holder/src/state.rs +++ b/agent_holder/src/state.rs @@ -3,6 +3,7 @@ use cqrs_es::persist::ViewRepository; use std::sync::Arc; use crate::credential::aggregate::Credential; +use crate::credential::queries::all_credentials::AllCredentialsView; use crate::credential::queries::CredentialView; use crate::offer::aggregate::Offer; use crate::offer::queries::all_offers::AllOffersView; @@ -33,17 +34,20 @@ pub struct CommandHandlers { /// `Aggregate` types must be the same. type Queries = ViewRepositories< dyn ViewRepository, + dyn ViewRepository, dyn ViewRepository, dyn ViewRepository, >; -pub struct ViewRepositories +pub struct ViewRepositories where - C: ViewRepository + ?Sized, + C1: ViewRepository + ?Sized, + C2: ViewRepository + ?Sized, O1: ViewRepository + ?Sized, O2: ViewRepository + ?Sized, { - pub credential: Arc, + pub credential: Arc, + pub all_credentials: Arc, pub offer: Arc, pub all_offers: Arc, } @@ -52,6 +56,7 @@ impl Clone for Queries { fn clone(&self) -> Self { ViewRepositories { credential: self.credential.clone(), + all_credentials: self.all_credentials.clone(), offer: self.offer.clone(), all_offers: self.all_offers.clone(), } diff --git a/agent_store/src/in_memory.rs b/agent_store/src/in_memory.rs index 316019e0..44db81e5 100644 --- a/agent_store/src/in_memory.rs +++ b/agent_store/src/in_memory.rs @@ -1,4 +1,7 @@ -use agent_holder::{offer::queries::all_offers::AllOffersQuery, services::HolderServices, state::HolderState}; +use agent_holder::{ + credential::queries::all_credentials::AllCredentialsQuery, offer::queries::all_offers::AllOffersQuery, + services::HolderServices, state::HolderState, +}; use agent_issuance::{ offer::{ aggregate::Offer, @@ -181,9 +184,11 @@ pub async fn holder_state( // Initialize the in-memory repositories. let credential = Arc::new(MemRepository::default()); let offer = Arc::new(MemRepository::default()); + let all_credentials = Arc::new(MemRepository::default()); let all_offers = Arc::new(MemRepository::default()); // Create custom-queries for the offer aggregate. + let all_credentials_query = AllCredentialsQuery::new(all_credentials.clone()); let all_offers_query = AllOffersQuery::new(all_offers.clone()); // Partition the event_publishers into the different aggregates. @@ -196,7 +201,8 @@ pub async fn holder_state( credential_event_publishers.into_iter().fold( AggregateHandler::new(holder_services.clone()) .append_query(SimpleLoggingQuery {}) - .append_query(generic_query(credential.clone())), + .append_query(generic_query(credential.clone())) + .append_query(all_credentials_query), |aggregate_handler, event_publisher| aggregate_handler.append_event_publisher(event_publisher), ), ), @@ -212,6 +218,7 @@ pub async fn holder_state( }, query: agent_holder::state::ViewRepositories { credential, + all_credentials, offer, all_offers, }, diff --git a/agent_store/src/postgres.rs b/agent_store/src/postgres.rs index ef2cc880..4c6adb97 100644 --- a/agent_store/src/postgres.rs +++ b/agent_store/src/postgres.rs @@ -1,4 +1,7 @@ -use agent_holder::{offer::queries::all_offers::AllOffersQuery, services::HolderServices, state::HolderState}; +use agent_holder::{ + credential::queries::all_credentials::AllCredentialsQuery, offer::queries::all_offers::AllOffersQuery, + services::HolderServices, state::HolderState, +}; use agent_issuance::{ offer::queries::{access_token::AccessTokenQuery, pre_authorized_code::PreAuthorizedCodeQuery}, services::IssuanceServices, @@ -139,10 +142,13 @@ pub async fn holder_state( // Initialize the postgres repositories. let credential: Arc> = Arc::new(PostgresViewRepository::new("holder_credential", pool.clone())); + let all_credentials: Arc> = + Arc::new(PostgresViewRepository::new("all_credentials", pool.clone())); let offer = Arc::new(PostgresViewRepository::new("received_offer", pool.clone())); let all_offers = Arc::new(PostgresViewRepository::new("all_offers", pool.clone())); // Create custom-queries for the offer aggregate. + let all_credentials_query = AllCredentialsQuery::new(all_credentials.clone()); let all_offers_query = AllOffersQuery::new(all_offers.clone()); // Partition the event_publishers into the different aggregates. @@ -155,7 +161,8 @@ pub async fn holder_state( credential_event_publishers.into_iter().fold( AggregateHandler::new(pool.clone(), holder_services.clone()) .append_query(SimpleLoggingQuery {}) - .append_query(generic_query(credential.clone())), + .append_query(generic_query(credential.clone())) + .append_query(all_credentials_query), |aggregate_handler, event_publisher| aggregate_handler.append_event_publisher(event_publisher), ), ), @@ -171,6 +178,7 @@ pub async fn holder_state( }, query: agent_holder::state::ViewRepositories { credential, + all_credentials, offer, all_offers, },