Skip to content

Commit

Permalink
chore: refactor messaging service config and migrate to messaging ser…
Browse files Browse the repository at this point in the history
…vice package

Signed-off-by: Benjamin Voiturier <benjamin.voiturier@iohk.io>
  • Loading branch information
bvoiturier committed Oct 8, 2024
1 parent f1132a7 commit e4c4260
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 49 deletions.
24 changes: 13 additions & 11 deletions cloud-agent/service/server/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,7 @@ agent {
authApiKey = ${?DEFAULT_WALLET_AUTH_API_KEY}
}
inMemoryQueueCapacity = 1000
kafka {
enabled = false
enabled = ${?DEFAULT_KAFKA_ENABLED}
bootstrapServers = "kafka:9092"
consumers {
autoCreateTopics = false,
maxPollRecords = 500
maxPollInterval = 5.minutes
pollTimeout = 50.millis
rebalanceSafeCommits = true
messagingService {
connectFlow {
consumerCount = 5
retryStrategy {
Expand Down Expand Up @@ -289,6 +280,17 @@ agent {
statusListSync {
consumerCount = 5
}
}
kafkaEnabled = false
kafkaEnabled = ${?DEFAULT_KAFKA_ENABLED}
kafka {
bootstrapServers = "kafka:9092"
consumers {
autoCreateTopics = false,
maxPollRecords = 500
maxPollInterval = 5.minutes
pollTimeout = 50.millis
rebalanceSafeCommits = true
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ object MainApp extends ZIOAppDefault {
_ <- migrations
appConfig <- ZIO.service[AppConfig].provide(SystemModule.configLayer)
messagingServiceLayer <-
if (appConfig.agent.kafka.enabled) {
val kafkaConfig = appConfig.agent.kafka
if (appConfig.agent.messagingService.kafkaEnabled) {
val kafkaConfig = appConfig.agent.messagingService.kafka.get
ZIO.succeed(
ZKafkaMessagingServiceImpl.layer(
kafkaConfig.bootstrapServers.split(',').toList,
Expand All @@ -192,7 +192,7 @@ object MainApp extends ZIOAppDefault {
)
}
messageProducerLayer <-
if (appConfig.agent.kafka.enabled) {
if (appConfig.agent.messagingService.kafkaEnabled) {
ZIO.succeed(
ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId]
)
Expand All @@ -202,7 +202,7 @@ object MainApp extends ZIOAppDefault {
)
}
syncDIDStateProducerLayer <-
if (appConfig.agent.kafka.enabled) {
if (appConfig.agent.messagingService.kafkaEnabled) {
ZIO.succeed(
ZKafkaProducerImpl.layer[WalletId, WalletId]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.hyperledger.identus.castor.core.model.did.VerificationRelationship
import org.hyperledger.identus.iam.authentication.AuthenticationConfig
import org.hyperledger.identus.pollux.vc.jwt.*
import org.hyperledger.identus.shared.db.DbConfig
import org.hyperledger.identus.shared.messaging.MessagingServiceConfig
import zio.config.magnolia.*
import zio.Config

Expand Down Expand Up @@ -154,32 +155,6 @@ final case class DefaultWalletConfig(
authApiKey: String
)

final case class KafkaConfig(enabled: Boolean, bootstrapServers: String, consumers: KafkaConsumersConfig)

final case class KafkaConsumersConfig(
connectFlow: KafkaConsumerJobConfig,
issueFlow: KafkaConsumerJobConfig,
presentFlow: KafkaConsumerJobConfig,
didStateSync: KafkaConsumerJobConfig,
statusListSync: KafkaConsumerJobConfig,
maxPollRecords: Int,
maxPollInterval: Duration,
pollTimeout: Duration,
rebalanceSafeCommits: Boolean,
autoCreateTopics: Boolean,
)

final case class KafkaConsumerJobConfig(
consumerCount: Int,
retryStrategy: Option[KafkaConsumerRetryStrategy]
)

final case class KafkaConsumerRetryStrategy(
maxRetries: Int,
initialDelay: Duration,
maxDelay: Duration,
)

final case class AgentConfig(
httpEndpoint: HttpEndpointConfig,
didCommEndpoint: DidCommEndpointConfig,
Expand All @@ -191,7 +166,7 @@ final case class AgentConfig(
webhookPublisher: WebhookPublisherConfig,
defaultWallet: DefaultWalletConfig,
inMemoryQueueCapacity: Int,
kafka: KafkaConfig
messagingService: MessagingServiceConfig
) {
def validate: Either[String, Unit] =
for {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.hyperledger.identus.agent.server.jobs

import org.hyperledger.identus.agent.server.config.KafkaConsumerJobConfig
import org.hyperledger.identus.agent.walletapi.model.{ManagedDIDState, PublicationState}
import org.hyperledger.identus.agent.walletapi.model.error.DIDSecretStorageError.{KeyNotFoundError, WalletNotFoundError}
import org.hyperledger.identus.agent.walletapi.model.error.GetManagedDIDError
Expand Down Expand Up @@ -29,6 +28,7 @@ import org.hyperledger.identus.pollux.vc.jwt.{
*
}
import org.hyperledger.identus.shared.crypto.*
import org.hyperledger.identus.shared.messaging.ConsumerJobConfig
import org.hyperledger.identus.shared.messaging.MessagingService.RetryStep
import org.hyperledger.identus.shared.models.{KeyId, WalletAccessContext}
import zio.{durationInt, Duration, ZIO, ZLayer}
Expand Down Expand Up @@ -232,7 +232,7 @@ trait BackgroundJobsHelper {
}
}

def retryStepsFromConfig(topicName: String, jobConfig: KafkaConsumerJobConfig): Seq[RetryStep] = {
def retryStepsFromConfig(topicName: String, jobConfig: ConsumerJobConfig): Seq[RetryStep] = {
val retryTopics = jobConfig.retryStrategy match
case None => Seq.empty
case Some(rs) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object ConnectBackgroundJobs extends BackgroundJobsHelper {
_ <- messaging.MessagingService.consumeWithRetryStrategy(
"identus-cloud-agent",
ConnectBackgroundJobs.handleMessage,
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.connectFlow)
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.connectFlow)
)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object DIDStateSyncBackgroundJobs extends BackgroundJobsHelper {
_ <- MessagingService.consumeWithRetryStrategy(
"identus-cloud-agent",
DIDStateSyncBackgroundJobs.handleMessage,
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.didStateSync)
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.didStateSync)
)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object IssueBackgroundJobs extends BackgroundJobsHelper {
_ <- messaging.MessagingService.consumeWithRetryStrategy(
"identus-cloud-agent",
IssueBackgroundJobs.handleMessage,
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.issueFlow)
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.issueFlow)
)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ object PresentBackgroundJobs extends BackgroundJobsHelper {
_ <- messaging.MessagingService.consumeWithRetryStrategy(
"identus-cloud-agent",
PresentBackgroundJobs.handleMessage,
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.presentFlow)
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.presentFlow)
)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object StatusListJobs extends BackgroundJobsHelper {
_ <- messaging.MessagingService.consumeWithRetryStrategy(
"identus-cloud-agent",
StatusListJobs.handleMessage,
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.kafka.consumers.statusListSync)
retryStepsFromConfig(TOPIC_NAME, appConfig.agent.messagingService.statusListSync)
)
} yield ()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.hyperledger.identus.shared.messaging

import java.time.Duration

case class MessagingServiceConfig(
connectFlow: ConsumerJobConfig,
issueFlow: ConsumerJobConfig,
presentFlow: ConsumerJobConfig,
didStateSync: ConsumerJobConfig,
statusListSync: ConsumerJobConfig,
kafkaEnabled: Boolean,
kafka: Option[KafkaConfig]
)

final case class ConsumerJobConfig(
consumerCount: Int,
retryStrategy: Option[ConsumerRetryStrategy]
)

final case class ConsumerRetryStrategy(
maxRetries: Int,
initialDelay: Duration,
maxDelay: Duration
)

final case class KafkaConfig(
bootstrapServers: String,
consumers: KafkaConsumersConfig
)

final case class KafkaConsumersConfig(
autoCreateTopics: Boolean,
maxPollRecords: Int,
maxPollInterval: Duration,
pollTimeout: Duration,
rebalanceSafeCommits: Boolean
)

0 comments on commit e4c4260

Please sign in to comment.