Skip to content

Commit

Permalink
chore: refactor/simplify MessagingService-related layers creation
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Voiturier <benjamin.voiturier@iohk.io>
  • Loading branch information
bvoiturier committed Oct 8, 2024
1 parent e4c4260 commit bb5ea8f
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ agent {
authApiKey = "default"
authApiKey = ${?DEFAULT_WALLET_AUTH_API_KEY}
}
inMemoryQueueCapacity = 1000
messagingService {
connectFlow {
consumerCount = 5
Expand Down Expand Up @@ -280,6 +279,7 @@ agent {
statusListSync {
consumerCount = 5
}
inMemoryQueueCapacity = 1000
kafkaEnabled = false
kafkaEnabled = ${?DEFAULT_KAFKA_ENABLED}
kafka {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ import org.hyperledger.identus.pollux.sql.repository.{
}
import org.hyperledger.identus.presentproof.controller.PresentProofControllerImpl
import org.hyperledger.identus.resolvers.DIDResolver
import org.hyperledger.identus.shared.messaging.kafka.{
InMemoryMessagingService,
ZKafkaMessagingServiceImpl,
ZKafkaProducerImpl
}
import org.hyperledger.identus.shared.messaging
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.models.WalletId
import org.hyperledger.identus.system.controller.SystemControllerImpl
Expand Down Expand Up @@ -172,46 +168,6 @@ object MainApp extends ZIOAppDefault {
)
_ <- preMigrations
_ <- migrations
appConfig <- ZIO.service[AppConfig].provide(SystemModule.configLayer)
messagingServiceLayer <-
if (appConfig.agent.messagingService.kafkaEnabled) {
val kafkaConfig = appConfig.agent.messagingService.kafka.get
ZIO.succeed(
ZKafkaMessagingServiceImpl.layer(
kafkaConfig.bootstrapServers.split(',').toList,
kafkaConfig.consumers.autoCreateTopics,
kafkaConfig.consumers.maxPollRecords,
kafkaConfig.consumers.maxPollInterval,
kafkaConfig.consumers.pollTimeout,
kafkaConfig.consumers.rebalanceSafeCommits
)
)
} else {
ZIO.succeed(
ZLayer.succeed(appConfig.agent.inMemoryQueueCapacity) >>> InMemoryMessagingService.messagingServiceLayer
)
}
messageProducerLayer <-
if (appConfig.agent.messagingService.kafkaEnabled) {
ZIO.succeed(
ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId]
)
} else {
ZIO.succeed(
InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId]
)
}
syncDIDStateProducerLayer <-
if (appConfig.agent.messagingService.kafkaEnabled) {
ZIO.succeed(
ZKafkaProducerImpl.layer[WalletId, WalletId]
)
} else {
ZIO.succeed(
InMemoryMessagingService.producerLayer[WalletId, WalletId]
)
}

app <- CloudAgentApp.run
.provide(
DidCommX.liveLayer,
Expand Down Expand Up @@ -296,9 +252,11 @@ object MainApp extends ZIOAppDefault {
// HTTP client
SystemModule.zioHttpClientLayer,
Scope.default,
messagingServiceLayer,
messageProducerLayer,
syncDIDStateProducerLayer
// Messaging Service
ZLayer.fromZIO(ZIO.service[AppConfig].map(_.agent.messagingService)),
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
messaging.MessagingService.producerLayer[WalletId, WalletId]
)
} yield app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import org.hyperledger.identus.iam.authentication.AuthenticationConfig
import org.hyperledger.identus.pollux.vc.jwt.*
import org.hyperledger.identus.shared.db.DbConfig
import org.hyperledger.identus.shared.messaging.MessagingServiceConfig
import zio.config.magnolia.*
import zio.Config
import zio.config.magnolia.*

import java.net.URL
import java.time.Duration
Expand Down Expand Up @@ -165,7 +165,6 @@ final case class AgentConfig(
secretStorage: SecretStorageConfig,
webhookPublisher: WebhookPublisherConfig,
defaultWallet: DefaultWalletConfig,
inMemoryQueueCapacity: Int,
messagingService: MessagingServiceConfig
) {
def validate: Either[String, Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,19 @@ import org.hyperledger.identus.castor.core.service.{DIDService, MockDIDService}
import org.hyperledger.identus.oid4vci.http.{ClaimDescriptor, CredentialDefinition, Localization}
import org.hyperledger.identus.oid4vci.service.{OIDCCredentialIssuerService, OIDCCredentialIssuerServiceImpl}
import org.hyperledger.identus.oid4vci.storage.InMemoryIssuanceSessionService
import org.hyperledger.identus.pollux.core.model.oid4vci.CredentialConfiguration
import org.hyperledger.identus.pollux.core.model.CredentialFormat
import org.hyperledger.identus.pollux.core.repository.{
CredentialRepository,
CredentialRepositoryInMemory,
CredentialStatusListRepositoryInMemory
}
import org.hyperledger.identus.pollux.core.model.oid4vci.CredentialConfiguration
import org.hyperledger.identus.pollux.core.repository.{CredentialRepositoryInMemory, CredentialStatusListRepositoryInMemory}
import org.hyperledger.identus.pollux.core.service.*
import org.hyperledger.identus.pollux.core.service.uriResolvers.ResourceUrlResolver
import org.hyperledger.identus.pollux.vc.jwt.PrismDidResolver
import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.messaging.{MessagingService, MessagingServiceConfig, WalletIdAndRecordId}
import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId}
import zio.{Clock, Random, URLayer, ZIO, ZLayer}
import zio.json.*
import zio.json.ast.Json
import zio.mock.MockSpecDefault
import zio.test.*
import zio.test.Assertion.*
import zio.{Clock, Random, URLayer, ZIO, ZLayer}

import java.net.URI
import java.time.Instant
Expand Down Expand Up @@ -56,8 +50,8 @@ object OIDCCredentialIssuerServiceSpec
GenericSecretStorageInMemory.layer,
LinkSecretServiceImpl.layer,
CredentialServiceImpl.layer,
(ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer >>>
InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
OIDCCredentialIssuerServiceImpl.layer
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package org.hyperledger.identus.connect.core.service

import io.circe.syntax.*
import org.hyperledger.identus.connect.core.model.ConnectionRecord.*
import org.hyperledger.identus.connect.core.model.error.ConnectionServiceError
import org.hyperledger.identus.connect.core.model.error.ConnectionServiceError.InvalidStateForOperation
import org.hyperledger.identus.connect.core.model.ConnectionRecord
import org.hyperledger.identus.connect.core.model.ConnectionRecord.*
import org.hyperledger.identus.connect.core.repository.ConnectionRepositoryInMemory
import org.hyperledger.identus.mercury.model.{DidId, Message}
import org.hyperledger.identus.mercury.protocol.connection.ConnectionResponse
import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService
import org.hyperledger.identus.shared.messaging
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.models.*
import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId}
import zio.*
import zio.test.*
import zio.test.Assertion.*

import java.time.Instant
import java.util.UUID

object ConnectionServiceImplSpec extends ZIOSpecDefault {
Expand Down Expand Up @@ -315,8 +312,9 @@ object ConnectionServiceImplSpec extends ZIOSpecDefault {
}
).provide(
connectionServiceLayer,
ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer,
InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId],
messaging.MessagingServiceConfig.inMemoryLayer,
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId],
ZLayer.succeed(WalletAccessContext(WalletId.random)),
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import org.hyperledger.identus.event.notification.*
import org.hyperledger.identus.mercury.model.DidId
import org.hyperledger.identus.mercury.protocol.connection.{ConnectionRequest, ConnectionResponse}
import org.hyperledger.identus.mercury.protocol.invitation.v2.Invitation
import org.hyperledger.identus.shared.messaging.kafka.{ZKafkaMessagingServiceImpl, ZKafkaProducerImpl}
import org.hyperledger.identus.shared.messaging
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.models.*
import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId}
import zio.*
import zio.mock.Expectation
Expand Down Expand Up @@ -154,8 +153,9 @@ object ConnectionServiceNotifierSpec extends ZIOSpecDefault {
inviteeExpectations.toLayer
) >>> ConnectionServiceNotifier.layer,
ZLayer.succeed(WalletAccessContext(WalletId.random)),
ZKafkaMessagingServiceImpl.layer(List("localhost:29092")),
ZKafkaProducerImpl.layer[UUID, WalletIdAndRecordId]
messaging.MessagingServiceConfig.inMemoryLayer,
messaging.MessagingService.serviceLayer,
messaging.MessagingService.producerLayer[UUID, WalletIdAndRecordId]
)
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.hyperledger.identus.messaging

import org.hyperledger.identus.shared.messaging
import org.hyperledger.identus.shared.messaging.{Message, MessagingService, Serde}
import org.hyperledger.identus.shared.messaging.kafka.ZKafkaMessagingServiceImpl
import zio.{durationInt, Random, Schedule, Scope, URIO, ZIO, ZIOAppArgs, ZIOAppDefault, ZLayer}
import zio.json.{DecoderOps, DeriveJsonDecoder, DeriveJsonEncoder, EncoderOps, JsonDecoder, JsonEncoder}
import zio.{Random, Schedule, Scope, URIO, ZIO, ZIOAppArgs, ZIOAppDefault, ZLayer, durationInt}

import java.nio.charset.StandardCharsets
import java.util.UUID
Expand Down Expand Up @@ -36,7 +36,8 @@ object MessagingServiceTest extends ZIOAppDefault {
_ <- ZIO.never
} yield ()
effect.provide(
ZKafkaMessagingServiceImpl.layer(List("localhost:29092")),
messaging.MessagingServiceConfig.inMemoryLayer,
messaging.MessagingService.serviceLayer,
ZLayer.succeed("Sample 'R' passed to handler")
)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package org.hyperledger.identus.messaging.kafka

import org.hyperledger.identus.messaging.*
import org.hyperledger.identus.shared.messaging.{Consumer, Message, Producer}
import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService
import org.hyperledger.identus.shared.messaging.*
import zio.*
import zio.test.*
import zio.test.Assertion.*

object InMemoryMessagingServiceSpec extends ZIOSpecDefault {
val testLayer = ZLayer.succeed(100) >+> InMemoryMessagingService.messagingServiceLayer >+>
InMemoryMessagingService.producerLayer[String, String] >+>
InMemoryMessagingService.consumerLayer[String, String]("test-group")
val testLayer = MessagingServiceConfig.inMemoryLayer >+> MessagingService.serviceLayer >+>
MessagingService.producerLayer[String, String] >+>
MessagingService.consumerLayer[String, String]("test-group")

def spec = suite("InMemoryMessagingServiceSpec")(
test("should produce and consume messages") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,17 @@ package org.hyperledger.identus.pollux.core.service
import io.circe.Json
import org.hyperledger.identus.agent.walletapi.memory.GenericSecretStorageInMemory
import org.hyperledger.identus.agent.walletapi.service.ManagedDIDService
import org.hyperledger.identus.agent.walletapi.storage.GenericSecretStorage
import org.hyperledger.identus.castor.core.model.did.PrismDID
import org.hyperledger.identus.castor.core.service.DIDService
import org.hyperledger.identus.mercury.model.{AttachmentDescriptor, DidId}
import org.hyperledger.identus.mercury.protocol.issuecredential.*
import org.hyperledger.identus.pollux.core.model.*
import org.hyperledger.identus.pollux.core.model.presentation.Options
import org.hyperledger.identus.pollux.core.repository.{
CredentialDefinitionRepositoryInMemory,
CredentialRepositoryInMemory,
CredentialStatusListRepositoryInMemory
}
import org.hyperledger.identus.pollux.core.repository.{CredentialDefinitionRepositoryInMemory, CredentialRepositoryInMemory, CredentialStatusListRepositoryInMemory}
import org.hyperledger.identus.pollux.prex.{ClaimFormat, Ldp, PresentationDefinition}
import org.hyperledger.identus.pollux.vc.jwt.*
import org.hyperledger.identus.shared.http.UriResolver
import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.models.*
import org.hyperledger.identus.shared.messaging.{MessagingService, MessagingServiceConfig, WalletIdAndRecordId}
import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId}
import zio.*

Expand All @@ -44,8 +37,8 @@ trait CredentialServiceSpecHelper {
credentialDefinitionServiceLayer,
GenericSecretStorageInMemory.layer,
LinkSecretServiceImpl.layer,
(ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer >>>
InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
CredentialServiceImpl.layer
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package org.hyperledger.identus.pollux.core.service

import com.nimbusds.jose.jwk.*
import org.hyperledger.identus.agent.walletapi.memory.GenericSecretStorageInMemory
import org.hyperledger.identus.castor.core.model.did.DID
import org.hyperledger.identus.mercury.{AgentPeerService, PeerDID}
import org.hyperledger.identus.mercury.model.{AttachmentDescriptor, DidId}
import org.hyperledger.identus.mercury.protocol.presentproof.*
import org.hyperledger.identus.mercury.{AgentPeerService, PeerDID}
import org.hyperledger.identus.pollux.core.model.*
import org.hyperledger.identus.pollux.core.model.error.PresentationError
import org.hyperledger.identus.pollux.core.repository.*
Expand All @@ -14,9 +13,7 @@ import org.hyperledger.identus.pollux.core.service.uriResolvers.ResourceUrlResol
import org.hyperledger.identus.pollux.vc.jwt.*
import org.hyperledger.identus.shared.crypto.KmpSecp256k1KeyOps
import org.hyperledger.identus.shared.http.UriResolver
import org.hyperledger.identus.shared.messaging.kafka.InMemoryMessagingService
import org.hyperledger.identus.shared.messaging.WalletIdAndRecordId
import org.hyperledger.identus.shared.models.*
import org.hyperledger.identus.shared.messaging.{MessagingService, MessagingServiceConfig, WalletIdAndRecordId}
import org.hyperledger.identus.shared.models.{WalletAccessContext, WalletId}
import zio.*

Expand Down Expand Up @@ -46,8 +43,8 @@ trait PresentationServiceSpecHelper {
linkSecretLayer,
PresentationRepositoryInMemory.layer,
CredentialRepositoryInMemory.layer,
(ZLayer.succeed(100) >>> InMemoryMessagingService.messagingServiceLayer >>>
InMemoryMessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
(MessagingServiceConfig.inMemoryLayer >>> MessagingService.serviceLayer >>>
MessagingService.producerLayer[UUID, WalletIdAndRecordId]).orDie,
) ++ defaultWalletLayer

def createIssuer(did: String): Issuer = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.hyperledger.identus.shared.messaging

import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, Task, URIO, ZIO}
import org.hyperledger.identus.shared.messaging.kafka.{InMemoryMessagingService, ZKafkaMessagingServiceImpl}
import zio.{durationInt, Cause, Duration, EnvironmentTag, RIO, RLayer, Task, URIO, URLayer, ZIO, ZLayer}

import java.time.Instant
trait MessagingService {
Expand Down Expand Up @@ -69,6 +70,30 @@ object MessagingService {
)(implicit kSerde: Serde[K], vSerde: Serde[V]): RIO[HR & Producer[K, V] & MessagingService, Unit] =
consumeWithRetryStrategy(groupId, handler, Seq(RetryStep(topicName, consumerCount, 0.seconds, None)))

val serviceLayer: URLayer[MessagingServiceConfig, MessagingService] =
ZLayer
.service[MessagingServiceConfig]
.flatMap(config =>
if (config.get.kafkaEnabled) ZKafkaMessagingServiceImpl.layer
else InMemoryMessagingService.layer
)

def producerLayer[K: EnvironmentTag, V: EnvironmentTag](implicit
kSerde: Serde[K],
vSerde: Serde[V]
): RLayer[MessagingService, Producer[K, V]] = ZLayer.fromZIO(for {
messagingService <- ZIO.service[MessagingService]
producer <- messagingService.makeProducer[K, V]()
} yield producer)

def consumerLayer[K: EnvironmentTag, V: EnvironmentTag](groupId: String)(implicit
kSerde: Serde[K],
vSerde: Serde[V]
): RLayer[MessagingService, Consumer[K, V]] = ZLayer.fromZIO(for {
messagingService <- ZIO.service[MessagingService]
consumer <- messagingService.makeConsumer[K, V](groupId)
} yield consumer)

}

case class Message[K, V](key: K, value: V, offset: Long, timestamp: Long)
Expand Down
Loading

0 comments on commit bb5ea8f

Please sign in to comment.