Skip to content

Commit

Permalink
Merge pull request #45 from dfds/feature/2226-make-sure-old-capabilit…
Browse files Browse the repository at this point in the history
…ies-have-access-to-schema-registry

copy pasted schema registry access into schema registration process
  • Loading branch information
kralle333 authored Dec 14, 2023
2 parents 3dc33cd + 8a7c7aa commit 1764a4d
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
return outboxFactory(repository)
})
deleteTopicProcess := del.NewProcess(logger, db, confluentClient, func(repository del.OutboxRepository) del.Outbox { return outboxFactory(repository) })
addSchemaProcess := schema.NewProcess(logger, db, confluentClient, func(repository schema.OutboxRepository) schema.Outbox { return outboxFactory(repository) })
addSchemaProcess := schema.NewProcess(logger, db, confluentClient, awsClient, func(repository schema.OutboxRepository) schema.Outbox { return outboxFactory(repository) })
consumer := Must(messaging.ConfigureConsumer(logger, config.KafkaBroker, config.KafkaGroupId,
messaging.WithCredentials(config.CreateConsumerCredentials()),
messaging.RegisterMessageHandler(config.TopicNameSelfService, "topic_requested", create.NewTopicRequestedHandler(createTopicProcess), &create.TopicRequested{}),
Expand Down
53 changes: 38 additions & 15 deletions src/functional_tests/create_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
schema "github.com/dfds/confluent-gateway/internal/schema"
"github.com/dfds/confluent-gateway/messaging"
"github.com/h2non/gock"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/require"
"testing"
"time"
Expand Down Expand Up @@ -53,7 +54,43 @@ func TestCreateSchemaProcess(t *testing.T) {
)
require.NoError(t, err)

process := schema.NewProcess(testerApp.logger, testerApp.db, testerApp.confluentClient, func(repository schema.OutboxRepository) schema.Outbox {
err = testerApp.db.CreateTopic(&models.Topic{
Id: createSchemaVariables.TopicId,
CapabilityId: createSchemaVariables.CapabilityId,
ClusterId: testerApp.dbSeedVariables.DevelopmentClusterId,
Name: createSchemaVariables.TopicName,
CreatedAt: time.Now(),
})
require.NoError(t, err)

someUserId := models.MakeUserAccountId(123)
someServiceAccountID := models.ServiceAccountId(uuid.NewV4().String())
clusterAccess := models.ClusterAccess{
Id: uuid.NewV4(),
ClusterId: testerApp.dbSeedVariables.DevelopmentClusterId,
ServiceAccountId: someServiceAccountID,
UserAccountId: someUserId,
Acl: nil,
CreatedAt: time.Time{},
}
err = testerApp.db.CreateServiceAccount(&models.ServiceAccount{
Id: someServiceAccountID,
UserAccountId: someUserId,
CapabilityId: createSchemaVariables.CapabilityId,
ClusterAccesses: []models.ClusterAccess{
clusterAccess,
},
CreatedAt: time.Time{},
})

err = testerApp.db.CreateClusterAccess(&clusterAccess)

//ensureServiceAccountSchemaRegistryAccessStep
setupListKeysHTTPMock(string(testerApp.dbSeedVariables.DevelopmentSchemaRegistryId), someServiceAccountID, 0) // Check if the api key has already been created
setupCreateApiKeyMock(string(testerApp.dbSeedVariables.DevelopmentSchemaRegistryId), someServiceAccountID, "username", "p4ssword") // Then we create an API key for the schema registry
setupRoleBindingHTTPMock(string(someServiceAccountID), testerApp.dbSeedVariables.GetDevelopmentClusterValues()) // Then we create a role binding for the service account

process := schema.NewProcess(testerApp.logger, testerApp.db, testerApp.confluentClient, *testerApp.vaultClient, func(repository schema.OutboxRepository) schema.Outbox {
return outboxFactory(repository)
})

Expand All @@ -65,20 +102,6 @@ func TestCreateSchemaProcess(t *testing.T) {
Schema: "test-schema",
}

err = process.Process(context.Background(), input)
// TODO: Although the topic does not exist, the process ignores that and continues.
//require.ErrorIs(t, err, storage.ErrTopicNotFound)
require.NoError(t, err)

err = testerApp.db.CreateTopic(&models.Topic{
Id: createSchemaVariables.TopicId,
CapabilityId: createSchemaVariables.CapabilityId,
ClusterId: testerApp.dbSeedVariables.DevelopmentClusterId,
Name: createSchemaVariables.TopicName,
CreatedAt: time.Now(),
})
require.NoError(t, err)

setupCreateSchemaHttpMock(input, createSchemaVariables.TopicName, testerApp.dbSeedVariables)
err = process.Process(context.Background(), input)
require.NoError(t, err)
Expand Down
71 changes: 71 additions & 0 deletions src/internal/schema/account.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package schema

import (
"context"
"fmt"
"github.com/dfds/confluent-gateway/internal/models"
)

type accountService struct {
context context.Context
confluent Confluent
repo serviceAccountRepository
}

type serviceAccountRepository interface {
GetServiceAccount(capabilityId models.CapabilityId) (*models.ServiceAccount, error)
}

func NewSchemaAccountService(ctx context.Context, confluent Confluent, repo serviceAccountRepository) *accountService {
return &accountService{
context: ctx,
confluent: confluent,
repo: repo,
}
}

func (h *accountService) GetServiceAccount(capabilityId models.CapabilityId) (*models.ServiceAccount, error) {
return h.repo.GetServiceAccount(capabilityId)
}

func (h *accountService) DeleteSchemaRegistryApiKey(clusterAccess *models.ClusterAccess) error {
return h.confluent.DeleteSchemaRegistryApiKey(h.context, clusterAccess.ClusterId, clusterAccess.ServiceAccountId)
}

func (h *accountService) GetClusterAccess(capabilityId models.CapabilityId, clusterId models.ClusterId) (*models.ClusterAccess, error) {
serviceAccount, err := h.repo.GetServiceAccount(capabilityId)
if err != nil {
return nil, err
}
if serviceAccount == nil {
return nil, fmt.Errorf("no service account for capability '%s' found", capabilityId)
}

clusterAccess, hasClusterAccess := serviceAccount.TryGetClusterAccess(clusterId)

if !hasClusterAccess {
return nil, fmt.Errorf("no cluster access for service account '%s' found", serviceAccount.Id)
}
return clusterAccess, nil
}

func (h *accountService) CountSchemaRegistryApiKeys(clusterAccess *models.ClusterAccess) (int, error) {
keyCount, err := h.confluent.CountSchemaRegistryApiKeys(h.context, clusterAccess.ServiceAccountId, clusterAccess.ClusterId)
if err != nil {
return 0, err
}
return keyCount, nil
}

func (h *accountService) CreateSchemaRegistryApiKey(clusterAccess *models.ClusterAccess) (models.ApiKey, error) {
return h.confluent.CreateSchemaRegistryApiKey(h.context, clusterAccess.ClusterId, clusterAccess.ServiceAccountId)

}

func (h *accountService) CreateServiceAccountRoleBinding(clusterAccess *models.ClusterAccess) error {
err := h.confluent.CreateServiceAccountRoleBinding(h.context, clusterAccess.ServiceAccountId, clusterAccess.ClusterId)
if err != nil {
return err
}
return nil
}
16 changes: 16 additions & 0 deletions src/internal/schema/confluent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package schema

import (
"context"

"github.com/dfds/confluent-gateway/internal/models"
)

type Confluent interface {
CreateServiceAccount(ctx context.Context, name string, description string) (models.ServiceAccountId, error)
CreateSchemaRegistryApiKey(ctx context.Context, clusterId models.ClusterId, serviceAccountId models.ServiceAccountId) (models.ApiKey, error)
CreateServiceAccountRoleBinding(ctx context.Context, serviceAccount models.ServiceAccountId, clusterId models.ClusterId) error
CountSchemaRegistryApiKeys(ctx context.Context, clusterAccess models.ServiceAccountId, clusterId models.ClusterId) (int, error)
DeleteSchemaRegistryApiKey(ctx context.Context, clusterId models.ClusterId, serviceAccountId models.ServiceAccountId) error
RegisterSchema(ctx context.Context, clusterId models.ClusterId, subject string, schema string) error
}
83 changes: 81 additions & 2 deletions src/internal/schema/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package schema

import (
"context"
"fmt"
"github.com/dfds/confluent-gateway/internal/models"
"github.com/dfds/confluent-gateway/logging"
"github.com/dfds/confluent-gateway/messaging"
Expand All @@ -10,13 +11,29 @@ import (
type StepContext struct {
logger logging.Logger
ctx context.Context
account AccountService
vault VaultService
state *models.SchemaProcess
registry SchemaRegistry
input ProcessInput
outbox Outbox
topic TopicService
}
type AccountService interface {
GetServiceAccount(models.CapabilityId) (*models.ServiceAccount, error)
GetClusterAccess(models.CapabilityId, models.ClusterId) (*models.ClusterAccess, error)
CreateSchemaRegistryApiKey(clusterAccess *models.ClusterAccess) (models.ApiKey, error)
CreateServiceAccountRoleBinding(clusterAccess *models.ClusterAccess) error
CountSchemaRegistryApiKeys(clusterAccess *models.ClusterAccess) (int, error)
DeleteSchemaRegistryApiKey(clusterAccess *models.ClusterAccess) error
}

func NewStepContext(logger logging.Logger, ctx context.Context, schema *models.SchemaProcess, registry SchemaRegistry, outbox Outbox, account AccountService, vault VaultService, topic TopicService) *StepContext {
return &StepContext{logger: logger, ctx: ctx, state: schema, registry: registry, outbox: outbox, account: account, vault: vault, topic: topic}
}

func NewStepContext(logger logging.Logger, ctx context.Context, schema *models.SchemaProcess, registry SchemaRegistry, outbox Outbox) *StepContext {
return &StepContext{logger: logger, ctx: ctx, state: schema, registry: registry, outbox: outbox}
type TopicService interface {
GetTopic(string) (*models.Topic, error)
}

type Outbox interface {
Expand Down Expand Up @@ -55,3 +72,65 @@ func (c *StepContext) RaiseSchemaRegistrationFailed(reason string) error {
}
return c.outbox.Produce(event)
}

func (c *StepContext) LogDebug(format string, args ...string) {
c.logger.Debug(format, args...)
}

func (c *StepContext) LogError(err error, format string, args ...string) {
c.logger.Error(err, format, args...)
}

func (c *StepContext) LogWarning(format string, args ...string) {
c.logger.Warning(format, args...)
}

func (c *StepContext) HasServiceAccount(capabilityId models.CapabilityId) bool {
account, err := c.account.GetServiceAccount(capabilityId)
if err != nil {
c.LogError(err, fmt.Sprintf("encountered error when checking if ServiceAccount exists for CapabilityId %s", capabilityId))
return false
}
return account != nil
}

func (c *StepContext) GetClusterAccess() (*models.ClusterAccess, error) {
topic, err := c.topic.GetTopic(c.state.TopicId)
if err != nil {
return nil, err
}
return c.account.GetClusterAccess(topic.CapabilityId, topic.ClusterId)
}

func (c *StepContext) HasSchemaRegistryApiKey(clusterAccess *models.ClusterAccess) (bool, error) {
count, err := c.account.CountSchemaRegistryApiKeys(clusterAccess)
return count > 0, err
}

func (c *StepContext) HasSchemaRegistryApiKeyInVault(clusterAccess *models.ClusterAccess) (bool, error) {
topic, err := c.topic.GetTopic(c.state.TopicId)
if err != nil {
return false, err
}
return c.vault.QuerySchemaRegistryApiKey(topic.CapabilityId, clusterAccess.ClusterId)
}

func (c *StepContext) CreateSchemaRegistryApiKeyAndStoreInVault(clusterAccess *models.ClusterAccess, shouldOverwriteKey bool) error {
newKey, err := c.account.CreateSchemaRegistryApiKey(clusterAccess)
if err != nil {
return err
}
topic, err := c.topic.GetTopic(c.state.TopicId)
if err != nil {
return err
}
return c.vault.StoreSchemaRegistryApiKey(topic.CapabilityId, clusterAccess.ClusterId, newKey, shouldOverwriteKey)
}

func (c *StepContext) DeleteSchemaRegistryApiKey(clusterAccess *models.ClusterAccess) error {
return c.account.DeleteSchemaRegistryApiKey(clusterAccess)
}

func (c *StepContext) CreateServiceAccountRoleBinding(clusterAccess *models.ClusterAccess) error {
return c.account.CreateServiceAccountRoleBinding(clusterAccess)
}
Loading

0 comments on commit 1764a4d

Please sign in to comment.