Skip to content

Commit

Permalink
feat: add credentials endpoint for Holder
Browse files Browse the repository at this point in the history
  • Loading branch information
nanderstabel committed Aug 27, 2024
1 parent 1babdd4 commit 7bcc730
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 10 deletions.
18 changes: 18 additions & 0 deletions agent_api_rest/src/holder/holder/credentials/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use agent_holder::state::HolderState;
use agent_shared::handlers::query_handler;
use axum::{
extract::State,
response::{IntoResponse, Response},
Json,
};
use hyper::StatusCode;

#[axum_macros::debug_handler]
pub(crate) async fn credentials(State(state): State<HolderState>) -> Response {
// TODO: Add extension that allows for selecting all credentials.
match query_handler("all_credentials", &state.query.all_credentials).await {
Ok(Some(offer_view)) => (StatusCode::OK, Json(offer_view)).into_response(),
Ok(None) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
_ => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
1 change: 1 addition & 0 deletions agent_api_rest/src/holder/holder/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod credentials;
pub mod offers;
1 change: 1 addition & 0 deletions agent_api_rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub fn app(state: ApplicationState) -> Router {
.route("/offers", post(offers))
.route("/offers/send", post(send))
// Agent Holder
.route("/holder/credentials", get(holder::holder::credentials::credentials))
.route("/holder/offers", get(holder::holder::offers::offers))
.route(
"/holder/offers/:offer_id/accept",
Expand Down
9 changes: 9 additions & 0 deletions agent_application/docker/db/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ CREATE TABLE holder_credential
PRIMARY KEY (view_id)
);


CREATE TABLE all_credentials
(
view_id text NOT NULL,
version bigint CHECK (version >= 0) NOT NULL,
payload json NOT NULL,
PRIMARY KEY (view_id)
);

CREATE TABLE authorization_request
(
view_id text NOT NULL,
Expand Down
94 changes: 94 additions & 0 deletions agent_holder/src/credential/queries/all_credentials.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::credential::queries::{Credential, CustomQuery, ViewRepository};
use async_trait::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext},
EventEnvelope, Query, View,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::{collections::HashMap, marker::PhantomData};

use super::CredentialView;

const VIEW_ID: &str = "all_credentials";

/// A custom query trait for the Credential aggregate. This query is used to update the `AllCredentialsView`.
pub struct AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
view_repository: Arc<R>,
_phantom: PhantomData<V>,
}

impl<R, V> AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
pub fn new(view_repository: Arc<R>) -> Self {
AllCredentialsQuery {
view_repository,
_phantom: PhantomData,
}
}
}

#[async_trait]
impl<R, V> Query<Credential> for AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
// The `dispatch` method is called by the `CqrsFramework` when an event is published. By default `cqrs` will use the
// `aggregate_id` as the `view_id` when calling the `dispatch` method. We override this behavior by using the
// `VIEW_ID` constant as the `view_id`.
async fn dispatch(&self, _view_id: &str, events: &[EventEnvelope<Credential>]) {
self.apply_events(VIEW_ID, events).await.ok();
}
}

#[async_trait]
impl<R, V> CustomQuery<R, V> for AllCredentialsQuery<R, V>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError> {
match self.view_repository.load_with_context(&view_id).await? {
None => {
let view_context = ViewContext::new(view_id, 0);
Ok((Default::default(), view_context))
}
Some((view, context)) => Ok((view, context)),
}
}

async fn apply_events(&self, view_id: &str, events: &[EventEnvelope<Credential>]) -> Result<(), PersistenceError> {
for event in events {
let (mut view, view_context) = self.load_mut(view_id.to_string()).await?;

view.update(event);
self.view_repository.update_view(view, view_context).await?;
}
Ok(())
}
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct AllCredentialsView {
pub credentials: HashMap<String, CredentialView>,
}

impl View<Credential> for AllCredentialsView {
fn update(&mut self, event: &EventEnvelope<Credential>) {
self.credentials
// Get the entry for the aggregate_id
.entry(event.aggregate_id.clone())
// or insert a new one if it doesn't exist
.or_insert_with(Default::default)
// update the view with the event
.update(event);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,27 @@
use super::{entity::Data, event::CredentialEvent};
pub mod all_credentials;

use super::event::CredentialEvent;
use crate::credential::aggregate::Credential;
use cqrs_es::{EventEnvelope, View};
use oid4vci::credential_issuer::credential_configurations_supported::CredentialConfigurationsSupportedObject;
use axum::async_trait;
use cqrs_es::{
persist::{PersistenceError, ViewContext, ViewRepository},
EventEnvelope, Query, View,
};
use serde::{Deserialize, Serialize};

/// A custom query trait for the Credential aggregate. This trait is used to define custom queries for the Credential aggregate
/// that do not make use of `GenericQuery`.
#[async_trait]
pub trait CustomQuery<R, V>: Query<Credential>
where
R: ViewRepository<V, Credential>,
V: View<Credential>,
{
async fn load_mut(&self, view_id: String) -> Result<(V, ViewContext), PersistenceError>;

async fn apply_events(&self, view_id: &str, events: &[EventEnvelope<Credential>]) -> Result<(), PersistenceError>;
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct CredentialView {
pub credential_id: Option<String>,
Expand Down
11 changes: 8 additions & 3 deletions agent_holder/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use cqrs_es::persist::ViewRepository;
use std::sync::Arc;

use crate::credential::aggregate::Credential;
use crate::credential::queries::all_credentials::AllCredentialsView;
use crate::credential::queries::CredentialView;
use crate::offer::aggregate::Offer;
use crate::offer::queries::all_offers::AllOffersView;
Expand Down Expand Up @@ -33,17 +34,20 @@ pub struct CommandHandlers {
/// `Aggregate` types must be the same.
type Queries = ViewRepositories<
dyn ViewRepository<CredentialView, Credential>,
dyn ViewRepository<AllCredentialsView, Credential>,
dyn ViewRepository<OfferView, Offer>,
dyn ViewRepository<AllOffersView, Offer>,
>;

pub struct ViewRepositories<C, O1, O2>
pub struct ViewRepositories<C1, C2, O1, O2>
where
C: ViewRepository<CredentialView, Credential> + ?Sized,
C1: ViewRepository<CredentialView, Credential> + ?Sized,
C2: ViewRepository<AllCredentialsView, Credential> + ?Sized,
O1: ViewRepository<OfferView, Offer> + ?Sized,
O2: ViewRepository<AllOffersView, Offer> + ?Sized,
{
pub credential: Arc<C>,
pub credential: Arc<C1>,
pub all_credentials: Arc<C2>,
pub offer: Arc<O1>,
pub all_offers: Arc<O2>,
}
Expand All @@ -52,6 +56,7 @@ impl Clone for Queries {
fn clone(&self) -> Self {
ViewRepositories {
credential: self.credential.clone(),
all_credentials: self.all_credentials.clone(),
offer: self.offer.clone(),
all_offers: self.all_offers.clone(),
}
Expand Down
11 changes: 9 additions & 2 deletions agent_store/src/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use agent_holder::{offer::queries::all_offers::AllOffersQuery, services::HolderServices, state::HolderState};
use agent_holder::{
credential::queries::all_credentials::AllCredentialsQuery, offer::queries::all_offers::AllOffersQuery,
services::HolderServices, state::HolderState,
};
use agent_issuance::{
offer::{
aggregate::Offer,
Expand Down Expand Up @@ -181,9 +184,11 @@ pub async fn holder_state(
// Initialize the in-memory repositories.
let credential = Arc::new(MemRepository::default());
let offer = Arc::new(MemRepository::default());
let all_credentials = Arc::new(MemRepository::default());
let all_offers = Arc::new(MemRepository::default());

// Create custom-queries for the offer aggregate.
let all_credentials_query = AllCredentialsQuery::new(all_credentials.clone());
let all_offers_query = AllOffersQuery::new(all_offers.clone());

// Partition the event_publishers into the different aggregates.
Expand All @@ -196,7 +201,8 @@ pub async fn holder_state(
credential_event_publishers.into_iter().fold(
AggregateHandler::new(holder_services.clone())
.append_query(SimpleLoggingQuery {})
.append_query(generic_query(credential.clone())),
.append_query(generic_query(credential.clone()))
.append_query(all_credentials_query),
|aggregate_handler, event_publisher| aggregate_handler.append_event_publisher(event_publisher),
),
),
Expand All @@ -212,6 +218,7 @@ pub async fn holder_state(
},
query: agent_holder::state::ViewRepositories {
credential,
all_credentials,
offer,
all_offers,
},
Expand Down
12 changes: 10 additions & 2 deletions agent_store/src/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use agent_holder::{offer::queries::all_offers::AllOffersQuery, services::HolderServices, state::HolderState};
use agent_holder::{
credential::queries::all_credentials::AllCredentialsQuery, offer::queries::all_offers::AllOffersQuery,
services::HolderServices, state::HolderState,
};
use agent_issuance::{
offer::queries::{access_token::AccessTokenQuery, pre_authorized_code::PreAuthorizedCodeQuery},
services::IssuanceServices,
Expand Down Expand Up @@ -139,10 +142,13 @@ pub async fn holder_state(
// Initialize the postgres repositories.
let credential: Arc<PostgresViewRepository<_, _>> =
Arc::new(PostgresViewRepository::new("holder_credential", pool.clone()));
let all_credentials: Arc<PostgresViewRepository<_, _>> =
Arc::new(PostgresViewRepository::new("all_credentials", pool.clone()));
let offer = Arc::new(PostgresViewRepository::new("received_offer", pool.clone()));
let all_offers = Arc::new(PostgresViewRepository::new("all_offers", pool.clone()));

// Create custom-queries for the offer aggregate.
let all_credentials_query = AllCredentialsQuery::new(all_credentials.clone());
let all_offers_query = AllOffersQuery::new(all_offers.clone());

// Partition the event_publishers into the different aggregates.
Expand All @@ -155,7 +161,8 @@ pub async fn holder_state(
credential_event_publishers.into_iter().fold(
AggregateHandler::new(pool.clone(), holder_services.clone())
.append_query(SimpleLoggingQuery {})
.append_query(generic_query(credential.clone())),
.append_query(generic_query(credential.clone()))
.append_query(all_credentials_query),
|aggregate_handler, event_publisher| aggregate_handler.append_event_publisher(event_publisher),
),
),
Expand All @@ -171,6 +178,7 @@ pub async fn holder_state(
},
query: agent_holder::state::ViewRepositories {
credential,
all_credentials,
offer,
all_offers,
},
Expand Down

0 comments on commit 7bcc730

Please sign in to comment.