Skip to content

Commit

Permalink
feat: ATL-6983 ZIO Stream Kafka PoC in background jobs (#1339)
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Voiturier <benjamin.voiturier@iohk.io>
Signed-off-by: mineme0110 <shailesh.patil@iohk.io>
Signed-off-by: Hyperledger Bot <hyperledger-bot@hyperledger.org>
Co-authored-by: mineme0110 <shailesh.patil@iohk.io>
Co-authored-by: Hyperledger Bot <hyperledger-bot@hyperledger.org>
  • Loading branch information
3 people authored Oct 9, 2024
1 parent c2da492 commit 19ab426
Show file tree
Hide file tree
Showing 55 changed files with 2,031 additions and 714 deletions.
13 changes: 10 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ inThisBuild(
// scalacOptions += "-Yexplicit-nulls",
// scalacOptions += "-Ysafe-init",
// scalacOptions += "-Werror", // <=> "-Xfatal-warnings"
scalacOptions += "-Dquill.macro.log=false", // disable quill macro logs // TODO https://github.com/zio/zio-protoquill/issues/470
scalacOptions += "-Dquill.macro.log=false", // disable quill macro logs // TODO https://github.com/zio/zio-protoquill/issues/470,
scalacOptions ++= Seq("-Xmax-inlines", "50") // manually increase max-inlines above 32 (https://github.com/circe/circe/issues/2162)
)
)

Expand All @@ -53,6 +54,7 @@ lazy val V = new {
val zioCatsInterop = "3.3.0" // TODO "23.1.0.2" // https://mvnrepository.com/artifact/dev.zio/zio-interop-cats
val zioMetricsConnector = "2.3.1"
val zioMock = "1.0.0-RC12"
val zioKafka = "2.7.5"
val mockito = "3.2.18.0"
val monocle = "3.2.0"

Expand Down Expand Up @@ -102,7 +104,11 @@ lazy val D = new {
val zioLog: ModuleID = "dev.zio" %% "zio-logging" % V.zioLogging
val zioSLF4J: ModuleID = "dev.zio" %% "zio-logging-slf4j" % V.zioLogging
val zioJson: ModuleID = "dev.zio" %% "zio-json" % V.zioJson
val zioConcurrent: ModuleID = "dev.zio" %% "zio-concurrent" % V.zio
val zioHttp: ModuleID = "dev.zio" %% "zio-http" % V.zioHttp
val zioKafka: ModuleID = "dev.zio" %% "zio-kafka" % V.zioKafka excludeAll (
ExclusionRule("dev.zio", "zio_3"), ExclusionRule("dev.zio", "zio-streams_3")
)
val zioCatsInterop: ModuleID = "dev.zio" %% "zio-interop-cats" % V.zioCatsInterop
val zioMetricsConnectorMicrometer: ModuleID = "dev.zio" %% "zio-metrics-connectors-micrometer" % V.zioMetricsConnector
val tapirPrometheusMetrics: ModuleID = "com.softwaremill.sttp.tapir" %% "tapir-prometheus-metrics" % V.tapir
Expand Down Expand Up @@ -185,7 +191,9 @@ lazy val D_Shared = new {
D.typesafeConfig,
D.scalaPbGrpc,
D.zio,
D.zioConcurrent,
D.zioHttp,
D.zioKafka,
D.scalaUri,
D.zioPrelude,
// FIXME: split shared DB stuff as subproject?
Expand Down Expand Up @@ -341,12 +349,11 @@ lazy val D_Pollux_VC_JWT = new {

lazy val D_EventNotification = new {
val zio = "dev.zio" %% "zio" % V.zio
val zioConcurrent = "dev.zio" %% "zio-concurrent" % V.zio
val zioTest = "dev.zio" %% "zio-test" % V.zio % Test
val zioTestSbt = "dev.zio" %% "zio-test-sbt" % V.zio % Test
val zioTestMagnolia = "dev.zio" %% "zio-test-magnolia" % V.zio % Test

val zioDependencies: Seq[ModuleID] = Seq(zio, zioConcurrent, zioTest, zioTestSbt, zioTestMagnolia)
val zioDependencies: Seq[ModuleID] = Seq(zio, zioTest, zioTestSbt, zioTestMagnolia)
val baseDependencies: Seq[ModuleID] = zioDependencies
}

Expand Down
67 changes: 49 additions & 18 deletions cloud-agent/service/server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,10 @@ pollux {
publicEndpointUrl = "http://localhost:"${agent.httpEndpoint.http.port}
publicEndpointUrl = ${?POLLUX_STATUS_LIST_REGISTRY_PUBLIC_URL}
}
issueBgJobRecordsLimit = 25
issueBgJobRecordsLimit = ${?ISSUE_BG_JOB_RECORDS_LIMIT}
issueBgJobRecurrenceDelay = 2 seconds
issueBgJobRecurrenceDelay = ${?ISSUE_BG_JOB_RECURRENCE_DELAY}
issueBgJobProcessingParallelism = 5
issueBgJobProcessingParallelism = ${?ISSUE_BG_JOB_PROCESSING_PARALLELISM}
presentationBgJobRecordsLimit = 25
presentationBgJobRecordsLimit = ${?PRESENTATION_BG_JOB_RECORDS_LIMIT}
presentationBgJobRecurrenceDelay = 2 seconds
presentationBgJobRecurrenceDelay = ${?PRESENTATION_BG_JOB_RECURRENCE_DELAY}
presentationBgJobProcessingParallelism = 5
presentationBgJobProcessingParallelism = ${?PRESENTATION_BG_JOB_PROCESSING_PARALLELISM}
syncRevocationStatusesBgJobRecurrenceDelay = 2 seconds
syncRevocationStatusesBgJobRecurrenceDelay = ${?SYNC_REVOCATION_STATUSES_BG_JOB_RECURRENCE_DELAY}
syncRevocationStatusesBgJobProcessingParallelism = 5
syncRevocationStatusesBgJobProcessingParallelism = ${?SYNC_REVOCATION_STATUSES_BG_JOB_PROCESSING_PARALLELISM}
statusListSyncTriggerRecurrenceDelay = 30 seconds
statusListSyncTriggerRecurrenceDelay = ${?STATUS_LIST_SYNC_TRIGGER_RECURRENCE_DELAY}
didStateSyncTriggerRecurrenceDelay = 30 seconds
didStateSyncTriggerRecurrenceDelay = ${?DID_STATE_SYNC_TRIGGER_RECURRENCE_DELAY}
credential.sdJwt.expiry = 30 days
credential.sdJwt.expiry = ${?CREDENTIAL_SD_JWT_EXPIRY}
presentationInvitationExpiry = 300 seconds
Expand Down Expand Up @@ -81,8 +69,6 @@ connect {
connectBgJobRecordsLimit = ${?CONNECT_BG_JOB_RECORDS_LIMIT}
connectBgJobRecurrenceDelay = 2 seconds
connectBgJobRecurrenceDelay = ${?CONNECT_BG_JOB_RECURRENCE_DELAY}
connectBgJobProcessingParallelism = 5
connectBgJobProcessingParallelism = ${?CONNECT_BG_JOB_PROCESSING_PARALLELISM}
connectInvitationExpiry = 300 seconds
connectInvitationExpiry = ${?CONNECT_INVITATION_EXPIRY}
}
Expand Down Expand Up @@ -262,4 +248,49 @@ agent {
authApiKey = "default"
authApiKey = ${?DEFAULT_WALLET_AUTH_API_KEY}
}
messagingService {
connectFlow {
consumerCount = 5
retryStrategy {
maxRetries = 4
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
issueFlow {
consumerCount = 5
retryStrategy {
maxRetries = 4
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
presentFlow {
consumerCount = 5
retryStrategy {
maxRetries = 4
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
didStateSync {
consumerCount = 5
}
statusListSync {
consumerCount = 5
}
inMemoryQueueCapacity = 1000
kafkaEnabled = false
kafkaEnabled = ${?DEFAULT_KAFKA_ENABLED}
kafka {
bootstrapServers = "kafka:9092"
consumers {
autoCreateTopics = false,
maxPollRecords = 500
maxPollInterval = 5.minutes
pollTimeout = 50.millis
rebalanceSafeCommits = true
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,121 +5,45 @@ import org.hyperledger.identus.agent.server.config.AppConfig
import org.hyperledger.identus.agent.server.http.{ZHttp4sBlazeServer, ZHttpEndpoints}
import org.hyperledger.identus.agent.server.jobs.*
import org.hyperledger.identus.agent.walletapi.model.{Entity, Wallet, WalletSeed}
import org.hyperledger.identus.agent.walletapi.service.{EntityService, ManagedDIDService, WalletManagementService}
import org.hyperledger.identus.agent.walletapi.storage.DIDNonSecretStorage
import org.hyperledger.identus.agent.walletapi.service.{EntityService, WalletManagementService}
import org.hyperledger.identus.castor.controller.{DIDRegistrarServerEndpoints, DIDServerEndpoints}
import org.hyperledger.identus.castor.core.service.DIDService
import org.hyperledger.identus.connect.controller.ConnectionServerEndpoints
import org.hyperledger.identus.connect.core.service.ConnectionService
import org.hyperledger.identus.credentialstatus.controller.CredentialStatusServiceEndpoints
import org.hyperledger.identus.event.controller.EventServerEndpoints
import org.hyperledger.identus.event.notification.EventNotificationConfig
import org.hyperledger.identus.iam.authentication.apikey.ApiKeyAuthenticator
import org.hyperledger.identus.iam.entity.http.EntityServerEndpoints
import org.hyperledger.identus.iam.wallet.http.WalletManagementServerEndpoints
import org.hyperledger.identus.issue.controller.IssueServerEndpoints
import org.hyperledger.identus.mercury.{DidOps, HttpClient}
import org.hyperledger.identus.oid4vci.CredentialIssuerServerEndpoints
import org.hyperledger.identus.pollux.core.service.{CredentialService, PresentationService}
import org.hyperledger.identus.pollux.credentialdefinition.CredentialDefinitionRegistryServerEndpoints
import org.hyperledger.identus.pollux.credentialschema.{
SchemaRegistryServerEndpoints,
VerificationPolicyServerEndpoints
}
import org.hyperledger.identus.pollux.prex.PresentationExchangeServerEndpoints
import org.hyperledger.identus.pollux.vc.jwt.DidResolver as JwtDidResolver
import org.hyperledger.identus.presentproof.controller.PresentProofServerEndpoints
import org.hyperledger.identus.resolvers.DIDResolver
import org.hyperledger.identus.shared.http.UriResolver
import org.hyperledger.identus.shared.models.{HexString, WalletAccessContext, WalletAdministrationContext, WalletId}
import org.hyperledger.identus.shared.utils.DurationOps.toMetricsSeconds
import org.hyperledger.identus.shared.models.*
import org.hyperledger.identus.system.controller.SystemServerEndpoints
import org.hyperledger.identus.verification.controller.VcVerificationServerEndpoints
import zio.*
import zio.metrics.*

object CloudAgentApp {

def run = for {
_ <- AgentInitialization.run
_ <- issueCredentialDidCommExchangesJob.debug.fork
_ <- presentProofExchangeJob.debug.fork
_ <- connectDidCommExchangesJob.debug.fork
_ <- syncDIDPublicationStateFromDltJob.debug.fork
_ <- syncRevocationStatusListsJob.debug.fork
_ <- ConnectBackgroundJobs.connectFlowsHandler
_ <- IssueBackgroundJobs.issueFlowsHandler
_ <- PresentBackgroundJobs.presentFlowsHandler
_ <- DIDStateSyncBackgroundJobs.didStateSyncTrigger
_ <- DIDStateSyncBackgroundJobs.didStateSyncHandler
_ <- StatusListJobs.statusListsSyncTrigger
_ <- StatusListJobs.statusListSyncHandler
_ <- AgentHttpServer.run.tapDefect(e => ZIO.logErrorCause("Agent HTTP Server failure", e)).fork
fiber <- DidCommHttpServer.run.tapDefect(e => ZIO.logErrorCause("DIDComm HTTP Server failure", e)).fork
_ <- WebhookPublisher.layer.build.map(_.get[WebhookPublisher]).flatMap(_.run.fork)
_ <- fiber.join *> ZIO.log(s"Server End")
_ <- ZIO.never
} yield ()

private val issueCredentialDidCommExchangesJob: RIO[
AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & DIDNonSecretStorage &
DIDService & ManagedDIDService & PresentationService & WalletManagementService,
Unit
] =
for {
config <- ZIO.service[AppConfig]
_ <- (IssueBackgroundJobs.issueCredentialDidCommExchanges @@ Metric
.gauge("issuance_flow_did_com_exchange_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.pollux.issueBgJobRecurrenceDelay))
.unit
} yield ()

private val presentProofExchangeJob: RIO[
AppConfig & DidOps & UriResolver & DIDResolver & JwtDidResolver & HttpClient & PresentationService &
CredentialService & DIDNonSecretStorage & DIDService & ManagedDIDService,
Unit
] =
for {
config <- ZIO.service[AppConfig]
_ <- (PresentBackgroundJobs.presentProofExchanges @@ Metric
.gauge("present_proof_flow_did_com_exchange_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.pollux.presentationBgJobRecurrenceDelay))
.unit
} yield ()

private val connectDidCommExchangesJob: RIO[
AppConfig & DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService & DIDNonSecretStorage &
WalletManagementService,
Unit
] =
for {
config <- ZIO.service[AppConfig]
_ <- (ConnectBackgroundJobs.didCommExchanges @@ Metric
.gauge("connection_flow_did_com_exchange_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.connect.connectBgJobRecurrenceDelay))
.unit
} yield ()

private val syncRevocationStatusListsJob = {
for {
config <- ZIO.service[AppConfig]
_ <- (StatusListJobs.syncRevocationStatuses @@ Metric
.gauge("revocation_status_list_sync_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.pollux.syncRevocationStatusesBgJobRecurrenceDelay))
} yield ()
}

private val syncDIDPublicationStateFromDltJob: URIO[ManagedDIDService & WalletManagementService, Unit] =
ZIO
.serviceWithZIO[WalletManagementService](_.listWallets().map(_._1))
.flatMap { wallets =>
ZIO.foreach(wallets) { wallet =>
DIDStateSyncBackgroundJobs.syncDIDPublicationStateFromDlt
.provideSomeLayer(ZLayer.succeed(WalletAccessContext(wallet.id)))
}
}
.catchAll(e => ZIO.logError(s"error while syncing DID publication state: $e"))
.repeat(Schedule.spaced(10.seconds))
.unit
.provideSomeLayer(ZLayer.succeed(WalletAdministrationContext.Admin()))

}

object AgentHttpServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import org.hyperledger.identus.agent.server.http.ZioHttpClient
import org.hyperledger.identus.agent.server.sql.Migrations as AgentMigrations
import org.hyperledger.identus.agent.walletapi.service.{
EntityServiceImpl,
ManagedDIDService,
ManagedDIDServiceWithEventNotificationImpl,
WalletManagementServiceImpl
}
Expand All @@ -16,7 +15,6 @@ import org.hyperledger.identus.agent.walletapi.sql.{
JdbcEntityRepository,
JdbcWalletNonSecretStorage
}
import org.hyperledger.identus.agent.walletapi.storage.GenericSecretStorage
import org.hyperledger.identus.castor.controller.{DIDControllerImpl, DIDRegistrarControllerImpl}
import org.hyperledger.identus.castor.core.model.did.{
Service as DidDocumentService,
Expand All @@ -36,7 +34,7 @@ import org.hyperledger.identus.iam.authentication.{DefaultAuthenticator, Oid4vci
import org.hyperledger.identus.iam.authentication.apikey.JdbcAuthenticationRepository
import org.hyperledger.identus.iam.authorization.core.EntityPermissionManagementService
import org.hyperledger.identus.iam.authorization.DefaultPermissionManagementService
import org.hyperledger.identus.iam.entity.http.controller.{EntityController, EntityControllerImpl}
import org.hyperledger.identus.iam.entity.http.controller.EntityControllerImpl
import org.hyperledger.identus.iam.wallet.http.controller.WalletManagementControllerImpl
import org.hyperledger.identus.issue.controller.IssueControllerImpl
import org.hyperledger.identus.mercury.*
Expand All @@ -47,7 +45,6 @@ import org.hyperledger.identus.pollux.core.service.*
import org.hyperledger.identus.pollux.core.service.verification.VcVerificationServiceImpl
import org.hyperledger.identus.pollux.credentialdefinition.controller.CredentialDefinitionControllerImpl
import org.hyperledger.identus.pollux.credentialschema.controller.{
CredentialSchemaController,
CredentialSchemaControllerImpl,
VerificationPolicyControllerImpl
}
Expand All @@ -66,6 +63,9 @@ import org.hyperledger.identus.pollux.sql.repository.{
}
import org.hyperledger.identus.presentproof.controller.PresentProofControllerImpl
import org.hyperledger.identus.resolvers.DIDResolver
import org.hyperledger.identus.shared.messaging
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.models.WalletId
import org.hyperledger.identus.system.controller.SystemControllerImpl
import org.hyperledger.identus.verification.controller.VcVerificationControllerImpl
import zio.*
Expand All @@ -77,6 +77,7 @@ import zio.metrics.connectors.micrometer.MicrometerConfig
import zio.metrics.jvm.DefaultJvmMetrics

import java.security.Security
import java.util.UUID

object MainApp extends ZIOAppDefault {

Expand Down Expand Up @@ -167,7 +168,6 @@ object MainApp extends ZIOAppDefault {
)
_ <- preMigrations
_ <- migrations

app <- CloudAgentApp.run
.provide(
DidCommX.liveLayer,
Expand Down Expand Up @@ -252,6 +252,11 @@ object MainApp extends ZIOAppDefault {
// HTTP client
SystemModule.zioHttpClientLayer,
Scope.default,
// Messaging Service
ZLayer.fromZIO(ZIO.service[AppConfig].map(_.agent.messagingService)),
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
messaging.MessagingService.producerLayer[WalletId, WalletId]
)
} yield app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.hyperledger.identus.castor.core.model.did.VerificationRelationship
import org.hyperledger.identus.iam.authentication.AuthenticationConfig
import org.hyperledger.identus.pollux.vc.jwt.*
import org.hyperledger.identus.shared.db.DbConfig
import zio.config.*
import org.hyperledger.identus.shared.messaging.MessagingServiceConfig
import zio.config.magnolia.*
import zio.Config

Expand Down Expand Up @@ -70,22 +70,13 @@ final case class PolluxConfig(
database: DatabaseConfig,
credentialSdJwtExpirationTime: Duration,
statusListRegistry: StatusListRegistryConfig,
issueBgJobRecordsLimit: Int,
issueBgJobRecurrenceDelay: Duration,
issueBgJobProcessingParallelism: Int,
presentationBgJobRecordsLimit: Int,
presentationBgJobRecurrenceDelay: Duration,
presentationBgJobProcessingParallelism: Int,
syncRevocationStatusesBgJobRecurrenceDelay: Duration,
syncRevocationStatusesBgJobProcessingParallelism: Int,
statusListSyncTriggerRecurrenceDelay: Duration,
didStateSyncTriggerRecurrenceDelay: Duration,
presentationInvitationExpiry: Duration,
issuanceInvitationExpiry: Duration,
)
final case class ConnectConfig(
database: DatabaseConfig,
connectBgJobRecordsLimit: Int,
connectBgJobRecurrenceDelay: Duration,
connectBgJobProcessingParallelism: Int,
connectInvitationExpiry: Duration,
)

Expand Down Expand Up @@ -173,7 +164,8 @@ final case class AgentConfig(
verification: VerificationConfig,
secretStorage: SecretStorageConfig,
webhookPublisher: WebhookPublisherConfig,
defaultWallet: DefaultWalletConfig
defaultWallet: DefaultWalletConfig,
messagingService: MessagingServiceConfig
) {
def validate: Either[String, Unit] =
for {
Expand Down
Loading

0 comments on commit 19ab426

Please sign in to comment.