Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ATL-6983 ZIO Stream Kafka PoC in background jobs #1339

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
47488f4
feat: introduce draft MessagingService interface and Kafka impl
bvoiturier Jul 2, 2024
6f85bbb
feat: add zio-kafka implementation
bvoiturier Jul 2, 2024
2838aa2
feat: allow passing Kafka bootstrapServers and groupId as parameters
bvoiturier Jul 3, 2024
f3fb835
chore: exclude ZIO 2.1.2 transitive dependency from zio-kafka as it m…
bvoiturier Jul 3, 2024
0c03775
feat: allow passing an 'R' to the consumer handler method
bvoiturier Jul 3, 2024
55cde88
feat: add kafka and zookeeper service to shared docker compose config
bvoiturier Jul 4, 2024
f1bf42b
feat: early draft (but functional) implementation of messaging servic…
bvoiturier Jul 4, 2024
e8d7146
feat: manually create topics with custom partitions number
bvoiturier Jul 8, 2024
3939e3c
feat: add layer for Kafka consumer and producer
bvoiturier Jul 8, 2024
730f185
feat: rather use producer singleton in connection service
bvoiturier Jul 8, 2024
cad7d89
feat: fork multiple consumers as replacement for connect bg job polli…
bvoiturier Jul 8, 2024
46b3ef3
chore: comment out obsolete code and log when invalid records are rec…
bvoiturier Jul 10, 2024
a80dd8e
chore: explicitly set important consumer settings parameters
bvoiturier Jul 10, 2024
ba858ef
feat: create a 'connect-retry' Kafka topic
bvoiturier Jul 11, 2024
8965c84
feat: introduce ByteArrayWrapper to eliminate dotty compilation error…
bvoiturier Jul 11, 2024
da681fc
feat: implement retry mechanism using dedicated connect-retry topic
bvoiturier Jul 12, 2024
be10565
chore: use const
bvoiturier Jul 12, 2024
c2c5507
feat: implement configurable consumer retry strategy based on topic s…
bvoiturier Jul 12, 2024
374477e
WIP Imemorykafka
mineme0110 Jul 15, 2024
47c3a6b
In memory kafka messaging service
mineme0110 Jul 16, 2024
9d9fbce
created a seperate docker-compose for kafka
mineme0110 Jul 16, 2024
0def74a
created a seperate docker-compose for kafka
mineme0110 Jul 16, 2024
074363b
Add config for kafka inmemrory enable
mineme0110 Jul 17, 2024
e6a0014
Issue background jobs to use messaging service
mineme0110 Jul 20, 2024
313bae6
Issue background jobs to use messaging service
mineme0110 Jul 20, 2024
b9ff594
Moved present background jobs to use the kafka inmemory
mineme0110 Jul 23, 2024
d08d8d7
chore(cloud-agent): minor fixes and clean-up
bvoiturier Jul 31, 2024
4a815e1
Moved sync background jobs to use the kafka inmemory
mineme0110 Aug 5, 2024
93bfc42
Moved sync background jobs to use the kafka inmemory
mineme0110 Aug 5, 2024
9037bd0
chore(cloud-agent): restructure status list backgound job code
bvoiturier Aug 6, 2024
68f9f0d
feat(pollux): refactor status list repo methods to support concurrent…
bvoiturier Aug 7, 2024
fcf9abd
feat(pollux): expose SL repo methods in service
bvoiturier Aug 7, 2024
b06f391
feat(cloud-agent): introduce consumer 'handleMessage' method for stat…
bvoiturier Aug 7, 2024
e788290
feat(cloud-agent): implement status list sync message consumer/proces…
bvoiturier Aug 7, 2024
d7f35a2
chore(cloud-agent): rename forked consumer processes for clarity
bvoiturier Aug 7, 2024
87800e2
chore(cloud-agent): move new code parts to the right place
bvoiturier Aug 7, 2024
7940167
chore: remove log instruction
bvoiturier Aug 7, 2024
7ba0493
chore(cloud-agent): get rid of obsolete pure Kafka-based messaging se…
bvoiturier Aug 7, 2024
9dd3de8
chore: prevent automatic topic creation at consumer level
bvoiturier Aug 7, 2024
758da64
chore: create additional necessary Kafka topics in docker compose and…
bvoiturier Aug 7, 2024
0d3a015
chore: use constant for messaging service topic names
bvoiturier Aug 7, 2024
8972b77
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Aug 7, 2024
aae9211
feat: reinstate metrics for all consumers
bvoiturier Aug 7, 2024
e3e7c51
chore(cloud-agent): remove code duplicates
bvoiturier Aug 7, 2024
3a3c7d9
chore: move MessagingService to shared module
bvoiturier Aug 9, 2024
28109b6
feat: enable rebalance safe commits
bvoiturier Aug 13, 2024
bc47a74
chore: use Nginx for locally testing multi-instance deployment
bvoiturier Aug 13, 2024
c015bf4
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Aug 16, 2024
035dfad
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Aug 20, 2024
14c6db8
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Aug 21, 2024
6ea836c
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 10, 2024
65ecc2e
test: add necessary Kafka-related services to integration tests conta…
bvoiturier Sep 10, 2024
3e1ed56
chore: reintroduce compilation failure on warning in build.sbt
bvoiturier Sep 10, 2024
b9eeaad
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 10, 2024
16aeef0
chore: run scalafmt
bvoiturier Sep 10, 2024
1f8fd30
style: apply linters automatic fixes (#1340)
github-actions[bot] Sep 11, 2024
8730fe4
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 11, 2024
ca924f7
feat(agnt): adding configurable consumer count for all jobs
bvoiturier Sep 16, 2024
74c297b
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 16, 2024
67c71bb
feat(agent): support per flow/job retry strategy config
bvoiturier Sep 17, 2024
c1b3f8c
chore: extend default revocation status sync job recurrence to 30 secs
bvoiturier Sep 19, 2024
a7a2efe
feat(agent): handle Http4s service errors according to error handling…
bvoiturier Sep 19, 2024
d4c846c
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 19, 2024
0a17d72
chore: run scalafmt
bvoiturier Sep 19, 2024
4097041
fix: make sure PSQL duplicate key error is correctly surfaced as an u…
bvoiturier Sep 19, 2024
e2eb106
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 19, 2024
dd4e21a
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 23, 2024
e3c26ba
feat: solve DID index race condition using Postgres advisory lock
bvoiturier Sep 24, 2024
6aebddc
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 24, 2024
ea9b8a3
chore: delete "semaphore" related code
bvoiturier Sep 24, 2024
d4b17a2
chore: increase scalac max-inlines to 50 to fix DeriveConfig macro co…
bvoiturier Sep 30, 2024
f8cd974
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 30, 2024
73a79f7
chore: run scalafmt
bvoiturier Sep 30, 2024
b35cbff
feat: solve race condition issue with VC status list index allocation
bvoiturier Sep 30, 2024
7f5d2c5
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 30, 2024
4f3663c
fix: for kafka retry strategy generation from config
bvoiturier Sep 30, 2024
f933e25
chore: clean-up obsolete background jobs related config params
bvoiturier Sep 30, 2024
a8b49a5
chore: refactor DID state background job logic
bvoiturier Sep 30, 2024
29601ac
chore: set status list & DID state sync recurrence delay to 5 sec in …
bvoiturier Sep 30, 2024
e182254
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Sep 30, 2024
d6a89fb
feat: use unique adivosry lock id for the same wallet Id
bvoiturier Oct 2, 2024
93e0dab
chore: log at debug level
bvoiturier Oct 7, 2024
29bd636
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
bvoiturier Oct 7, 2024
0fc30ec
chore: add recordId when dying with "record not found"
bvoiturier Oct 7, 2024
f1132a7
feat: support in-memory queue capacity configuration
bvoiturier Oct 7, 2024
e4c4260
chore: refactor messaging service config and migrate to messaging ser…
bvoiturier Oct 8, 2024
bb5ea8f
chore: refactor/simplify MessagingService-related layers creation
bvoiturier Oct 8, 2024
af1ace6
chore: run scalafmt
bvoiturier Oct 8, 2024
dddea95
Merge branch 'main' into feat/ATL-6983-zio-stream-kafka-poc-in-connec…
yshyn-iohk Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ inThisBuild(
// scalacOptions += "-Yexplicit-nulls",
// scalacOptions += "-Ysafe-init",
// scalacOptions += "-Werror", // <=> "-Xfatal-warnings"
scalacOptions += "-Dquill.macro.log=false", // disable quill macro logs // TODO https://github.com/zio/zio-protoquill/issues/470
scalacOptions += "-Dquill.macro.log=false", // disable quill macro logs // TODO https://github.com/zio/zio-protoquill/issues/470,
scalacOptions ++= Seq("-Xmax-inlines", "50") // manually increase max-inlines above 32 (https://github.com/circe/circe/issues/2162)
)
)

Expand All @@ -53,6 +54,7 @@ lazy val V = new {
val zioCatsInterop = "3.3.0" // TODO "23.1.0.2" // https://mvnrepository.com/artifact/dev.zio/zio-interop-cats
val zioMetricsConnector = "2.3.1"
val zioMock = "1.0.0-RC12"
val zioKafka = "2.7.5"
val mockito = "3.2.18.0"
val monocle = "3.2.0"

Expand Down Expand Up @@ -102,7 +104,11 @@ lazy val D = new {
val zioLog: ModuleID = "dev.zio" %% "zio-logging" % V.zioLogging
val zioSLF4J: ModuleID = "dev.zio" %% "zio-logging-slf4j" % V.zioLogging
val zioJson: ModuleID = "dev.zio" %% "zio-json" % V.zioJson
val zioConcurrent: ModuleID = "dev.zio" %% "zio-concurrent" % V.zio
val zioHttp: ModuleID = "dev.zio" %% "zio-http" % V.zioHttp
val zioKafka: ModuleID = "dev.zio" %% "zio-kafka" % V.zioKafka excludeAll (
ExclusionRule("dev.zio", "zio_3"), ExclusionRule("dev.zio", "zio-streams_3")
)
val zioCatsInterop: ModuleID = "dev.zio" %% "zio-interop-cats" % V.zioCatsInterop
val zioMetricsConnectorMicrometer: ModuleID = "dev.zio" %% "zio-metrics-connectors-micrometer" % V.zioMetricsConnector
val tapirPrometheusMetrics: ModuleID = "com.softwaremill.sttp.tapir" %% "tapir-prometheus-metrics" % V.tapir
Expand Down Expand Up @@ -185,7 +191,9 @@ lazy val D_Shared = new {
D.typesafeConfig,
D.scalaPbGrpc,
D.zio,
D.zioConcurrent,
D.zioHttp,
D.zioKafka,
D.scalaUri,
D.zioPrelude,
// FIXME: split shared DB stuff as subproject?
Expand Down Expand Up @@ -341,12 +349,11 @@ lazy val D_Pollux_VC_JWT = new {

lazy val D_EventNotification = new {
val zio = "dev.zio" %% "zio" % V.zio
val zioConcurrent = "dev.zio" %% "zio-concurrent" % V.zio
val zioTest = "dev.zio" %% "zio-test" % V.zio % Test
val zioTestSbt = "dev.zio" %% "zio-test-sbt" % V.zio % Test
val zioTestMagnolia = "dev.zio" %% "zio-test-magnolia" % V.zio % Test

val zioDependencies: Seq[ModuleID] = Seq(zio, zioConcurrent, zioTest, zioTestSbt, zioTestMagnolia)
val zioDependencies: Seq[ModuleID] = Seq(zio, zioTest, zioTestSbt, zioTestMagnolia)
val baseDependencies: Seq[ModuleID] = zioDependencies
}

Expand Down
67 changes: 49 additions & 18 deletions cloud-agent/service/server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,10 @@ pollux {
publicEndpointUrl = "http://localhost:"${agent.httpEndpoint.http.port}
publicEndpointUrl = ${?POLLUX_STATUS_LIST_REGISTRY_PUBLIC_URL}
}
issueBgJobRecordsLimit = 25
issueBgJobRecordsLimit = ${?ISSUE_BG_JOB_RECORDS_LIMIT}
issueBgJobRecurrenceDelay = 2 seconds
issueBgJobRecurrenceDelay = ${?ISSUE_BG_JOB_RECURRENCE_DELAY}
issueBgJobProcessingParallelism = 5
issueBgJobProcessingParallelism = ${?ISSUE_BG_JOB_PROCESSING_PARALLELISM}
presentationBgJobRecordsLimit = 25
presentationBgJobRecordsLimit = ${?PRESENTATION_BG_JOB_RECORDS_LIMIT}
presentationBgJobRecurrenceDelay = 2 seconds
presentationBgJobRecurrenceDelay = ${?PRESENTATION_BG_JOB_RECURRENCE_DELAY}
presentationBgJobProcessingParallelism = 5
presentationBgJobProcessingParallelism = ${?PRESENTATION_BG_JOB_PROCESSING_PARALLELISM}
syncRevocationStatusesBgJobRecurrenceDelay = 2 seconds
syncRevocationStatusesBgJobRecurrenceDelay = ${?SYNC_REVOCATION_STATUSES_BG_JOB_RECURRENCE_DELAY}
syncRevocationStatusesBgJobProcessingParallelism = 5
syncRevocationStatusesBgJobProcessingParallelism = ${?SYNC_REVOCATION_STATUSES_BG_JOB_PROCESSING_PARALLELISM}
statusListSyncTriggerRecurrenceDelay = 30 seconds
statusListSyncTriggerRecurrenceDelay = ${?STATUS_LIST_SYNC_TRIGGER_RECURRENCE_DELAY}
didStateSyncTriggerRecurrenceDelay = 30 seconds
didStateSyncTriggerRecurrenceDelay = ${?DID_STATE_SYNC_TRIGGER_RECURRENCE_DELAY}
credential.sdJwt.expiry = 30 days
credential.sdJwt.expiry = ${?CREDENTIAL_SD_JWT_EXPIRY}
presentationInvitationExpiry = 300 seconds
Expand Down Expand Up @@ -81,8 +69,6 @@ connect {
connectBgJobRecordsLimit = ${?CONNECT_BG_JOB_RECORDS_LIMIT}
connectBgJobRecurrenceDelay = 2 seconds
connectBgJobRecurrenceDelay = ${?CONNECT_BG_JOB_RECURRENCE_DELAY}
connectBgJobProcessingParallelism = 5
connectBgJobProcessingParallelism = ${?CONNECT_BG_JOB_PROCESSING_PARALLELISM}
connectInvitationExpiry = 300 seconds
connectInvitationExpiry = ${?CONNECT_INVITATION_EXPIRY}
}
Expand Down Expand Up @@ -262,4 +248,49 @@ agent {
authApiKey = "default"
authApiKey = ${?DEFAULT_WALLET_AUTH_API_KEY}
}
messagingService {
connectFlow {
consumerCount = 5
retryStrategy {
maxRetries = 4
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
issueFlow {
consumerCount = 5
retryStrategy {
maxRetries = 4
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
presentFlow {
consumerCount = 5
retryStrategy {
maxRetries = 4
initialDelay = 5.seconds
maxDelay = 40.seconds
}
}
didStateSync {
consumerCount = 5
}
statusListSync {
consumerCount = 5
}
inMemoryQueueCapacity = 1000
kafkaEnabled = false
kafkaEnabled = ${?DEFAULT_KAFKA_ENABLED}
kafka {
bootstrapServers = "kafka:9092"
consumers {
autoCreateTopics = false,
maxPollRecords = 500
maxPollInterval = 5.minutes
pollTimeout = 50.millis
rebalanceSafeCommits = true
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,121 +5,45 @@ import org.hyperledger.identus.agent.server.config.AppConfig
import org.hyperledger.identus.agent.server.http.{ZHttp4sBlazeServer, ZHttpEndpoints}
import org.hyperledger.identus.agent.server.jobs.*
import org.hyperledger.identus.agent.walletapi.model.{Entity, Wallet, WalletSeed}
import org.hyperledger.identus.agent.walletapi.service.{EntityService, ManagedDIDService, WalletManagementService}
import org.hyperledger.identus.agent.walletapi.storage.DIDNonSecretStorage
import org.hyperledger.identus.agent.walletapi.service.{EntityService, WalletManagementService}
import org.hyperledger.identus.castor.controller.{DIDRegistrarServerEndpoints, DIDServerEndpoints}
import org.hyperledger.identus.castor.core.service.DIDService
import org.hyperledger.identus.connect.controller.ConnectionServerEndpoints
import org.hyperledger.identus.connect.core.service.ConnectionService
import org.hyperledger.identus.credentialstatus.controller.CredentialStatusServiceEndpoints
import org.hyperledger.identus.event.controller.EventServerEndpoints
import org.hyperledger.identus.event.notification.EventNotificationConfig
import org.hyperledger.identus.iam.authentication.apikey.ApiKeyAuthenticator
import org.hyperledger.identus.iam.entity.http.EntityServerEndpoints
import org.hyperledger.identus.iam.wallet.http.WalletManagementServerEndpoints
import org.hyperledger.identus.issue.controller.IssueServerEndpoints
import org.hyperledger.identus.mercury.{DidOps, HttpClient}
import org.hyperledger.identus.oid4vci.CredentialIssuerServerEndpoints
import org.hyperledger.identus.pollux.core.service.{CredentialService, PresentationService}
import org.hyperledger.identus.pollux.credentialdefinition.CredentialDefinitionRegistryServerEndpoints
import org.hyperledger.identus.pollux.credentialschema.{
SchemaRegistryServerEndpoints,
VerificationPolicyServerEndpoints
}
import org.hyperledger.identus.pollux.prex.PresentationExchangeServerEndpoints
import org.hyperledger.identus.pollux.vc.jwt.DidResolver as JwtDidResolver
import org.hyperledger.identus.presentproof.controller.PresentProofServerEndpoints
import org.hyperledger.identus.resolvers.DIDResolver
import org.hyperledger.identus.shared.http.UriResolver
import org.hyperledger.identus.shared.models.{HexString, WalletAccessContext, WalletAdministrationContext, WalletId}
import org.hyperledger.identus.shared.utils.DurationOps.toMetricsSeconds
import org.hyperledger.identus.shared.models.*
import org.hyperledger.identus.system.controller.SystemServerEndpoints
import org.hyperledger.identus.verification.controller.VcVerificationServerEndpoints
import zio.*
import zio.metrics.*

object CloudAgentApp {

def run = for {
_ <- AgentInitialization.run
_ <- issueCredentialDidCommExchangesJob.debug.fork
_ <- presentProofExchangeJob.debug.fork
_ <- connectDidCommExchangesJob.debug.fork
_ <- syncDIDPublicationStateFromDltJob.debug.fork
_ <- syncRevocationStatusListsJob.debug.fork
_ <- ConnectBackgroundJobs.connectFlowsHandler
_ <- IssueBackgroundJobs.issueFlowsHandler
_ <- PresentBackgroundJobs.presentFlowsHandler
_ <- DIDStateSyncBackgroundJobs.didStateSyncTrigger
_ <- DIDStateSyncBackgroundJobs.didStateSyncHandler
_ <- StatusListJobs.statusListsSyncTrigger
_ <- StatusListJobs.statusListSyncHandler
_ <- AgentHttpServer.run.tapDefect(e => ZIO.logErrorCause("Agent HTTP Server failure", e)).fork
fiber <- DidCommHttpServer.run.tapDefect(e => ZIO.logErrorCause("DIDComm HTTP Server failure", e)).fork
_ <- WebhookPublisher.layer.build.map(_.get[WebhookPublisher]).flatMap(_.run.fork)
_ <- fiber.join *> ZIO.log(s"Server End")
_ <- ZIO.never
} yield ()

private val issueCredentialDidCommExchangesJob: RIO[
AppConfig & DidOps & DIDResolver & JwtDidResolver & HttpClient & CredentialService & DIDNonSecretStorage &
DIDService & ManagedDIDService & PresentationService & WalletManagementService,
Unit
] =
for {
config <- ZIO.service[AppConfig]
_ <- (IssueBackgroundJobs.issueCredentialDidCommExchanges @@ Metric
.gauge("issuance_flow_did_com_exchange_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.pollux.issueBgJobRecurrenceDelay))
.unit
} yield ()

private val presentProofExchangeJob: RIO[
AppConfig & DidOps & UriResolver & DIDResolver & JwtDidResolver & HttpClient & PresentationService &
CredentialService & DIDNonSecretStorage & DIDService & ManagedDIDService,
Unit
] =
for {
config <- ZIO.service[AppConfig]
_ <- (PresentBackgroundJobs.presentProofExchanges @@ Metric
.gauge("present_proof_flow_did_com_exchange_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.pollux.presentationBgJobRecurrenceDelay))
.unit
} yield ()

private val connectDidCommExchangesJob: RIO[
AppConfig & DidOps & DIDResolver & HttpClient & ConnectionService & ManagedDIDService & DIDNonSecretStorage &
WalletManagementService,
Unit
] =
for {
config <- ZIO.service[AppConfig]
_ <- (ConnectBackgroundJobs.didCommExchanges @@ Metric
.gauge("connection_flow_did_com_exchange_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.connect.connectBgJobRecurrenceDelay))
.unit
} yield ()

private val syncRevocationStatusListsJob = {
for {
config <- ZIO.service[AppConfig]
_ <- (StatusListJobs.syncRevocationStatuses @@ Metric
.gauge("revocation_status_list_sync_job_ms_gauge")
.trackDurationWith(_.toMetricsSeconds))
.repeat(Schedule.spaced(config.pollux.syncRevocationStatusesBgJobRecurrenceDelay))
} yield ()
}

private val syncDIDPublicationStateFromDltJob: URIO[ManagedDIDService & WalletManagementService, Unit] =
ZIO
.serviceWithZIO[WalletManagementService](_.listWallets().map(_._1))
.flatMap { wallets =>
ZIO.foreach(wallets) { wallet =>
DIDStateSyncBackgroundJobs.syncDIDPublicationStateFromDlt
.provideSomeLayer(ZLayer.succeed(WalletAccessContext(wallet.id)))
}
}
.catchAll(e => ZIO.logError(s"error while syncing DID publication state: $e"))
.repeat(Schedule.spaced(10.seconds))
.unit
.provideSomeLayer(ZLayer.succeed(WalletAdministrationContext.Admin()))

}

object AgentHttpServer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import org.hyperledger.identus.agent.server.http.ZioHttpClient
import org.hyperledger.identus.agent.server.sql.Migrations as AgentMigrations
import org.hyperledger.identus.agent.walletapi.service.{
EntityServiceImpl,
ManagedDIDService,
ManagedDIDServiceWithEventNotificationImpl,
WalletManagementServiceImpl
}
Expand All @@ -16,7 +15,6 @@ import org.hyperledger.identus.agent.walletapi.sql.{
JdbcEntityRepository,
JdbcWalletNonSecretStorage
}
import org.hyperledger.identus.agent.walletapi.storage.GenericSecretStorage
import org.hyperledger.identus.castor.controller.{DIDControllerImpl, DIDRegistrarControllerImpl}
import org.hyperledger.identus.castor.core.model.did.{
Service as DidDocumentService,
Expand All @@ -36,7 +34,7 @@ import org.hyperledger.identus.iam.authentication.{DefaultAuthenticator, Oid4vci
import org.hyperledger.identus.iam.authentication.apikey.JdbcAuthenticationRepository
import org.hyperledger.identus.iam.authorization.core.EntityPermissionManagementService
import org.hyperledger.identus.iam.authorization.DefaultPermissionManagementService
import org.hyperledger.identus.iam.entity.http.controller.{EntityController, EntityControllerImpl}
import org.hyperledger.identus.iam.entity.http.controller.EntityControllerImpl
import org.hyperledger.identus.iam.wallet.http.controller.WalletManagementControllerImpl
import org.hyperledger.identus.issue.controller.IssueControllerImpl
import org.hyperledger.identus.mercury.*
Expand All @@ -47,7 +45,6 @@ import org.hyperledger.identus.pollux.core.service.*
import org.hyperledger.identus.pollux.core.service.verification.VcVerificationServiceImpl
import org.hyperledger.identus.pollux.credentialdefinition.controller.CredentialDefinitionControllerImpl
import org.hyperledger.identus.pollux.credentialschema.controller.{
CredentialSchemaController,
CredentialSchemaControllerImpl,
VerificationPolicyControllerImpl
}
Expand All @@ -66,6 +63,9 @@ import org.hyperledger.identus.pollux.sql.repository.{
}
import org.hyperledger.identus.presentproof.controller.PresentProofControllerImpl
import org.hyperledger.identus.resolvers.DIDResolver
import org.hyperledger.identus.shared.messaging
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.models.WalletId
import org.hyperledger.identus.system.controller.SystemControllerImpl
import org.hyperledger.identus.verification.controller.VcVerificationControllerImpl
import zio.*
Expand All @@ -77,6 +77,7 @@ import zio.metrics.connectors.micrometer.MicrometerConfig
import zio.metrics.jvm.DefaultJvmMetrics

import java.security.Security
import java.util.UUID

object MainApp extends ZIOAppDefault {

Expand Down Expand Up @@ -167,7 +168,6 @@ object MainApp extends ZIOAppDefault {
)
_ <- preMigrations
_ <- migrations

app <- CloudAgentApp.run
.provide(
DidCommX.liveLayer,
Expand Down Expand Up @@ -252,6 +252,11 @@ object MainApp extends ZIOAppDefault {
// HTTP client
SystemModule.zioHttpClientLayer,
Scope.default,
// Messaging Service
ZLayer.fromZIO(ZIO.service[AppConfig].map(_.agent.messagingService)),
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
messaging.MessagingService.producerLayer[WalletId, WalletId]
)
} yield app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import org.hyperledger.identus.castor.core.model.did.VerificationRelationship
import org.hyperledger.identus.iam.authentication.AuthenticationConfig
import org.hyperledger.identus.pollux.vc.jwt.*
import org.hyperledger.identus.shared.db.DbConfig
import zio.config.*
import org.hyperledger.identus.shared.messaging.MessagingServiceConfig
import zio.config.magnolia.*
import zio.Config

Expand Down Expand Up @@ -70,22 +70,13 @@ final case class PolluxConfig(
database: DatabaseConfig,
credentialSdJwtExpirationTime: Duration,
statusListRegistry: StatusListRegistryConfig,
issueBgJobRecordsLimit: Int,
issueBgJobRecurrenceDelay: Duration,
issueBgJobProcessingParallelism: Int,
presentationBgJobRecordsLimit: Int,
presentationBgJobRecurrenceDelay: Duration,
presentationBgJobProcessingParallelism: Int,
syncRevocationStatusesBgJobRecurrenceDelay: Duration,
syncRevocationStatusesBgJobProcessingParallelism: Int,
statusListSyncTriggerRecurrenceDelay: Duration,
didStateSyncTriggerRecurrenceDelay: Duration,
presentationInvitationExpiry: Duration,
issuanceInvitationExpiry: Duration,
)
final case class ConnectConfig(
database: DatabaseConfig,
connectBgJobRecordsLimit: Int,
connectBgJobRecurrenceDelay: Duration,
connectBgJobProcessingParallelism: Int,
connectInvitationExpiry: Duration,
)

Expand Down Expand Up @@ -173,7 +164,8 @@ final case class AgentConfig(
verification: VerificationConfig,
secretStorage: SecretStorageConfig,
webhookPublisher: WebhookPublisherConfig,
defaultWallet: DefaultWalletConfig
defaultWallet: DefaultWalletConfig,
messagingService: MessagingServiceConfig
) {
def validate: Either[String, Unit] =
for {
Expand Down
Loading
Loading