Skip to content

Commit

Permalink
Add new watermill handlers that get or refresh entities by properties…
Browse files Browse the repository at this point in the history
… and call another handler

These will be used in webhook handlers instead of calling the property
service directly. The handlers follow the same base logic, just with
"pluggable" ways of retrieving the entity, converting the entity to
properties and which handler to call next.

Related: stacklok#4327
  • Loading branch information
jhrozek committed Sep 19, 2024
1 parent d869bce commit d5b3a6f
Show file tree
Hide file tree
Showing 19 changed files with 1,321 additions and 13 deletions.
184 changes: 184 additions & 0 deletions internal/entities/handlers/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//
// Copyright 2024 Stacklok, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package handlers contains the message handlers for entities.
package handlers

import (
watermill "github.com/ThreeDotsLabs/watermill/message"
"github.com/rs/zerolog"

"github.com/stacklok/minder/internal/db"
"github.com/stacklok/minder/internal/entities/handlers/message"
"github.com/stacklok/minder/internal/entities/handlers/strategies"
entStrategies "github.com/stacklok/minder/internal/entities/handlers/strategies/entity"
msgStrategies "github.com/stacklok/minder/internal/entities/handlers/strategies/message"
propertyService "github.com/stacklok/minder/internal/entities/properties/service"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/providers/manager"
)

type handleEntityAndDoBase struct {
evt events.Publisher

refreshEntity strategies.GetEntityStrategy
createMessage strategies.MessageCreateStrategy

handlerName string
forwardHandlerName string

handlerMiddleware []watermill.HandlerMiddleware
}

// Register satisfies the events.Consumer interface.
func (b *handleEntityAndDoBase) Register(r events.Registrar) {
r.Register(b.handlerName, b.handleRefreshEntityAndDo, b.handlerMiddleware...)
}

func (b *handleEntityAndDoBase) handleRefreshEntityAndDo(msg *watermill.Message) error {
ctx := msg.Context()

l := zerolog.Ctx(ctx).With().
Str("messageStrategy", b.createMessage.GetName()).
Str("refreshStrategy", b.refreshEntity.GetName()).
Logger()

// unmarshal the message
entMsg, err := message.ToEntityRefreshAndDo(msg)
if err != nil {
l.Error().Err(err).Msg("error unpacking message")
return nil
}
l.Debug().Msg("message unpacked")

// call refreshEntity
ewp, err := b.refreshEntity.GetEntity(ctx, entMsg)
if err != nil {
l.Error().Err(err).Msg("error refreshing entity")
// do not return error in the handler, just log it
// we might want to special-case retrying /some/ errors specifically those from the
// provider, but for now, just log it
return nil
}

if ewp != nil {
l.Debug().
Str("entityID", ewp.Entity.ID.String()).
Str("providerID", ewp.Entity.ProviderID.String()).
Msg("entity refreshed")
} else {
l.Debug().Msg("entity not retrieved")
}

nextMsg, err := b.createMessage.CreateMessage(ctx, ewp)
if err != nil {
l.Error().Err(err).Msg("error creating message")
return nil
}

// If nextMsg is nil, it means we don't need to publish anything (entity not found)
if nextMsg != nil {
l.Debug().Msg("publishing message")
if err := b.evt.Publish(b.forwardHandlerName, nextMsg); err != nil {
l.Error().Err(err).Msg("error publishing message")
return nil
}
} else {
l.Info().Msg("no message to publish")
}

return nil
}

// NewRefreshEntityAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it.
func NewRefreshEntityAndEvaluateHandler(
evt events.Publisher,
store db.Store,
propSvc propertyService.PropertiesService,
provMgr manager.ProviderManager,
handlerMiddleware ...watermill.HandlerMiddleware,
) events.Consumer {
return &handleEntityAndDoBase{
evt: evt,

refreshEntity: entStrategies.NewRefreshEntityByUpstreamIDStrategy(propSvc, provMgr, store),
createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr),

handlerName: events.TopicQueueRefreshEntityAndEvaluate,
forwardHandlerName: events.TopicQueueEntityEvaluate,

handlerMiddleware: handlerMiddleware,
}
}

// NewGetEntityAndDeleteHandler creates a new handler that gets an entity and deletes it.
func NewGetEntityAndDeleteHandler(
evt events.Publisher,
propSvc propertyService.PropertiesService,
handlerMiddleware ...watermill.HandlerMiddleware,
) events.Consumer {
return &handleEntityAndDoBase{
evt: evt,

refreshEntity: entStrategies.NewGetEntityByUpstreamIDStrategy(propSvc),
createMessage: msgStrategies.NewToMinderEntity(),

handlerName: events.TopicQueueGetEntityAndDelete,
forwardHandlerName: events.TopicQueueReconcileEntityDelete,

handlerMiddleware: handlerMiddleware,
}
}

// NewAddOriginatingEntityHandler creates a new handler that adds an originating entity.
func NewAddOriginatingEntityHandler(
evt events.Publisher,
store db.Store,
propSvc propertyService.PropertiesService,
provMgr manager.ProviderManager,
handlerMiddleware ...watermill.HandlerMiddleware,
) events.Consumer {
return &handleEntityAndDoBase{
evt: evt,

refreshEntity: entStrategies.NewAddOriginatingEntityStrategy(propSvc, provMgr, store),
createMessage: msgStrategies.NewToEntityInfoWrapper(store, propSvc, provMgr),

handlerName: events.TopicQueueOriginatingEntityAdd,
forwardHandlerName: events.TopicQueueEntityEvaluate,

handlerMiddleware: handlerMiddleware,
}
}

// NewRemoveOriginatingEntityHandler creates a new handler that removes an originating entity.
func NewRemoveOriginatingEntityHandler(
evt events.Publisher,
store db.Store,
propSvc propertyService.PropertiesService,
provMgr manager.ProviderManager,
handlerMiddleware ...watermill.HandlerMiddleware,
) events.Consumer {
return &handleEntityAndDoBase{
evt: evt,

refreshEntity: entStrategies.NewDelOriginatingEntityStrategy(propSvc, provMgr, store),
createMessage: msgStrategies.NewCreateEmpty(),

handlerName: events.TopicQueueOriginatingEntityDelete,

handlerMiddleware: handlerMiddleware,
}
}
176 changes: 176 additions & 0 deletions internal/entities/handlers/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
//
// Copyright 2024 Stacklok, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handlers

import (
"database/sql"
"testing"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

mockdb "github.com/stacklok/minder/database/mock"
"github.com/stacklok/minder/internal/engine/entities"
message2 "github.com/stacklok/minder/internal/entities/handlers/message"
"github.com/stacklok/minder/internal/entities/models"
"github.com/stacklok/minder/internal/entities/properties"
"github.com/stacklok/minder/internal/entities/properties/service"
mock_service "github.com/stacklok/minder/internal/entities/properties/service/mock"
stubeventer "github.com/stacklok/minder/internal/events/stubs"
ghprops "github.com/stacklok/minder/internal/providers/github/properties"
"github.com/stacklok/minder/internal/providers/manager"
mock_manager "github.com/stacklok/minder/internal/providers/manager/mock"
minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)

func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) {
t.Parallel()

tests := []struct {
name string
lookupPropMap map[string]any
entPropMap map[string]any
nextHandler string
providerHint string
ewp *models.EntityWithProperties
setupMocks func(*gomock.Controller, *models.EntityWithProperties) (service.PropertiesService, manager.ProviderManager)
expectedError string
expectedPublish bool
}{
{
name: "successful refresh and publish of a repo",
lookupPropMap: map[string]any{
properties.PropertyUpstreamID: "123",
},
ewp: &models.EntityWithProperties{
Entity: models.EntityInstance{
ID: uuid.New(),
Type: minderv1.Entity_ENTITY_REPOSITORIES,
Name: "testorg/testrepo",
ProviderID: uuid.New(),
ProjectID: uuid.New(),
},
},
nextHandler: "call.me.next",
providerHint: "github",
entPropMap: map[string]any{
properties.PropertyName: "testorg/testrepo",
ghprops.RepoPropertyName: "testrepo",
ghprops.RepoPropertyOwner: "testorg",
ghprops.RepoPropertyId: int64(123),
properties.RepoPropertyIsPrivate: false,
properties.RepoPropertyIsFork: false,
},
setupMocks: func(ctrl *gomock.Controller, ewp *models.EntityWithProperties) (service.PropertiesService, manager.ProviderManager) {
mockPropSvc := mock_service.NewMockPropertiesService(ctrl)
mockProvMgr := mock_manager.NewMockProviderManager(ctrl)

protoEnt, err := ghprops.RepoV1FromProperties(ewp.Properties)
require.NoError(t, err)

mockPropSvc.EXPECT().
EntityWithPropertiesByUpstreamHint(gomock.Any(), ewp.Entity.Type, gomock.Any(), gomock.Any(), gomock.Any()).
Return(ewp, nil)
mockPropSvc.EXPECT().
RetrieveAllPropertiesForEntity(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
mockPropSvc.EXPECT().
EntityWithPropertiesAsProto(gomock.Any(), ewp, mockProvMgr).
Return(protoEnt, nil)

return mockPropSvc, mockProvMgr
},
expectedPublish: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

getByProps, err := properties.NewProperties(tt.lookupPropMap)
require.NoError(t, err)

handlerMsg := message.NewMessage(uuid.New().String(), nil)
entityMsg := message2.NewEntityRefreshAndDoMessage().
WithEntity(minderv1.Entity_ENTITY_REPOSITORIES, getByProps).
WithProviderImplementsHint(tt.providerHint)

err = entityMsg.ToMessage(handlerMsg)
require.NoError(t, err)

entProps, err := properties.NewProperties(tt.entPropMap)
require.NoError(t, err)
tt.ewp.Properties = entProps

mockPropSvc, mockProvMgr := tt.setupMocks(ctrl, tt.ewp)

mockStore := mockdb.NewMockStore(ctrl)
tx := sql.Tx{}
mockStore.EXPECT().
BeginTransaction().
Return(&tx, nil)
mockStore.EXPECT().
Commit(&tx).
Return(nil)
mockStore.EXPECT().
Rollback(&tx).
Return(nil)
mockStore.EXPECT().
GetQuerierWithTransaction(&tx).
Return(mockStore)

stubEventer := &stubeventer.StubEventer{}
handler := NewRefreshEntityAndEvaluateHandler(stubEventer, mockStore, mockPropSvc, mockProvMgr)

refreshHandlerStruct, ok := handler.(*handleEntityAndDoBase)
require.True(t, ok)
err = refreshHandlerStruct.handleRefreshEntityAndDo(handlerMsg)

if tt.expectedError != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedError)
} else {
assert.NoError(t, err)
}

if !tt.expectedPublish {
assert.Equal(t, 0, len(stubEventer.Sent), "Expected no publish calls")
return
}

assert.Equal(t, 1, len(stubEventer.Sent), "Expected one publish call")
sentMsg := stubEventer.Sent[0]
eiw, err := entities.ParseEntityEvent(sentMsg)
require.NoError(t, err)
require.NotNil(t, eiw)

assert.Equal(t, tt.ewp.Entity.Type, eiw.Type)
assert.Equal(t, tt.ewp.Entity.ProjectID, eiw.ProjectID)
assert.Equal(t, tt.ewp.Entity.ProviderID, eiw.ProviderID)

pbrepo, ok := eiw.Entity.(*minderv1.Repository)
require.True(t, ok)
assert.Equal(t, tt.entPropMap[ghprops.RepoPropertyName].(string), pbrepo.Name)
})
}
}
Loading

0 comments on commit d5b3a6f

Please sign in to comment.