diff --git a/internal/entities/handlers/handler.go b/internal/entities/handlers/handler.go new file mode 100644 index 0000000000..82eb787707 --- /dev/null +++ b/internal/entities/handlers/handler.go @@ -0,0 +1,652 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package handlers contains the message handlers for entities. +package handlers + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/stacklok/minder/internal/db" + "github.com/stacklok/minder/internal/engine/entities" + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/entities/properties" + propertyService "github.com/stacklok/minder/internal/entities/properties/service" + "github.com/stacklok/minder/internal/events" + ghprop "github.com/stacklok/minder/internal/providers/github/properties" + "github.com/stacklok/minder/internal/providers/manager" + "github.com/stacklok/minder/internal/reconcilers/messages" + minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +type messageCreateStrategy interface { + CreateMessage( + ctx context.Context, ewp *models.EntityWithProperties, + ) (*message.Message, error) + GetName() string +} + +type toEntityInfoWrapper struct { + store db.Store + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager +} + +func newToEntityInfoWrapper( + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, +) *toEntityInfoWrapper { + return &toEntityInfoWrapper{ + store: store, + propSvc: propSvc, + provMgr: provMgr, + } +} + +func (c *toEntityInfoWrapper) CreateMessage( + ctx context.Context, ewp *models.EntityWithProperties, +) (*message.Message, error) { + pbEnt, err := c.propSvc.EntityWithPropertiesAsProto(ctx, ewp, c.provMgr) + if err != nil { + return nil, fmt.Errorf("error converting entity to protobuf: %w", err) + } + + m := message.NewMessage(uuid.New().String(), nil) + + eiw := entities.NewEntityInfoWrapper(). + WithProjectID(ewp.Entity.ProjectID). + WithProviderID(ewp.Entity.ProviderID). + WithProtoMessage(ewp.Entity.Type, pbEnt). + WithID(ewp.Entity.Type, ewp.Entity.ID) + + // in case the entity originated from another entity, add that information as well. + // the property service does not provide this information (should it?) so we need to fetch it from the store. + // for now we could have hardcoded the entity type as everything originates from a repository, + // but this is more flexible. + if ewp.Entity.OriginatedFrom != uuid.Nil { + dbEnt, err := c.store.GetEntityByID(ctx, ewp.Entity.OriginatedFrom) + if err != nil { + return nil, fmt.Errorf("error getting originating entity: %w", err) + } + eiw.WithID(entities.EntityTypeFromDB(dbEnt.EntityType), dbEnt.ID) + } + + err = eiw.ToMessage(m) + if err != nil { + return nil, fmt.Errorf("error converting entity to message: %w", err) + } + + return m, nil +} + +func (_ *toEntityInfoWrapper) GetName() string { + return "toEntityInfoWrapper" +} + +type toMinderEntityStrategy struct{} + +func (_ *toMinderEntityStrategy) CreateMessage(_ context.Context, ewp *models.EntityWithProperties) (*message.Message, error) { + m := message.NewMessage(uuid.New().String(), nil) + + entEvent := messages.NewMinderEvent(). + WithProjectID(ewp.Entity.ProjectID). + WithProviderID(ewp.Entity.ProviderID). + WithEntityType(ewp.Entity.Type.String()). + WithEntityID(ewp.Entity.ID) + + err := entEvent.ToMessage(m) + if err != nil { + return nil, fmt.Errorf("error converting entity to message: %w", err) + } + + return m, nil +} + +func (_ *toMinderEntityStrategy) GetName() string { + return "toMinderv1Entity" +} + +type createEmpty struct{} + +func (_ *createEmpty) CreateMessage(_ context.Context, _ *models.EntityWithProperties) (*message.Message, error) { + return nil, nil +} + +func (_ *createEmpty) GetName() string { + return "empty" +} + +func getEntityInner( + ctx context.Context, + entType minderv1.Entity, + entPropMap map[string]any, + hint EntityHint, + propSvc propertyService.PropertiesService, + getEntityOpts *propertyService.CallOptions, +) (*models.EntityWithProperties, error) { + svcHint := propertyService.ByUpstreamHint{} + if hint.ProviderImplementsHint != "" { + svcHint.ProviderImplements.Valid = true + if err := svcHint.ProviderImplements.ProviderType.Scan(hint.ProviderImplementsHint); err != nil { + return nil, fmt.Errorf("error scanning provider type: %w", err) + } + } + + lookupProperties, err := properties.NewProperties(entPropMap) + if err != nil { + return nil, fmt.Errorf("error creating properties: %w", err) + } + + ewp, err := propSvc.EntityWithPropertiesByUpstreamHint( + ctx, + entType, + lookupProperties, + svcHint, + getEntityOpts, + ) + if err != nil { + return nil, fmt.Errorf("error searching entity by ID: %w", err) + } + + return ewp, nil +} + +type getEntityStrategy interface { + GetEntity( + ctx context.Context, entMsg *HandleEntityAndDoMessage, + ) (*models.EntityWithProperties, error) + GetName() string +} + +type getEntityByUpstreamIDStrategy struct { + propSvc propertyService.PropertiesService +} + +func newGetEntityByUpstreamIDStrategy( + propSvc propertyService.PropertiesService, +) *getEntityByUpstreamIDStrategy { + return &getEntityByUpstreamIDStrategy{ + propSvc: propSvc, + } +} + +// GetEntity gets an entity by its upstream ID. +func (g *getEntityByUpstreamIDStrategy) GetEntity( + ctx context.Context, entMsg *HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + return getEntityInner(ctx, + entMsg.Entity.Type, entMsg.Entity.GetByProps, entMsg.Hint, + g.propSvc, + propertyService.CallBuilder()) +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *getEntityByUpstreamIDStrategy) GetName() string { + return "getEntityByUpstreamID" +} + +type refreshEntityByUpstreamIDStrategy struct { + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager + store db.Store +} + +func newRefreshEntityByUpstreamIDStrategy( + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + store db.Store, +) *refreshEntityByUpstreamIDStrategy { + return &refreshEntityByUpstreamIDStrategy{ + propSvc: propSvc, + provMgr: provMgr, + store: store, + } +} + +// GetEntity refreshes an entity by its upstream ID. +func (r *refreshEntityByUpstreamIDStrategy) GetEntity( + ctx context.Context, entMsg *HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + getEnt, err := db.WithTransaction(r.store, func(t db.ExtendQuerier) (*models.EntityWithProperties, error) { + ewp, err := getEntityInner( + ctx, + entMsg.Entity.Type, entMsg.Entity.GetByProps, entMsg.Hint, + r.propSvc, propertyService.CallBuilder().WithStoreOrTransaction(t)) + if err != nil { + return nil, fmt.Errorf("error getting entity: %w", err) + } + + err = r.propSvc.RetrieveAllPropertiesForEntity(ctx, ewp, r.provMgr, propertyService.ReadBuilder()) + if err != nil { + return nil, fmt.Errorf("error fetching repository: %w", err) + } + return ewp, nil + }) + if err != nil { + return nil, fmt.Errorf("error refreshing entity: %w", err) + } + + return getEnt, nil +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *refreshEntityByUpstreamIDStrategy) GetName() string { + return "refreshEntityByUpstreamIDStrategy" +} + +type addOriginatingEntityStrategy struct { + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager + store db.Store +} + +func newAddOriginatingEntityStrategy( + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + store db.Store, +) *addOriginatingEntityStrategy { + return &addOriginatingEntityStrategy{ + propSvc: propSvc, + provMgr: provMgr, + store: store, + } +} + +// GetEntity adds an originating entity. +func (a *addOriginatingEntityStrategy) GetEntity( + ctx context.Context, entMsg *HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + childProps, err := properties.NewProperties(entMsg.Entity.GetByProps) + if err != nil { + return nil, fmt.Errorf("error creating properties: %w", err) + } + + // store the originating entity + childEwp, err := db.WithTransaction(a.store, func(t db.ExtendQuerier) (*models.EntityWithProperties, error) { + parentEwp, err := getEntityInner( + ctx, + entMsg.Owner.Type, entMsg.Owner.GetByProps, entMsg.Hint, + a.propSvc, + propertyService.CallBuilder().WithStoreOrTransaction(t)) + if err != nil { + return nil, fmt.Errorf("error getting parent entity: %w", err) + } + + legacyId, err := a.upsertLegacyEntity(ctx, entMsg.Entity.Type, parentEwp, childProps, t) + if err != nil { + return nil, fmt.Errorf("error upserting legacy entity: %w", err) + } + + prov, err := a.provMgr.InstantiateFromID(ctx, parentEwp.Entity.ProviderID) + if err != nil { + return nil, fmt.Errorf("error getting provider: %w", err) + } + + childEntName, err := prov.GetEntityName(entMsg.Entity.Type, childProps) + if err != nil { + return nil, fmt.Errorf("error getting child entity name: %w", err) + } + + childEnt, err := t.CreateOrEnsureEntityByID(ctx, db.CreateOrEnsureEntityByIDParams{ + ID: legacyId, + EntityType: entities.EntityTypeToDB(entMsg.Entity.Type), + Name: childEntName, + ProjectID: parentEwp.Entity.ProjectID, + ProviderID: parentEwp.Entity.ProviderID, + OriginatedFrom: uuid.NullUUID{ + UUID: parentEwp.Entity.ID, + Valid: true, + }, + }) + if err != nil { + return nil, err + } + + upstreamProps, err := a.propSvc.RetrieveAllProperties(ctx, prov, + parentEwp.Entity.ProjectID, parentEwp.Entity.ProviderID, + childProps, entMsg.Entity.Type, + propertyService.ReadBuilder().WithStoreOrTransaction(t), + ) + if err != nil { + return nil, fmt.Errorf("error retrieving properties: %w", err) + } + + return models.NewEntityWithProperties(childEnt, upstreamProps), nil + + }) + + if err != nil { + return nil, fmt.Errorf("error storing originating entity: %w", err) + } + return childEwp, nil +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *addOriginatingEntityStrategy) GetName() string { + return "addOriginatingEntityStrategy" +} + +func (_ *addOriginatingEntityStrategy) upsertLegacyEntity( + ctx context.Context, + entType minderv1.Entity, + parentEwp *models.EntityWithProperties, childProps *properties.Properties, + t db.ExtendQuerier, +) (uuid.UUID, error) { + var legacyId uuid.UUID + + switch entType { // nolint:exhaustive + case minderv1.Entity_ENTITY_PULL_REQUESTS: + dbPr, err := t.UpsertPullRequest(ctx, db.UpsertPullRequestParams{ + RepositoryID: parentEwp.Entity.ID, + PrNumber: childProps.GetProperty(ghprop.PullPropertyNumber).GetInt64(), + }) + if err != nil { + return uuid.Nil, fmt.Errorf("error upserting pull request: %w", err) + } + legacyId = dbPr.ID + case minderv1.Entity_ENTITY_ARTIFACTS: + // TODO: remove this once we migrate artifacts to entities. We should get rid of the provider name. + dbProv, err := t.GetProviderByID(ctx, parentEwp.Entity.ProviderID) + if err != nil { + return uuid.Nil, fmt.Errorf("error getting provider: %w", err) + } + + dbArtifact, err := t.UpsertArtifact(ctx, db.UpsertArtifactParams{ + RepositoryID: uuid.NullUUID{ + UUID: parentEwp.Entity.ID, + Valid: true, + }, + ArtifactName: childProps.GetProperty(ghprop.ArtifactPropertyName).GetString(), + ArtifactType: childProps.GetProperty(ghprop.ArtifactPropertyType).GetString(), + ArtifactVisibility: childProps.GetProperty(ghprop.ArtifactPropertyVisibility).GetString(), + ProjectID: parentEwp.Entity.ProjectID, + ProviderID: parentEwp.Entity.ProviderID, + ProviderName: dbProv.Name, + }) + if err != nil { + return uuid.Nil, fmt.Errorf("error upserting artifact: %w", err) + } + legacyId = dbArtifact.ID + } + + return legacyId, nil +} + +type delOriginatingEntityStrategy struct { + propSvc propertyService.PropertiesService + provMgr manager.ProviderManager + store db.Store +} + +func newDelOriginatingEntityStrategy( + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + store db.Store, +) *delOriginatingEntityStrategy { + return &delOriginatingEntityStrategy{ + propSvc: propSvc, + provMgr: provMgr, + store: store, + } +} + +// GetEntity deletes the originating entity. +func (d *delOriginatingEntityStrategy) GetEntity( + ctx context.Context, entMsg *HandleEntityAndDoMessage, +) (*models.EntityWithProperties, error) { + childProps, err := properties.NewProperties(entMsg.Entity.GetByProps) + if err != nil { + return nil, fmt.Errorf("error creating properties: %w", err) + } + + tx, err := d.store.BeginTransaction() + if err != nil { + return nil, fmt.Errorf("error starting transaction: %w", err) + } + defer func() { + _ = d.store.Rollback(tx) + }() + + txq := d.store.GetQuerierWithTransaction(tx) + if txq == nil { + return nil, fmt.Errorf("error getting querier") + } + + parentEwp, err := getEntityInner( + ctx, + entMsg.Owner.Type, entMsg.Owner.GetByProps, entMsg.Hint, + d.propSvc, + propertyService.CallBuilder().WithStoreOrTransaction(txq)) + if err != nil { + return nil, fmt.Errorf("error getting parent entity: %w", err) + } + + prov, err := d.provMgr.InstantiateFromID(ctx, parentEwp.Entity.ProviderID) + if err != nil { + return nil, fmt.Errorf("error getting provider: %w", err) + } + + childEntName, err := prov.GetEntityName(entMsg.Entity.Type, childProps) + if err != nil { + return nil, fmt.Errorf("error getting child entity name: %w", err) + } + + err = txq.DeleteEntityByName(ctx, db.DeleteEntityByNameParams{ + Name: childEntName, + ProjectID: parentEwp.Entity.ProjectID, + }) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return nil, err + } + + err = d.deleteLegacyEntity(ctx, entMsg.Entity.Type, parentEwp, childProps, txq) + if err != nil { + return nil, fmt.Errorf("error deleting legacy entity: %w", err) + } + + if err := d.store.Commit(tx); err != nil { + return nil, fmt.Errorf("error committing transaction: %w", err) + } + + return nil, nil +} + +func (_ *delOriginatingEntityStrategy) deleteLegacyEntity( + ctx context.Context, + entType minderv1.Entity, + parentEwp *models.EntityWithProperties, + childProps *properties.Properties, + t db.ExtendQuerier, +) error { + if entType == minderv1.Entity_ENTITY_PULL_REQUESTS { + err := t.DeletePullRequest(ctx, db.DeletePullRequestParams{ + RepositoryID: parentEwp.Entity.ID, + PrNumber: childProps.GetProperty(ghprop.PullPropertyNumber).GetInt64(), + }) + if err != nil { + return fmt.Errorf("error deleting pull request: %w", err) + } + } else { + return fmt.Errorf("unsupported entity type: %v", entType) + } + + return nil +} + +// GetName returns the name of the strategy. Used for debugging +func (_ *delOriginatingEntityStrategy) GetName() string { + return "delOriginatingEntityStrategy" +} + +type handleEntityAndDoBase struct { + evt events.Publisher + + refreshEntity getEntityStrategy + createMessage messageCreateStrategy + + handlerName string + forwardHandlerName string + + handlerMiddleware []message.HandlerMiddleware +} + +// Register satisfies the events.Consumer interface. +func (b *handleEntityAndDoBase) Register(r events.Registrar) { + r.Register(b.handlerName, b.handleRefreshEntityAndDo, b.handlerMiddleware...) +} + +func (b *handleEntityAndDoBase) handleRefreshEntityAndDo(msg *message.Message) error { + ctx := msg.Context() + + l := zerolog.Ctx(ctx).With(). + Str("messageStrategy", b.createMessage.GetName()). + Str("refreshStrategy", b.refreshEntity.GetName()). + Logger() + + // unmarshal the message + entMsg, err := messageToEntityRefreshAndDo(msg) + if err != nil { + l.Error().Err(err).Msg("error unpacking message") + return nil + } + l.Debug().Msg("message unpacked") + + // call refreshEntity + ewp, err := b.refreshEntity.GetEntity(ctx, entMsg) + if err != nil { + l.Error().Err(err).Msg("error refreshing entity") + // do not return error in the handler, just log it + // we might want to special-case retrying /some/ errors specifically those from the + // provider, but for now, just log it + return nil + } + + if ewp != nil { + l.Debug(). + Str("entityID", ewp.Entity.ID.String()). + Str("providerID", ewp.Entity.ProviderID.String()). + Msg("entity refreshed") + } else { + l.Debug().Msg("entity not retrieved") + } + + nextMsg, err := b.createMessage.CreateMessage(ctx, ewp) + if err != nil { + l.Error().Err(err).Msg("error creating message") + return nil + } + + // If nextMsg is nil, it means we don't need to publish anything (entity not found) + if nextMsg != nil { + l.Debug().Msg("publishing message") + if err := b.evt.Publish(b.forwardHandlerName, nextMsg); err != nil { + l.Error().Err(err).Msg("error publishing message") + return nil + } + } else { + l.Info().Msg("no message to publish") + } + + return nil +} + +// NewRefreshEntityAndEvaluateHandler creates a new handler that refreshes an entity and evaluates it. +func NewRefreshEntityAndEvaluateHandler( + evt events.Publisher, + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + handlerMiddleware ...message.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: newRefreshEntityByUpstreamIDStrategy(propSvc, provMgr, store), + createMessage: newToEntityInfoWrapper(store, propSvc, provMgr), + + handlerName: events.TopicQueueRefreshEntityAndEvaluate, + forwardHandlerName: events.TopicQueueEntityEvaluate, + + handlerMiddleware: handlerMiddleware, + } +} + +// NewGetEntityAndDeleteHandler creates a new handler that gets an entity and deletes it. +func NewGetEntityAndDeleteHandler( + evt events.Publisher, + propSvc propertyService.PropertiesService, + handlerMiddleware ...message.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: newGetEntityByUpstreamIDStrategy(propSvc), + createMessage: &toMinderEntityStrategy{}, + + handlerName: events.TopicQueueGetEntityAndDelete, + forwardHandlerName: events.TopicQueueReconcileEntityDelete, + + handlerMiddleware: handlerMiddleware, + } +} + +// NewAddOriginatingEntityHandler creates a new handler that adds an originating entity. +func NewAddOriginatingEntityHandler( + evt events.Publisher, + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + handlerMiddleware ...message.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: newAddOriginatingEntityStrategy(propSvc, provMgr, store), + createMessage: newToEntityInfoWrapper(store, propSvc, provMgr), + + handlerName: events.TopicQueueOriginatingEntityAdd, + forwardHandlerName: events.TopicQueueEntityEvaluate, + + handlerMiddleware: handlerMiddleware, + } +} + +// NewRemoveOriginatingEntityHandler creates a new handler that removes an originating entity. +func NewRemoveOriginatingEntityHandler( + evt events.Publisher, + store db.Store, + propSvc propertyService.PropertiesService, + provMgr manager.ProviderManager, + handlerMiddleware ...message.HandlerMiddleware, +) events.Consumer { + return &handleEntityAndDoBase{ + evt: evt, + + refreshEntity: newDelOriginatingEntityStrategy(propSvc, provMgr, store), + createMessage: &createEmpty{}, + + handlerName: events.TopicQueueOriginatingEntityDelete, + + handlerMiddleware: handlerMiddleware, + } +} diff --git a/internal/entities/handlers/handler_test.go b/internal/entities/handlers/handler_test.go new file mode 100644 index 0000000000..df5baf86df --- /dev/null +++ b/internal/entities/handlers/handler_test.go @@ -0,0 +1,183 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "database/sql" + "testing" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + mockdb "github.com/stacklok/minder/database/mock" + "github.com/stacklok/minder/internal/engine/entities" + "github.com/stacklok/minder/internal/entities/models" + "github.com/stacklok/minder/internal/entities/properties" + "github.com/stacklok/minder/internal/entities/properties/service" + mock_service "github.com/stacklok/minder/internal/entities/properties/service/mock" + stubeventer "github.com/stacklok/minder/internal/events/stubs" + ghprops "github.com/stacklok/minder/internal/providers/github/properties" + "github.com/stacklok/minder/internal/providers/manager" + mock_manager "github.com/stacklok/minder/internal/providers/manager/mock" + minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +func TestRefreshEntityAndDoHandler_HandleRefreshEntityAndEval(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + lookupPropMap map[string]any + entPropMap map[string]any + nextHandler string + providerHint string + ewp *models.EntityWithProperties + setupMocks func(*gomock.Controller, *models.EntityWithProperties) (service.PropertiesService, manager.ProviderManager) + expectedError string + expectedPublish bool + }{ + { + name: "successful refresh and publish of a repo", + lookupPropMap: map[string]any{ + properties.PropertyUpstreamID: "123", + }, + ewp: &models.EntityWithProperties{ + Entity: models.EntityInstance{ + ID: uuid.New(), + Type: minderv1.Entity_ENTITY_REPOSITORIES, + Name: "testorg/testrepo", + ProviderID: uuid.New(), + ProjectID: uuid.New(), + }, + }, + nextHandler: "call.me.next", + providerHint: "github", + entPropMap: map[string]any{ + properties.PropertyName: "testorg/testrepo", + ghprops.RepoPropertyName: "testrepo", + ghprops.RepoPropertyOwner: "testorg", + ghprops.RepoPropertyId: int64(123), + properties.RepoPropertyIsPrivate: false, + properties.RepoPropertyIsFork: false, + }, + setupMocks: func(ctrl *gomock.Controller, ewp *models.EntityWithProperties) (service.PropertiesService, manager.ProviderManager) { + mockPropSvc := mock_service.NewMockPropertiesService(ctrl) + mockProvMgr := mock_manager.NewMockProviderManager(ctrl) + + protoEnt, err := ghprops.RepoV1FromProperties(ewp.Properties) + require.NoError(t, err) + + mockPropSvc.EXPECT(). + EntityWithPropertiesByUpstreamHint(gomock.Any(), ewp.Entity.Type, gomock.Any(), gomock.Any(), gomock.Any()). + Return(ewp, nil) + mockPropSvc.EXPECT(). + RetrieveAllPropertiesForEntity(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil) + mockPropSvc.EXPECT(). + EntityWithPropertiesAsProto(gomock.Any(), ewp, mockProvMgr). + Return(protoEnt, nil) + + return mockPropSvc, mockProvMgr + }, + expectedPublish: true, + }, + { + name: "error unpacking message", + setupMocks: func(ctrl *gomock.Controller, _ *models.EntityWithProperties) (service.PropertiesService, manager.ProviderManager) { + return mock_service.NewMockPropertiesService(ctrl), mock_manager.NewMockProviderManager(ctrl) + + }, + expectedError: "error unpacking message", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + getByProps, err := properties.NewProperties(tt.lookupPropMap) + require.NoError(t, err) + + handlerMsg := message.NewMessage(uuid.New().String(), nil) + entityMsg := NewEntityRefreshAndDoMessage(). + WithEntity(minderv1.Entity_ENTITY_REPOSITORIES, getByProps). + WithProviderImplementsHint(tt.providerHint) + + err = entityMsg.ToMessage(handlerMsg) + require.NoError(t, err) + + entProps, err := properties.NewProperties(tt.entPropMap) + require.NoError(t, err) + tt.ewp.Properties = entProps + + mockPropSvc, mockProvMgr := tt.setupMocks(ctrl, tt.ewp) + + mockStore := mockdb.NewMockStore(ctrl) + tx := sql.Tx{} + mockStore.EXPECT(). + BeginTransaction(). + Return(&tx, nil) + mockStore.EXPECT(). + Commit(&tx). + Return(nil) + mockStore.EXPECT(). + Rollback(&tx). + Return(nil) + mockStore.EXPECT(). + GetQuerierWithTransaction(&tx). + Return(mockStore) + + stubEventer := &stubeventer.StubEventer{} + handler := NewRefreshEntityAndEvaluateHandler(stubEventer, mockStore, mockPropSvc, mockProvMgr) + + refreshHandlerStruct, ok := handler.(*handleEntityAndDoBase) + require.True(t, ok) + err = refreshHandlerStruct.handleRefreshEntityAndDo(handlerMsg) + + if tt.expectedError != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.expectedError) + } else { + assert.NoError(t, err) + } + + if !tt.expectedPublish { + assert.Equal(t, 0, len(stubEventer.Sent), "Expected no publish calls") + return + } + + assert.Equal(t, 1, len(stubEventer.Sent), "Expected one publish call") + sentMsg := stubEventer.Sent[0] + eiw, err := entities.ParseEntityEvent(sentMsg) + require.NoError(t, err) + require.NotNil(t, eiw) + + assert.Equal(t, tt.ewp.Entity.Type, eiw.Type) + assert.Equal(t, tt.ewp.Entity.ProjectID, eiw.ProjectID) + assert.Equal(t, tt.ewp.Entity.ProviderID, eiw.ProviderID) + + pbrepo, ok := eiw.Entity.(*minderv1.Repository) + require.True(t, ok) + assert.Equal(t, tt.entPropMap[ghprops.RepoPropertyName].(string), pbrepo.Name) + }) + } +} diff --git a/internal/entities/handlers/message.go b/internal/entities/handlers/message.go new file mode 100644 index 0000000000..6ab72299da --- /dev/null +++ b/internal/entities/handlers/message.go @@ -0,0 +1,96 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "encoding/json" + "fmt" + + "github.com/ThreeDotsLabs/watermill/message" + + "github.com/stacklok/minder/internal/entities/properties" + v1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +// TypedProps is a struct that contains the type of entity and its properties. +// it is used for either the entity or the owner entity. +type TypedProps struct { + Type v1.Entity `json:"type"` + GetByProps map[string]any `json:"get_by_props"` +} + +// EntityHint is a hint that is used to help the entity handler find the entity. +type EntityHint struct { + ProviderImplementsHint string `json:"provider_implements_hint"` +} + +// HandleEntityAndDoMessage is a message that is sent to the entity handler to refresh an entity and perform an action. +type HandleEntityAndDoMessage struct { + Entity TypedProps `json:"entity"` + Owner TypedProps `json:"owner"` + Hint EntityHint `json:"hint"` +} + +// NewEntityRefreshAndDoMessage creates a new HandleEntityAndDoMessage struct. +func NewEntityRefreshAndDoMessage() *HandleEntityAndDoMessage { + return &HandleEntityAndDoMessage{} +} + +// WithEntity sets the entity and its properties. +func (e *HandleEntityAndDoMessage) WithEntity(entType v1.Entity, getByProps *properties.Properties) *HandleEntityAndDoMessage { + e.Entity = TypedProps{ + Type: entType, + GetByProps: getByProps.ToProtoStruct().AsMap(), + } + return e +} + +// WithOwner sets the owner entity and its properties. +func (e *HandleEntityAndDoMessage) WithOwner(ownerType v1.Entity, ownerProps *properties.Properties) *HandleEntityAndDoMessage { + e.Owner = TypedProps{ + Type: ownerType, + GetByProps: ownerProps.ToProtoStruct().AsMap(), + } + return e +} + +// WithProviderImplementsHint sets the provider hint for the entity that will be used when looking up the entity. +func (e *HandleEntityAndDoMessage) WithProviderImplementsHint(providerHint string) *HandleEntityAndDoMessage { + e.Hint.ProviderImplementsHint = providerHint + return e +} + +func messageToEntityRefreshAndDo(msg *message.Message) (*HandleEntityAndDoMessage, error) { + entMsg := &HandleEntityAndDoMessage{} + + err := json.Unmarshal(msg.Payload, entMsg) + if err != nil { + return nil, fmt.Errorf("error unmarshalling entity: %w", err) + } + + return entMsg, nil +} + +// ToMessage converts the HandleEntityAndDoMessage struct to a Watermill message. +func (e *HandleEntityAndDoMessage) ToMessage(msg *message.Message) error { + payloadBytes, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("error marshalling entity: %w", err) + } + + msg.Payload = payloadBytes + return nil +} diff --git a/internal/entities/handlers/message_test.go b/internal/entities/handlers/message_test.go new file mode 100644 index 0000000000..6b562e4fb3 --- /dev/null +++ b/internal/entities/handlers/message_test.go @@ -0,0 +1,109 @@ +// +// Copyright 2024 Stacklok, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "testing" + + "github.com/ThreeDotsLabs/watermill/message" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stacklok/minder/internal/entities/properties" + v1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" +) + +func TestEntityRefreshAndDoMessageRoundTrip(t *testing.T) { + t.Parallel() + + scenarios := []struct { + name string + props map[string]any + entType v1.Entity + ownerProps map[string]any + ownerType v1.Entity + providerHint string + }{ + { + name: "Valid repository entity", + props: map[string]any{ + "id": "123", + "name": "test-repo", + }, + entType: v1.Entity_ENTITY_REPOSITORIES, + providerHint: "github", + }, + { + name: "Valid artifact entity", + props: map[string]any{ + "id": "456", + "version": "1.0.0", + }, + entType: v1.Entity_ENTITY_ARTIFACTS, + ownerProps: map[string]any{ + "id": "123", + }, + ownerType: v1.Entity_ENTITY_REPOSITORIES, + providerHint: "docker", + }, + { + name: "Valid pull request entity", + props: map[string]any{ + "id": "789", + }, + entType: v1.Entity_ENTITY_PULL_REQUESTS, + ownerProps: map[string]any{ + "id": "123", + }, + ownerType: v1.Entity_ENTITY_REPOSITORIES, + providerHint: "github", + }, + } + + for _, sc := range scenarios { + t.Run(sc.name, func(t *testing.T) { + t.Parallel() + + props, err := properties.NewProperties(sc.props) + require.NoError(t, err) + + original := NewEntityRefreshAndDoMessage(). + WithEntity(sc.entType, props). + WithProviderImplementsHint(sc.providerHint) + + if sc.ownerProps != nil { + ownerProps, err := properties.NewProperties(sc.ownerProps) + require.NoError(t, err) + original.WithOwner(sc.ownerType, ownerProps) + } + + handlerMsg := message.NewMessage(uuid.New().String(), nil) + err = original.ToMessage(handlerMsg) + require.NoError(t, err) + + roundTrip, err := messageToEntityRefreshAndDo(handlerMsg) + assert.NoError(t, err) + assert.Equal(t, original.Entity.GetByProps, roundTrip.Entity.GetByProps) + assert.Equal(t, original.Entity.Type, roundTrip.Entity.Type) + assert.Equal(t, original.Hint.ProviderImplementsHint, roundTrip.Hint.ProviderImplementsHint) + if original.Owner.Type != v1.Entity_ENTITY_UNSPECIFIED { + assert.Equal(t, original.Owner.GetByProps, roundTrip.Owner.GetByProps) + assert.Equal(t, original.Owner.Type, roundTrip.Owner.Type) + } + }) + } +} diff --git a/internal/entities/models/models.go b/internal/entities/models/models.go index 8586074126..f13bc46e43 100644 --- a/internal/entities/models/models.go +++ b/internal/entities/models/models.go @@ -26,11 +26,12 @@ import ( // EntityInstance represents an entity instance type EntityInstance struct { - ID uuid.UUID - Type minderv1.Entity - Name string - ProviderID uuid.UUID - ProjectID uuid.UUID + ID uuid.UUID + Type minderv1.Entity + Name string + ProviderID uuid.UUID + ProjectID uuid.UUID + OriginatedFrom uuid.UUID } // EntityWithProperties represents an entity instance with properties @@ -41,13 +42,19 @@ type EntityWithProperties struct { // NewEntityWithProperties creates a new EntityWithProperties instance func NewEntityWithProperties(dbEntity db.EntityInstance, props *properties.Properties) *EntityWithProperties { + var originatedFrom uuid.UUID + if dbEntity.OriginatedFrom.Valid { + originatedFrom = dbEntity.OriginatedFrom.UUID + } + return &EntityWithProperties{ Entity: EntityInstance{ - ID: dbEntity.ID, - Type: entities.EntityTypeFromDB(dbEntity.EntityType), - Name: dbEntity.Name, - ProviderID: dbEntity.ProviderID, - ProjectID: dbEntity.ProjectID, + ID: dbEntity.ID, + Type: entities.EntityTypeFromDB(dbEntity.EntityType), + Name: dbEntity.Name, + ProviderID: dbEntity.ProviderID, + ProjectID: dbEntity.ProjectID, + OriginatedFrom: originatedFrom, }, Properties: props, } diff --git a/internal/events/constants.go b/internal/events/constants.go index 2ee1f1034c..d06e9d8af4 100644 --- a/internal/events/constants.go +++ b/internal/events/constants.go @@ -36,6 +36,14 @@ const ( ) const ( + // TopicQueueOriginatingEntityAdd adds an entity originating from another entity to the database + TopicQueueOriginatingEntityAdd = "originating.entity.add.event" + // TopicQueueOriginatingEntityDelete deletes an entity originating from another entity from the database + TopicQueueOriginatingEntityDelete = "originating.entity.delete.event" + // TopicQueueGetEntityAndDelete retrieves an entity from the database and schedules it for deletion + TopicQueueGetEntityAndDelete = "get.entity.delete.event" + // TopicQueueRefreshEntityAndEvaluate makes sure that entity properties are up-to-date and schedules an evaluation + TopicQueueRefreshEntityAndEvaluate = "refresh.entity.evaluate.event" // TopicQueueEntityEvaluate is the topic for entity evaluation events from webhooks TopicQueueEntityEvaluate = "execute.entity.event" // TopicQueueEntityFlush is the topic for flushing internal webhook events diff --git a/internal/proto/internal.pb.go b/internal/proto/internal.pb.go index 534a9938a1..b35050ec84 100644 --- a/internal/proto/internal.pb.go +++ b/internal/proto/internal.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.2-devel +// protoc-gen-go v1.34.2 // protoc (unknown) // source: internal.proto diff --git a/internal/service/service.go b/internal/service/service.go index 7ea9ed03ac..646824ea1f 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -37,6 +37,7 @@ import ( "github.com/stacklok/minder/internal/email/noop" "github.com/stacklok/minder/internal/engine" "github.com/stacklok/minder/internal/engine/selectors" + "github.com/stacklok/minder/internal/entities/handlers" propService "github.com/stacklok/minder/internal/entities/properties/service" "github.com/stacklok/minder/internal/events" "github.com/stacklok/minder/internal/flags" @@ -257,6 +258,10 @@ func AllInOneServerService( im := installations.NewInstallationManager(ghProviders) evt.ConsumeEvents(im) + // Register the entity refresh manager to handle entity refresh events + refresh := handlers.NewRefreshEntityAndEvaluateHandler(evt, store, propSvc, providerManager) + evt.ConsumeEvents(refresh) + // Register the email manager to handle email invitations var mailClient events.Consumer if cfg.Email.AWSSES.Region != "" && cfg.Email.AWSSES.Sender != "" { diff --git a/pkg/api/protobuf/go/minder/v1/minder.pb.go b/pkg/api/protobuf/go/minder/v1/minder.pb.go index aff5b45b3d..83f48f88e8 100644 --- a/pkg/api/protobuf/go/minder/v1/minder.pb.go +++ b/pkg/api/protobuf/go/minder/v1/minder.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.2-devel +// protoc-gen-go v1.34.2 // protoc (unknown) // source: minder/v1/minder.proto