Skip to content

Commit

Permalink
Merge pull request #52 from dfds/feature/2596-bug-confluent-gateway-c…
Browse files Browse the repository at this point in the history
…rash-loops-when-ensuring-service-account

made ensure service account process a bit more robust
  • Loading branch information
kralle333 authored Feb 12, 2024
2 parents f7226a3 + a679473 commit 572bd65
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 7 deletions.
110 changes: 110 additions & 0 deletions src/functional_tests/create_serviceaccount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,42 @@ func setupCreateServiceAccountHttpMock(processInput serviceaccount.ProcessInput,

}

func setupCreateServiceAccountFailureExistingDisplayNameHttpMock(processInput serviceaccount.ProcessInput) {

payload := string(`{
"display_name": "` + processInput.CapabilityId + `",
"description": "` + "Created by Confluent Gateway" + `"
}`)

gock.New(testerApp.config.ConfluentCloudApiUrl).
Post("/iam/v2/service-accounts").
BodyString(payload).
BasicAuth(testerApp.config.ConfluentCloudApiUserName, testerApp.config.ConfluentCloudApiPassword).
Reply(409)

}

func setupListServiceAccountsHttpMock(serviceAccountId models.ServiceAccountId, displayName string) {
serviceAccounts := fmt.Sprintf(`{
"data": [
{
"id": "%s",
"display_name": "%s"
},
{
"id": "some-other-id",
"display_name":"hey there"
}
]
}`, serviceAccountId, displayName)

gock.New(testerApp.config.ConfluentCloudApiUrl).
Get("/iam/v2/service-accounts").
BasicAuth(testerApp.config.ConfluentCloudApiUserName, testerApp.config.ConfluentCloudApiPassword).
Reply(200).
BodyString(serviceAccounts)
}

func setupGetInternalConfluentUsersHttpMock(serviceAccountId models.ServiceAccountId) {

s := struct {
Expand Down Expand Up @@ -211,3 +247,77 @@ func TestCreateServiceAccountProcess(t *testing.T) {

helpers.RequireNoUnmatchedGockMocks(t)
}

func TestCreateServiceAccountWithExistingServiceAccountNameProcess(t *testing.T) {
createServiceAccountVariables := helpers.NewTestVariables("create_serviceaccount_test")
userAccountId := models.MakeUserAccountId(internalConfluentUserId)
defer func() {
testerApp.db.RemoveAllOutboxEntries()
testerApp.db.RemoveAllServiceAccounts()
}()
outboxFactory, err := messaging.ConfigureOutbox(testerApp.logger,
messaging.RegisterMessage(testerApp.config.TopicNameKafkaClusterAccessGranted, "cluster-access-granted", &serviceaccount.ServiceAccountAccessGranted{}),
)
require.NoError(t, err)

process := serviceaccount.NewProcess(testerApp.logger,
testerApp.db,
testerApp.confluentClient,
*testerApp.vaultClient,
func(repository serviceaccount.OutboxRepository) serviceaccount.Outbox {
return outboxFactory(repository)
})

input := serviceaccount.ProcessInput{
CapabilityId: createServiceAccountVariables.CapabilityId,
ClusterId: testerApp.dbSeedVariables.DevelopmentClusterId,
}

createdClusterApiKey := models.ApiKey{
Username: "new_cluster_apikey_username",
Password: "new_cluster_apikey_password",
}
createdSchemaRegistryApiKey := models.ApiKey{
Username: "new_schema_registry_apikey_username",
Password: "new_schema_registry_apikey_password",
}

account, err := testerApp.db.GetServiceAccount(input.CapabilityId)
require.Error(t, err)
require.Nil(t, account)

// a lot of mocking going on here, might indicate that this process is doing too much

//ensureServiceAccountStep:
setupCreateServiceAccountFailureExistingDisplayNameHttpMock(input) // First we create a service account that fails
setupListServiceAccountsHttpMock(createServiceAccountVariables.ServiceAccountId, string(input.CapabilityId))
setupGetInternalConfluentUsersHttpMock(createServiceAccountVariables.ServiceAccountId) // Then we check if we have a matching user for that service account

//ensureServiceAccountAclStep:
setupCreateAclHttpMock(createServiceAccountVariables.CapabilityId, input.ClusterId, userAccountId) // Then we create ACLs for the service account

//ensureServiceAccountClusterAccessStep
setupListKeysHTTPMock(string(input.ClusterId), createServiceAccountVariables.ServiceAccountId, 0) // Then we check if the API key was created
setupCreateApiKeyMock(string(input.ClusterId), createServiceAccountVariables.ServiceAccountId, createdClusterApiKey.Username, createdClusterApiKey.Password) // Then we create an API key for the cluster

//ensureServiceAccountSchemaRegistryAccessStep
setupListKeysHTTPMock(string(testerApp.dbSeedVariables.DevelopmentSchemaRegistryId), createServiceAccountVariables.ServiceAccountId, 0) // Check if the api key has already been created
setupCreateApiKeyMock(string(testerApp.dbSeedVariables.DevelopmentSchemaRegistryId), createServiceAccountVariables.ServiceAccountId, createdSchemaRegistryApiKey.Username, createdSchemaRegistryApiKey.Password) // Then we create an API key for the schema registry
setupRoleBindingHTTPMock(string(createServiceAccountVariables.ServiceAccountId), testerApp.dbSeedVariables.GetDevelopmentClusterValues()) // Then we create a role binding for the service account

err = process.Process(context.Background(), input)
require.NoError(t, err)

// check outbox
outboxEntries, err := testerApp.db.GetAllOutboxEntries()
require.NoError(t, err)
require.Len(t, outboxEntries, 1)
require.Equal(t, outboxEntries[0].Topic, testerApp.config.TopicNameKafkaClusterAccessGranted)

// Check if we actually got the service account in db
account, err = testerApp.db.GetServiceAccount(input.CapabilityId)
require.NoError(t, err)
require.Equal(t, account.Id, createServiceAccountVariables.ServiceAccountId)

helpers.RequireNoUnmatchedGockMocks(t)
}
44 changes: 40 additions & 4 deletions src/internal/confluent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type CloudApiAccess struct {
var ErrSchemaRegistryIdIsEmpty = errors.New("schema registry id is not found, manually add id to cluster table")
var ErrMissingSchemaRegistryIds = errors.New("unable to create schema registry role binding: cluster table has any or all missing ids: organization_id, environment_id, schema_registry_id")
var ErrApiKeyNotFoundForDeletion = errors.New("unable to delete api key: key not found in confluent")
var ErrFoundExistingServiceAccount = errors.New("unable to create service account, service name already in use")
var ErrNoServiceAccountFound = errors.New("unable to find requested service account")

func (a *CloudApiAccess) ApiKey() models.ApiKey {
return models.ApiKey{Username: a.Username, Password: a.Password}
Expand All @@ -48,6 +50,14 @@ type createServiceAccountResponse struct {
Id string `json:"id"`
}

type listServiceAccountsResponse struct {
Data []struct {
ID string `json:"id"`
DisplayName string `json:"display_name"`
Description string `json:"description"`
} `json:"data"`
}

type createApiKeyResponse struct {
Id string `json:"id"`
Spec struct {
Expand Down Expand Up @@ -119,14 +129,13 @@ func (c *Client) CreateServiceAccount(ctx context.Context, name string, descript
}`

response, err := c.post(ctx, url, payload, c.cloudApiAccess.ApiKey())
if err != nil {
return "", err
if response != nil && response.StatusCode == 409 {
return "", ErrFoundExistingServiceAccount
}
defer response.Body.Close()

if err != nil {
return "", err
}
defer response.Body.Close()

serviceAccountResponse := &createServiceAccountResponse{}
derr := json.NewDecoder(response.Body).Decode(serviceAccountResponse)
Expand All @@ -137,6 +146,33 @@ func (c *Client) CreateServiceAccount(ctx context.Context, name string, descript
return models.ServiceAccountId(serviceAccountResponse.Id), nil
}

func (c *Client) GetServiceAccount(ctx context.Context, displayName string) (models.ServiceAccountId, error) {
url := c.cloudApiAccess.ApiEndpoint + "/iam/v2/service-accounts"

response, err := c.get(ctx, url, c.cloudApiAccess.ApiKey())
if err != nil {
return "", err
}
defer response.Body.Close()
if err != nil {
return "", err
}

serviceAccountResponse := &listServiceAccountsResponse{}
decodeErr := json.NewDecoder(response.Body).Decode(serviceAccountResponse)
if decodeErr != nil {
return "", decodeErr
}

for _, accountData := range serviceAccountResponse.Data {
if accountData.DisplayName == displayName {
return models.ServiceAccountId(accountData.ID), nil
}
}

return "", ErrNoServiceAccountFound
}

func (c *Client) post(ctx context.Context, url string, payload string, apiKey models.ApiKey) (*http.Response, error) {
request, _ := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer([]byte(payload)))
request.Header.Set("Content-Type", "application/json")
Expand Down
16 changes: 15 additions & 1 deletion src/internal/serviceaccount/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ type accountService struct {
repo serviceAccountRepository
}

const defaultServiceAccountDescription = "Created by Confluent Gateway"

type serviceAccountRepository interface {
GetServiceAccount(capabilityId models.CapabilityId) (*models.ServiceAccount, error)
CreateServiceAccount(serviceAccount *models.ServiceAccount) error
Expand All @@ -35,10 +37,22 @@ func (h *accountService) GetServiceAccount(capabilityId models.CapabilityId) (*m
}

func (h *accountService) CreateServiceAccount(capabilityId models.CapabilityId, clusterId models.ClusterId) error {
serviceAccountId, err := h.confluent.CreateServiceAccount(h.context, string(capabilityId), "Created by Confluent Gateway")
serviceAccountId, err := h.confluent.CreateServiceAccount(h.context, string(capabilityId), defaultServiceAccountDescription)
if err != nil {
return err
}
return h.CreateServiceAccountDBLink(capabilityId, clusterId, serviceAccountId)
}

func (h *accountService) FindServiceAccountAndCreateDBLink(capabilityId models.CapabilityId, clusterId models.ClusterId) error {
serviceAccountId, err := h.confluent.GetServiceAccount(h.context, string(capabilityId))
if err != nil {
return err
}
return h.CreateServiceAccountDBLink(capabilityId, clusterId, serviceAccountId)
}

func (h *accountService) CreateServiceAccountDBLink(capabilityId models.CapabilityId, clusterId models.ClusterId, serviceAccountId models.ServiceAccountId) error {

users, err := h.confluent.GetConfluentInternalUsers(h.context)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions src/internal/serviceaccount/confluent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type Confluent interface {
CreateServiceAccount(ctx context.Context, name string, description string) (models.ServiceAccountId, error)
GetServiceAccount(ctx context.Context, displayName string) (models.ServiceAccountId, error)
CreateACLEntry(ctx context.Context, clusterId models.ClusterId, userAccountId models.UserAccountId, entry models.AclDefinition) error
CreateClusterApiKey(ctx context.Context, clusterId models.ClusterId, serviceAccountId models.ServiceAccountId) (models.ApiKey, error)
CreateSchemaRegistryApiKey(ctx context.Context, clusterId models.ClusterId, serviceAccountId models.ServiceAccountId) (models.ApiKey, error)
Expand Down
5 changes: 5 additions & 0 deletions src/internal/serviceaccount/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func NewStepContext(logger logging.Logger, account AccountService, vault VaultSe
type AccountService interface {
GetServiceAccount(models.CapabilityId) (*models.ServiceAccount, error)
CreateServiceAccount(models.CapabilityId, models.ClusterId) error
FindServiceAccountAndCreateDBLink(models.CapabilityId, models.ClusterId) error
GetOrCreateClusterAccess(models.CapabilityId, models.ClusterId) (*models.ClusterAccess, error)
GetClusterAccess(models.CapabilityId, models.ClusterId) (*models.ClusterAccess, error)
CreateAclEntry(models.ClusterId, models.UserAccountId, *models.AclEntry) error
Expand Down Expand Up @@ -71,6 +72,10 @@ func (c *StepContext) CreateServiceAccount() error {
return c.account.CreateServiceAccount(c.input.CapabilityId, c.input.ClusterId)
}

func (c *StepContext) CreateServiceAccountDbLink() error {
return c.account.FindServiceAccountAndCreateDBLink(c.input.CapabilityId, c.input.ClusterId)
}

func (c *StepContext) GetInputCapabilityId() models.CapabilityId {
return c.input.CapabilityId
}
Expand Down
4 changes: 4 additions & 0 deletions src/internal/serviceaccount/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type EnsureServiceAccountStepRequirement interface {
logger
HasServiceAccount() bool
CreateServiceAccount() error
CreateServiceAccountDbLink() error
GetInputCapabilityId() models.CapabilityId
}

Expand All @@ -88,6 +89,9 @@ func ensureServiceAccountStep(step *StepContext) error {
}

err := step.CreateServiceAccount()
if errors.Is(confluent.ErrFoundExistingServiceAccount, err) {
return step.CreateServiceAccountDbLink()
}
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions src/internal/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

var ErrTopicNotFound = errors.New("requested topic not found")
var ErrServiceAccountNotFound = errors.New("requested service account not found")

type Database struct {
db *gorm.DB
Expand Down Expand Up @@ -115,8 +116,8 @@ func (d *Database) GetServiceAccount(capabilityId models.CapabilityId) (*models.
Error

if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) { // TODO: do not suppress error, but instead return custom error
return nil, nil
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, ErrServiceAccountNotFound
}

return nil, err
Expand Down

0 comments on commit 572bd65

Please sign in to comment.